Register agent tasks behind use_agent_identity

This commit is contained in:
adrian
2026-04-09 12:19:48 -07:00
parent 5589186a8e
commit d4138f3eba
8 changed files with 510 additions and 13 deletions

43
codex-rs/Cargo.lock generated
View File

@@ -940,6 +940,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -1950,6 +1959,7 @@ dependencies = [
"codex-windows-sandbox",
"core-foundation 0.9.4",
"core_test_support",
"crypto_box",
"csv",
"ctor 0.6.3",
"dirs",
@@ -1980,6 +1990,7 @@ dependencies = [
"serde_json",
"serial_test",
"sha1",
"sha2",
"shlex",
"similar",
"tempfile",
@@ -3560,9 +3571,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
dependencies = [
"generic-array",
"rand_core 0.6.4",
"typenum",
]
[[package]]
name = "crypto_box"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16182b4f39a82ec8a6851155cc4c0cda3065bb1db33651726a29e1951de0f009"
dependencies = [
"aead",
"blake2",
"crypto_secretbox",
"curve25519-dalek",
"salsa20",
"subtle",
"zeroize",
]
[[package]]
name = "crypto_secretbox"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1"
dependencies = [
"aead",
"cipher",
"generic-array",
"poly1305",
"salsa20",
"subtle",
"zeroize",
]
[[package]]
name = "csv"
version = "1.4.0"
@@ -4846,6 +4888,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]

View File

@@ -216,6 +216,7 @@ color-eyre = "0.6.3"
constant_time_eq = "0.3.1"
crossbeam-channel = "0.5.15"
crossterm = "0.28.1"
crypto_box = { version = "0.9.1", features = ["seal"] }
csv = "1.3.1"
ctor = "0.6.3"
derive_more = "2"

View File

@@ -34,6 +34,7 @@ codex-code-mode = { workspace = true }
codex-connectors = { workspace = true }
codex-config = { workspace = true }
codex-core-skills = { workspace = true }
crypto_box = { workspace = true }
codex-exec-server = { workspace = true }
codex-features = { workspace = true }
codex-feedback = { workspace = true }
@@ -97,6 +98,7 @@ rmcp = { workspace = true, default-features = false, features = [
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha1 = { workspace = true }
sha2 = { workspace = true }
shlex = { workspace = true }
similar = { workspace = true }
tempfile = { workspace = true }

View File

@@ -27,6 +27,10 @@ use tracing::debug;
use tracing::info;
use tracing::warn;
mod task_registration;
pub(crate) use task_registration::RegisteredAgentTask;
use crate::config::Config;
const AGENT_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15);
@@ -119,32 +123,48 @@ impl AgentIdentityManager {
return Ok(None);
}
let Some(auth) = self.auth_manager.auth().await else {
debug!("skipping agent identity registration because no auth is available");
let Some((auth, binding)) = self.current_auth_binding().await else {
return Ok(None);
};
let Some(binding) =
AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id())
else {
debug!("skipping agent identity registration because ChatGPT auth is unavailable");
return Ok(None);
};
self.ensure_registered_identity_for_binding(&auth, &binding)
.await
.map(Some)
}
async fn ensure_registered_identity_for_binding(
&self,
auth: &CodexAuth,
binding: &AgentIdentityBinding,
) -> Result<StoredAgentIdentity> {
let _guard = self.ensure_lock.lock().await;
if let Some(stored_identity) = self.load_stored_identity(&auth, &binding)? {
if let Some(stored_identity) = self.load_stored_identity(auth, binding)? {
info!(
agent_runtime_id = %stored_identity.agent_runtime_id,
binding_id = %binding.binding_id,
"reusing stored agent identity"
);
return Ok(Some(stored_identity));
return Ok(stored_identity);
}
let stored_identity = self.register_agent_identity(&binding).await?;
self.store_identity(&auth, &stored_identity)?;
Ok(Some(stored_identity))
let stored_identity = self.register_agent_identity(binding).await?;
self.store_identity(auth, &stored_identity)?;
Ok(stored_identity)
}
async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> {
let Some(auth) = self.auth_manager.auth().await else {
debug!("skipping agent identity flow because no auth is available");
return None;
};
let binding =
AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id());
if binding.is_none() {
debug!("skipping agent identity flow because ChatGPT auth is unavailable");
}
binding.map(|binding| (auth, binding))
}
async fn register_agent_identity(

View File

@@ -0,0 +1,367 @@
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use crypto_box::SecretKey as Curve25519SecretKey;
use ed25519_dalek::Signer as _;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use sha2::Digest as _;
use sha2::Sha512;
use tracing::debug;
use tracing::info;
use super::*;
const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15);
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct RegisteredAgentTask {
pub(crate) agent_runtime_id: String,
pub(crate) task_id: String,
pub(crate) registered_at: String,
}
#[derive(Debug, Serialize)]
struct RegisterTaskRequest {
signature: String,
timestamp: String,
}
#[derive(Debug, Deserialize)]
struct RegisterTaskResponse {
encrypted_task_id: String,
}
impl AgentIdentityManager {
pub(crate) async fn register_task(&self) -> Result<Option<RegisteredAgentTask>> {
if !self.feature_enabled {
return Ok(None);
}
let Some((auth, binding)) = self.current_auth_binding().await else {
return Ok(None);
};
let stored_identity = self
.ensure_registered_identity_for_binding(&auth, &binding)
.await?;
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
let request_body = RegisterTaskRequest {
signature: sign_task_registration_payload(&stored_identity, &timestamp)?,
timestamp,
};
let client = create_client();
let urls =
agent_task_registration_urls(&self.chatgpt_base_url, &stored_identity.agent_runtime_id);
for (index, url) in urls.iter().enumerate() {
let response = client
.post(url)
.bearer_auth(&binding.access_token)
.header("chatgpt-account-id", &binding.chatgpt_account_id)
.json(&request_body)
.timeout(AGENT_TASK_REGISTRATION_TIMEOUT)
.send()
.await
.with_context(|| {
format!("failed to send agent task registration request to {url}")
})?;
if response.status().is_success() {
let response_body = response
.json::<RegisterTaskResponse>()
.await
.with_context(|| format!("failed to parse agent task response from {url}"))?;
let registered_task = RegisteredAgentTask {
agent_runtime_id: stored_identity.agent_runtime_id.clone(),
task_id: decrypt_task_id_response(
&stored_identity,
&response_body.encrypted_task_id,
)?,
registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
};
info!(
agent_runtime_id = %registered_task.agent_runtime_id,
task_id = %registered_task.task_id,
"registered agent task"
);
return Ok(Some(registered_task));
}
let status = response.status();
let body = response.text().await.unwrap_or_default();
let is_last_candidate = index + 1 == urls.len();
if !is_last_candidate
&& matches!(
status,
StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED
)
{
debug!(
url = %url,
status = %status,
"agent task registration endpoint unavailable at candidate URL; trying fallback"
);
continue;
}
anyhow::bail!("agent task registration failed with status {status} from {url}: {body}");
}
anyhow::bail!("no candidate URLs were available for agent task registration")
}
}
fn sign_task_registration_payload(
stored_identity: &StoredAgentIdentity,
timestamp: &str,
) -> Result<String> {
let signing_key = stored_identity.signing_key()?;
let payload = format!("{}:{timestamp}", stored_identity.agent_runtime_id);
Ok(BASE64_STANDARD.encode(signing_key.sign(payload.as_bytes()).to_bytes()))
}
fn decrypt_task_id_response(
stored_identity: &StoredAgentIdentity,
encrypted_task_id: &str,
) -> Result<String> {
let signing_key = stored_identity.signing_key()?;
let ciphertext = BASE64_STANDARD
.decode(encrypted_task_id)
.context("encrypted task id is not valid base64")?;
let plaintext = curve25519_secret_key_from_signing_key(&signing_key)
.unseal(&ciphertext)
.map_err(|_| anyhow::anyhow!("failed to decrypt encrypted task id"))?;
String::from_utf8(plaintext).context("decrypted task id is not valid UTF-8")
}
fn curve25519_secret_key_from_signing_key(signing_key: &SigningKey) -> Curve25519SecretKey {
let digest = Sha512::digest(signing_key.to_bytes());
let mut secret_key = [0u8; 32];
secret_key.copy_from_slice(&digest[..32]);
secret_key[0] &= 248;
secret_key[31] &= 127;
secret_key[31] |= 64;
Curve25519SecretKey::from(secret_key)
}
fn agent_task_registration_urls(chatgpt_base_url: &str, agent_runtime_id: &str) -> Vec<String> {
let trimmed = chatgpt_base_url.trim_end_matches('/');
let path = format!("/v1/agent/{agent_runtime_id}/task/register");
if let Some(root) = trimmed.strip_suffix("/backend-api") {
return vec![format!("{root}{path}"), format!("{trimmed}{path}")];
}
vec![format!("{trimmed}{path}")]
}
#[cfg(test)]
mod tests {
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_app_server_protocol::AuthMode as ApiAuthMode;
use codex_login::AuthCredentialsStoreMode;
use codex_login::AuthDotJson;
use codex_login::save_auth;
use codex_login::token_data::IdTokenInfo;
use codex_login::token_data::TokenData;
use pretty_assertions::assert_eq;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
#[tokio::test]
async fn register_task_skips_when_feature_is_disabled() {
let auth_manager =
AuthManager::from_auth_for_testing(make_chatgpt_auth("account-123", Some("user-123")));
let manager = AgentIdentityManager::new_for_tests(
auth_manager,
/*feature_enabled*/ false,
"https://chatgpt.com/backend-api/".to_string(),
SessionSource::Cli,
);
assert_eq!(manager.register_task().await.unwrap(), None);
}
#[tokio::test]
async fn register_task_skips_for_api_key_auth() {
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test-key"));
let manager = AgentIdentityManager::new_for_tests(
auth_manager,
/*feature_enabled*/ true,
"https://chatgpt.com/backend-api/".to_string(),
SessionSource::Cli,
);
assert_eq!(manager.register_task().await.unwrap(), None);
}
#[tokio::test]
async fn register_task_registers_and_decrypts_plaintext_task_id() {
let server = MockServer::start().await;
let auth = make_chatgpt_auth("account-123", Some("user-123"));
let auth_manager = AuthManager::from_auth_for_testing(auth.clone());
let manager = AgentIdentityManager::new_for_tests(
auth_manager,
/*feature_enabled*/ true,
format!("{}/backend-api/", server.uri()),
SessionSource::Cli,
);
let stored_identity = seed_stored_identity(&manager, &auth, "agent_123", "account-123");
let encrypted_task_id =
encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext");
Mock::given(method("POST"))
.and(path("/v1/agent/agent_123/task/register"))
.and(header("authorization", "Bearer access-token-account-123"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"encrypted_task_id": encrypted_task_id,
})))
.expect(1)
.mount(&server)
.await;
let task = manager
.register_task()
.await
.unwrap()
.expect("task should be registered");
assert_eq!(
task,
RegisteredAgentTask {
agent_runtime_id: "agent_123".to_string(),
task_id: "task_123".to_string(),
registered_at: task.registered_at.clone(),
}
);
}
#[tokio::test]
async fn register_task_falls_back_to_backend_api_v1() {
let server = MockServer::start().await;
let auth = make_chatgpt_auth("account-123", Some("user-123"));
let auth_manager = AuthManager::from_auth_for_testing(auth.clone());
let manager = AgentIdentityManager::new_for_tests(
auth_manager,
/*feature_enabled*/ true,
format!("{}/backend-api/", server.uri()),
SessionSource::Cli,
);
let stored_identity =
seed_stored_identity(&manager, &auth, "agent_fallback", "account-123");
let encrypted_task_id = encrypt_task_id_for_identity(&stored_identity, "task_fallback")
.expect("task ciphertext");
Mock::given(method("POST"))
.and(path("/v1/agent/agent_fallback/task/register"))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/backend-api/v1/agent/agent_fallback/task/register"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"encrypted_task_id": encrypted_task_id,
})))
.expect(1)
.mount(&server)
.await;
let task = manager
.register_task()
.await
.unwrap()
.expect("task should be registered");
assert_eq!(task.agent_runtime_id, "agent_fallback");
assert_eq!(task.task_id, "task_fallback");
}
fn seed_stored_identity(
manager: &AgentIdentityManager,
auth: &CodexAuth,
agent_runtime_id: &str,
account_id: &str,
) -> StoredAgentIdentity {
let key_material = generate_agent_key_material().expect("key material");
let binding = AgentIdentityBinding::from_auth(auth, None).expect("binding");
let stored_identity = StoredAgentIdentity {
binding_id: binding.binding_id.clone(),
chatgpt_account_id: account_id.to_string(),
chatgpt_user_id: Some("user-123".to_string()),
agent_runtime_id: agent_runtime_id.to_string(),
private_key_pkcs8_base64: key_material.private_key_pkcs8_base64,
public_key_ssh: key_material.public_key_ssh,
registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
abom: manager.abom.clone(),
};
manager
.store_identity(auth, &stored_identity)
.expect("store identity");
let persisted = auth
.get_agent_identity(account_id)
.expect("persisted identity");
assert_eq!(persisted.agent_runtime_id, agent_runtime_id);
stored_identity
}
fn encrypt_task_id_for_identity(
stored_identity: &StoredAgentIdentity,
task_id: &str,
) -> Result<String> {
let mut rng = crypto_box::aead::OsRng;
let public_key =
curve25519_secret_key_from_signing_key(&stored_identity.signing_key()?).public_key();
let ciphertext = public_key
.seal(&mut rng, task_id.as_bytes())
.map_err(|_| anyhow::anyhow!("failed to encrypt test task id"))?;
Ok(BASE64_STANDARD.encode(ciphertext))
}
fn make_chatgpt_auth(account_id: &str, user_id: Option<&str>) -> CodexAuth {
let tempdir = tempfile::tempdir().expect("tempdir");
let auth_json = AuthDotJson {
auth_mode: Some(ApiAuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(TokenData {
id_token: IdTokenInfo {
email: None,
chatgpt_plan_type: None,
chatgpt_user_id: user_id.map(ToOwned::to_owned),
chatgpt_account_id: Some(account_id.to_string()),
raw_jwt: fake_id_token(account_id, user_id),
},
access_token: format!("access-token-{account_id}"),
refresh_token: "refresh-token".to_string(),
account_id: Some(account_id.to_string()),
}),
last_refresh: Some(Utc::now()),
agent_identity: None,
};
save_auth(tempdir.path(), &auth_json, AuthCredentialsStoreMode::File).expect("save auth");
CodexAuth::from_auth_storage(tempdir.path(), AuthCredentialsStoreMode::File)
.expect("load auth")
.expect("auth")
}
fn fake_id_token(account_id: &str, user_id: Option<&str>) -> String {
let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
let payload = serde_json::json!({
"https://api.openai.com/auth": {
"chatgpt_user_id": user_id,
"chatgpt_account_id": account_id,
}
});
let payload = URL_SAFE_NO_PAD.encode(payload.to_string());
format!("{header}.{payload}.signature")
}
}

View File

@@ -15,6 +15,7 @@ use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::agent_identity::AgentIdentityManager;
use crate::agent_identity::RegisteredAgentTask;
use crate::apps::render_apps_section;
use crate::commit_attribution::commit_message_trailer_instruction;
use crate::compact;
@@ -1528,6 +1529,39 @@ impl Session {
handlers::shutdown(self, self.next_internal_sub_id()).await;
}
async fn ensure_agent_task_registered(&self) -> anyhow::Result<Option<RegisteredAgentTask>> {
{
let state = self.state.lock().await;
if let Some(agent_task) = state.agent_task() {
debug!(
agent_runtime_id = %agent_task.agent_runtime_id,
task_id = %agent_task.task_id,
"reusing cached agent task"
);
return Ok(Some(agent_task));
}
}
let Some(agent_task) = self.services.agent_identity_manager.register_task().await? else {
return Ok(None);
};
{
let mut state = self.state.lock().await;
if let Some(existing_agent_task) = state.agent_task() {
return Ok(Some(existing_agent_task));
}
state.set_agent_task(agent_task.clone());
}
info!(
thread_id = %self.conversation_id,
agent_runtime_id = %agent_task.agent_runtime_id,
task_id = %agent_task.task_id,
"registered agent task for thread"
);
Ok(Some(agent_task))
}
#[allow(clippy::too_many_arguments)]
fn make_turn_context(
conversation_id: ThreadId,
@@ -6234,6 +6268,9 @@ pub(crate) async fn run_turn(
}))
.await;
}
if let Err(error) = sess.ensure_agent_task_registered().await {
warn!(error = %error, "agent task registration failed");
}
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)

View File

@@ -6,6 +6,7 @@ use codex_sandboxing::policy_transforms::merge_permission_profiles;
use std::collections::HashMap;
use std::collections::HashSet;
use crate::agent_identity::RegisteredAgentTask;
use crate::codex::PreviousTurnSettings;
use crate::codex::SessionConfiguration;
use crate::context_manager::ContextManager;
@@ -30,6 +31,7 @@ pub(crate) struct SessionState {
previous_turn_settings: Option<PreviousTurnSettings>,
/// Startup prewarmed session prepared during session initialization.
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
pub(crate) agent_task: Option<RegisteredAgentTask>,
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
granted_permissions: Option<PermissionProfile>,
@@ -48,6 +50,7 @@ impl SessionState {
mcp_dependency_prompted: HashSet::new(),
previous_turn_settings: None,
startup_prewarm: None,
agent_task: None,
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
granted_permissions: None,
@@ -174,6 +177,14 @@ impl SessionState {
self.startup_prewarm.take()
}
pub(crate) fn agent_task(&self) -> Option<RegisteredAgentTask> {
self.agent_task.clone()
}
pub(crate) fn set_agent_task(&mut self, agent_task: RegisteredAgentTask) {
self.agent_task = Some(agent_task);
}
// Adds connector IDs to the active set and returns the merged selection.
pub(crate) fn merge_connector_selection<I>(&mut self, connector_ids: I) -> HashSet<String>
where

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::agent_identity::RegisteredAgentTask;
use crate::codex::make_session_configuration_for_tests;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::RateLimitWindow;
@@ -33,6 +34,21 @@ async fn clear_connector_selection_removes_entries() {
assert_eq!(state.get_connector_selection(), HashSet::new());
}
#[tokio::test]
async fn set_agent_task_persists_plaintext_task_for_session_reuse() {
let session_configuration = make_session_configuration_for_tests().await;
let mut state = SessionState::new(session_configuration);
let agent_task = RegisteredAgentTask {
agent_runtime_id: "agent_123".to_string(),
task_id: "task_123".to_string(),
registered_at: "2026-03-23T12:00:00Z".to_string(),
};
state.set_agent_task(agent_task.clone());
assert_eq!(state.agent_task(), Some(agent_task));
}
#[tokio::test]
async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() {
let session_configuration = make_session_configuration_for_tests().await;