diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 0dd569e816..f69322f8ec 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -940,6 +940,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1950,6 +1959,7 @@ dependencies = [ "codex-windows-sandbox", "core-foundation 0.9.4", "core_test_support", + "crypto_box", "csv", "ctor 0.6.3", "dirs", @@ -1980,6 +1990,7 @@ dependencies = [ "serde_json", "serial_test", "sha1", + "sha2", "shlex", "similar", "tempfile", @@ -3560,9 +3571,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", + "rand_core 0.6.4", "typenum", ] +[[package]] +name = "crypto_box" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16182b4f39a82ec8a6851155cc4c0cda3065bb1db33651726a29e1951de0f009" +dependencies = [ + "aead", + "blake2", + "crypto_secretbox", + "curve25519-dalek", + "salsa20", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto_secretbox" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1" +dependencies = [ + "aead", + "cipher", + "generic-array", + "poly1305", + "salsa20", + "subtle", + "zeroize", +] + [[package]] name = "csv" version = "1.4.0" @@ -4846,6 +4888,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", + "zeroize", ] [[package]] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 4b5f253337..c610ee34e7 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -216,6 +216,7 @@ color-eyre = "0.6.3" constant_time_eq = "0.3.1" crossbeam-channel = "0.5.15" crossterm = "0.28.1" +crypto_box = { version = "0.9.1", features = ["seal"] } csv = "1.3.1" ctor = "0.6.3" derive_more = "2" diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 539c474aae..7b3acd6460 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -34,6 +34,7 @@ codex-code-mode = { workspace = true } codex-connectors = { workspace = true } codex-config = { workspace = true } codex-core-skills = { workspace = true } +crypto_box = { workspace = true } codex-exec-server = { workspace = true } codex-features = { workspace = true } codex-feedback = { workspace = true } @@ -97,6 +98,7 @@ rmcp = { workspace = true, default-features = false, features = [ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha1 = { workspace = true } +sha2 = { workspace = true } shlex = { workspace = true } similar = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/core/src/agent_identity.rs b/codex-rs/core/src/agent_identity.rs index f1f371caf3..4087be32b5 100644 --- a/codex-rs/core/src/agent_identity.rs +++ b/codex-rs/core/src/agent_identity.rs @@ -27,6 +27,10 @@ use tracing::debug; use tracing::info; use tracing::warn; +mod task_registration; + +pub(crate) use task_registration::RegisteredAgentTask; + use crate::config::Config; const AGENT_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15); @@ -119,32 +123,48 @@ impl AgentIdentityManager { return Ok(None); } - let Some(auth) = self.auth_manager.auth().await else { - debug!("skipping agent identity registration because no auth is available"); + let Some((auth, binding)) = self.current_auth_binding().await else { return Ok(None); }; - let Some(binding) = - AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id()) - else { - debug!("skipping agent identity registration because ChatGPT auth is unavailable"); - return Ok(None); - }; + self.ensure_registered_identity_for_binding(&auth, &binding) + .await + .map(Some) + } + async fn ensure_registered_identity_for_binding( + &self, + auth: &CodexAuth, + binding: &AgentIdentityBinding, + ) -> Result { let _guard = self.ensure_lock.lock().await; - if let Some(stored_identity) = self.load_stored_identity(&auth, &binding)? { + if let Some(stored_identity) = self.load_stored_identity(auth, binding)? { info!( agent_runtime_id = %stored_identity.agent_runtime_id, binding_id = %binding.binding_id, "reusing stored agent identity" ); - return Ok(Some(stored_identity)); + return Ok(stored_identity); } - let stored_identity = self.register_agent_identity(&binding).await?; - self.store_identity(&auth, &stored_identity)?; - Ok(Some(stored_identity)) + let stored_identity = self.register_agent_identity(binding).await?; + self.store_identity(auth, &stored_identity)?; + Ok(stored_identity) + } + + async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> { + let Some(auth) = self.auth_manager.auth().await else { + debug!("skipping agent identity flow because no auth is available"); + return None; + }; + + let binding = + AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id()); + if binding.is_none() { + debug!("skipping agent identity flow because ChatGPT auth is unavailable"); + } + binding.map(|binding| (auth, binding)) } async fn register_agent_identity( diff --git a/codex-rs/core/src/agent_identity/task_registration.rs b/codex-rs/core/src/agent_identity/task_registration.rs new file mode 100644 index 0000000000..af39504c10 --- /dev/null +++ b/codex-rs/core/src/agent_identity/task_registration.rs @@ -0,0 +1,367 @@ +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use crypto_box::SecretKey as Curve25519SecretKey; +use ed25519_dalek::Signer as _; +use reqwest::StatusCode; +use serde::Deserialize; +use serde::Serialize; +use sha2::Digest as _; +use sha2::Sha512; +use tracing::debug; +use tracing::info; + +use super::*; + +const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct RegisteredAgentTask { + pub(crate) agent_runtime_id: String, + pub(crate) task_id: String, + pub(crate) registered_at: String, +} + +#[derive(Debug, Serialize)] +struct RegisterTaskRequest { + signature: String, + timestamp: String, +} + +#[derive(Debug, Deserialize)] +struct RegisterTaskResponse { + encrypted_task_id: String, +} + +impl AgentIdentityManager { + pub(crate) async fn register_task(&self) -> Result> { + if !self.feature_enabled { + return Ok(None); + } + + let Some((auth, binding)) = self.current_auth_binding().await else { + return Ok(None); + }; + let stored_identity = self + .ensure_registered_identity_for_binding(&auth, &binding) + .await?; + + let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true); + let request_body = RegisterTaskRequest { + signature: sign_task_registration_payload(&stored_identity, ×tamp)?, + timestamp, + }; + + let client = create_client(); + let urls = + agent_task_registration_urls(&self.chatgpt_base_url, &stored_identity.agent_runtime_id); + + for (index, url) in urls.iter().enumerate() { + let response = client + .post(url) + .bearer_auth(&binding.access_token) + .header("chatgpt-account-id", &binding.chatgpt_account_id) + .json(&request_body) + .timeout(AGENT_TASK_REGISTRATION_TIMEOUT) + .send() + .await + .with_context(|| { + format!("failed to send agent task registration request to {url}") + })?; + + if response.status().is_success() { + let response_body = response + .json::() + .await + .with_context(|| format!("failed to parse agent task response from {url}"))?; + let registered_task = RegisteredAgentTask { + agent_runtime_id: stored_identity.agent_runtime_id.clone(), + task_id: decrypt_task_id_response( + &stored_identity, + &response_body.encrypted_task_id, + )?, + registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + }; + info!( + agent_runtime_id = %registered_task.agent_runtime_id, + task_id = %registered_task.task_id, + "registered agent task" + ); + return Ok(Some(registered_task)); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + let is_last_candidate = index + 1 == urls.len(); + if !is_last_candidate + && matches!( + status, + StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED + ) + { + debug!( + url = %url, + status = %status, + "agent task registration endpoint unavailable at candidate URL; trying fallback" + ); + continue; + } + + anyhow::bail!("agent task registration failed with status {status} from {url}: {body}"); + } + + anyhow::bail!("no candidate URLs were available for agent task registration") + } +} + +fn sign_task_registration_payload( + stored_identity: &StoredAgentIdentity, + timestamp: &str, +) -> Result { + let signing_key = stored_identity.signing_key()?; + let payload = format!("{}:{timestamp}", stored_identity.agent_runtime_id); + Ok(BASE64_STANDARD.encode(signing_key.sign(payload.as_bytes()).to_bytes())) +} + +fn decrypt_task_id_response( + stored_identity: &StoredAgentIdentity, + encrypted_task_id: &str, +) -> Result { + let signing_key = stored_identity.signing_key()?; + let ciphertext = BASE64_STANDARD + .decode(encrypted_task_id) + .context("encrypted task id is not valid base64")?; + let plaintext = curve25519_secret_key_from_signing_key(&signing_key) + .unseal(&ciphertext) + .map_err(|_| anyhow::anyhow!("failed to decrypt encrypted task id"))?; + String::from_utf8(plaintext).context("decrypted task id is not valid UTF-8") +} + +fn curve25519_secret_key_from_signing_key(signing_key: &SigningKey) -> Curve25519SecretKey { + let digest = Sha512::digest(signing_key.to_bytes()); + let mut secret_key = [0u8; 32]; + secret_key.copy_from_slice(&digest[..32]); + secret_key[0] &= 248; + secret_key[31] &= 127; + secret_key[31] |= 64; + Curve25519SecretKey::from(secret_key) +} + +fn agent_task_registration_urls(chatgpt_base_url: &str, agent_runtime_id: &str) -> Vec { + let trimmed = chatgpt_base_url.trim_end_matches('/'); + let path = format!("/v1/agent/{agent_runtime_id}/task/register"); + if let Some(root) = trimmed.strip_suffix("/backend-api") { + return vec![format!("{root}{path}"), format!("{trimmed}{path}")]; + } + vec![format!("{trimmed}{path}")] +} + +#[cfg(test)] +mod tests { + use base64::engine::general_purpose::URL_SAFE_NO_PAD; + use codex_app_server_protocol::AuthMode as ApiAuthMode; + use codex_login::AuthCredentialsStoreMode; + use codex_login::AuthDotJson; + use codex_login::save_auth; + use codex_login::token_data::IdTokenInfo; + use codex_login::token_data::TokenData; + use pretty_assertions::assert_eq; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::header; + use wiremock::matchers::method; + use wiremock::matchers::path; + + use super::*; + + #[tokio::test] + async fn register_task_skips_when_feature_is_disabled() { + let auth_manager = + AuthManager::from_auth_for_testing(make_chatgpt_auth("account-123", Some("user-123"))); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ false, + "https://chatgpt.com/backend-api/".to_string(), + SessionSource::Cli, + ); + + assert_eq!(manager.register_task().await.unwrap(), None); + } + + #[tokio::test] + async fn register_task_skips_for_api_key_auth() { + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test-key")); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + "https://chatgpt.com/backend-api/".to_string(), + SessionSource::Cli, + ); + + assert_eq!(manager.register_task().await.unwrap(), None); + } + + #[tokio::test] + async fn register_task_registers_and_decrypts_plaintext_task_id() { + let server = MockServer::start().await; + let auth = make_chatgpt_auth("account-123", Some("user-123")); + let auth_manager = AuthManager::from_auth_for_testing(auth.clone()); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + format!("{}/backend-api/", server.uri()), + SessionSource::Cli, + ); + let stored_identity = seed_stored_identity(&manager, &auth, "agent_123", "account-123"); + let encrypted_task_id = + encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext"); + + Mock::given(method("POST")) + .and(path("/v1/agent/agent_123/task/register")) + .and(header("authorization", "Bearer access-token-account-123")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let task = manager + .register_task() + .await + .unwrap() + .expect("task should be registered"); + + assert_eq!( + task, + RegisteredAgentTask { + agent_runtime_id: "agent_123".to_string(), + task_id: "task_123".to_string(), + registered_at: task.registered_at.clone(), + } + ); + } + + #[tokio::test] + async fn register_task_falls_back_to_backend_api_v1() { + let server = MockServer::start().await; + let auth = make_chatgpt_auth("account-123", Some("user-123")); + let auth_manager = AuthManager::from_auth_for_testing(auth.clone()); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + format!("{}/backend-api/", server.uri()), + SessionSource::Cli, + ); + let stored_identity = + seed_stored_identity(&manager, &auth, "agent_fallback", "account-123"); + let encrypted_task_id = encrypt_task_id_for_identity(&stored_identity, "task_fallback") + .expect("task ciphertext"); + + Mock::given(method("POST")) + .and(path("/v1/agent/agent_fallback/task/register")) + .respond_with(ResponseTemplate::new(404)) + .expect(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/backend-api/v1/agent/agent_fallback/task/register")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let task = manager + .register_task() + .await + .unwrap() + .expect("task should be registered"); + + assert_eq!(task.agent_runtime_id, "agent_fallback"); + assert_eq!(task.task_id, "task_fallback"); + } + + fn seed_stored_identity( + manager: &AgentIdentityManager, + auth: &CodexAuth, + agent_runtime_id: &str, + account_id: &str, + ) -> StoredAgentIdentity { + let key_material = generate_agent_key_material().expect("key material"); + let binding = AgentIdentityBinding::from_auth(auth, None).expect("binding"); + let stored_identity = StoredAgentIdentity { + binding_id: binding.binding_id.clone(), + chatgpt_account_id: account_id.to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: agent_runtime_id.to_string(), + private_key_pkcs8_base64: key_material.private_key_pkcs8_base64, + public_key_ssh: key_material.public_key_ssh, + registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + abom: manager.abom.clone(), + }; + manager + .store_identity(auth, &stored_identity) + .expect("store identity"); + let persisted = auth + .get_agent_identity(account_id) + .expect("persisted identity"); + assert_eq!(persisted.agent_runtime_id, agent_runtime_id); + stored_identity + } + + fn encrypt_task_id_for_identity( + stored_identity: &StoredAgentIdentity, + task_id: &str, + ) -> Result { + let mut rng = crypto_box::aead::OsRng; + let public_key = + curve25519_secret_key_from_signing_key(&stored_identity.signing_key()?).public_key(); + let ciphertext = public_key + .seal(&mut rng, task_id.as_bytes()) + .map_err(|_| anyhow::anyhow!("failed to encrypt test task id"))?; + Ok(BASE64_STANDARD.encode(ciphertext)) + } + + fn make_chatgpt_auth(account_id: &str, user_id: Option<&str>) -> CodexAuth { + let tempdir = tempfile::tempdir().expect("tempdir"); + let auth_json = AuthDotJson { + auth_mode: Some(ApiAuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(TokenData { + id_token: IdTokenInfo { + email: None, + chatgpt_plan_type: None, + chatgpt_user_id: user_id.map(ToOwned::to_owned), + chatgpt_account_id: Some(account_id.to_string()), + raw_jwt: fake_id_token(account_id, user_id), + }, + access_token: format!("access-token-{account_id}"), + refresh_token: "refresh-token".to_string(), + account_id: Some(account_id.to_string()), + }), + last_refresh: Some(Utc::now()), + agent_identity: None, + }; + save_auth(tempdir.path(), &auth_json, AuthCredentialsStoreMode::File).expect("save auth"); + CodexAuth::from_auth_storage(tempdir.path(), AuthCredentialsStoreMode::File) + .expect("load auth") + .expect("auth") + } + + fn fake_id_token(account_id: &str, user_id: Option<&str>) -> String { + let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#); + let payload = serde_json::json!({ + "https://api.openai.com/auth": { + "chatgpt_user_id": user_id, + "chatgpt_account_id": account_id, + } + }); + let payload = URL_SAFE_NO_PAD.encode(payload.to_string()); + format!("{header}.{payload}.signature") + } +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8533552020..51256e6e13 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -15,6 +15,7 @@ use crate::agent::MailboxReceiver; use crate::agent::agent_status_from_event; use crate::agent::status::is_final; use crate::agent_identity::AgentIdentityManager; +use crate::agent_identity::RegisteredAgentTask; use crate::apps::render_apps_section; use crate::commit_attribution::commit_message_trailer_instruction; use crate::compact; @@ -1528,6 +1529,39 @@ impl Session { handlers::shutdown(self, self.next_internal_sub_id()).await; } + async fn ensure_agent_task_registered(&self) -> anyhow::Result> { + { + let state = self.state.lock().await; + if let Some(agent_task) = state.agent_task() { + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "reusing cached agent task" + ); + return Ok(Some(agent_task)); + } + } + + let Some(agent_task) = self.services.agent_identity_manager.register_task().await? else { + return Ok(None); + }; + { + let mut state = self.state.lock().await; + if let Some(existing_agent_task) = state.agent_task() { + return Ok(Some(existing_agent_task)); + } + state.set_agent_task(agent_task.clone()); + } + + info!( + thread_id = %self.conversation_id, + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "registered agent task for thread" + ); + Ok(Some(agent_task)) + } + #[allow(clippy::too_many_arguments)] fn make_turn_context( conversation_id: ThreadId, @@ -6234,6 +6268,9 @@ pub(crate) async fn run_turn( })) .await; } + if let Err(error) = sess.ensure_agent_task_registered().await { + warn!(error = %error, "agent task registration failed"); + } if !skill_items.is_empty() { sess.record_conversation_items(&turn_context, &skill_items) diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 206f75060c..f712b11f61 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -6,6 +6,7 @@ use codex_sandboxing::policy_transforms::merge_permission_profiles; use std::collections::HashMap; use std::collections::HashSet; +use crate::agent_identity::RegisteredAgentTask; use crate::codex::PreviousTurnSettings; use crate::codex::SessionConfiguration; use crate::context_manager::ContextManager; @@ -30,6 +31,7 @@ pub(crate) struct SessionState { previous_turn_settings: Option, /// Startup prewarmed session prepared during session initialization. pub(crate) startup_prewarm: Option, + pub(crate) agent_task: Option, pub(crate) active_connector_selection: HashSet, pub(crate) pending_session_start_source: Option, granted_permissions: Option, @@ -48,6 +50,7 @@ impl SessionState { mcp_dependency_prompted: HashSet::new(), previous_turn_settings: None, startup_prewarm: None, + agent_task: None, active_connector_selection: HashSet::new(), pending_session_start_source: None, granted_permissions: None, @@ -174,6 +177,14 @@ impl SessionState { self.startup_prewarm.take() } + pub(crate) fn agent_task(&self) -> Option { + self.agent_task.clone() + } + + pub(crate) fn set_agent_task(&mut self, agent_task: RegisteredAgentTask) { + self.agent_task = Some(agent_task); + } + // Adds connector IDs to the active set and returns the merged selection. pub(crate) fn merge_connector_selection(&mut self, connector_ids: I) -> HashSet where diff --git a/codex-rs/core/src/state/session_tests.rs b/codex-rs/core/src/state/session_tests.rs index 1af7ccc8f6..171e39599b 100644 --- a/codex-rs/core/src/state/session_tests.rs +++ b/codex-rs/core/src/state/session_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::agent_identity::RegisteredAgentTask; use crate::codex::make_session_configuration_for_tests; use codex_protocol::protocol::CreditsSnapshot; use codex_protocol::protocol::RateLimitWindow; @@ -33,6 +34,21 @@ async fn clear_connector_selection_removes_entries() { assert_eq!(state.get_connector_selection(), HashSet::new()); } +#[tokio::test] +async fn set_agent_task_persists_plaintext_task_for_session_reuse() { + let session_configuration = make_session_configuration_for_tests().await; + let mut state = SessionState::new(session_configuration); + let agent_task = RegisteredAgentTask { + agent_runtime_id: "agent_123".to_string(), + task_id: "task_123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + }; + + state.set_agent_task(agent_task.clone()); + + assert_eq!(state.agent_task(), Some(agent_task)); +} + #[tokio::test] async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() { let session_configuration = make_session_configuration_for_tests().await;