Compare commits

...

1 Commits

Author SHA1 Message Date
adrian
d7f6f4173a [AgentIdentity] Adding support for the AgentIdentity protocol 2026-03-17 09:44:37 -07:00
20 changed files with 1402 additions and 12 deletions

2
MODULE.bazel.lock generated
View File

@@ -787,6 +787,8 @@
"dupe_0.9.1": "{\"dependencies\":[{\"name\":\"dupe_derive\",\"req\":\"=0.9.1\"}],\"features\":{}}",
"dupe_derive_0.9.1": "{\"dependencies\":[{\"name\":\"proc-macro2\",\"req\":\"^1.0\"},{\"name\":\"quote\",\"req\":\"^1.0.3\"},{\"features\":[\"extra-traits\"],\"name\":\"syn\",\"req\":\"^2\"}],\"features\":{}}",
"dyn-clone_1.0.20": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"rustversion\",\"req\":\"^1.0\"},{\"features\":[\"diff\"],\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0.66\"}],\"features\":{}}",
"ed25519-dalek_2.2.0": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"bincode\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"blake2\",\"req\":\"^0.10\"},{\"features\":[\"html_reports\"],\"kind\":\"dev\",\"name\":\"criterion\",\"req\":\"^0.5\"},{\"default_features\":false,\"features\":[\"digest\"],\"name\":\"curve25519-dalek\",\"req\":\"^4\"},{\"default_features\":false,\"features\":[\"digest\",\"rand_core\"],\"kind\":\"dev\",\"name\":\"curve25519-dalek\",\"req\":\"^4\"},{\"default_features\":false,\"name\":\"ed25519\",\"req\":\">=2.2, <2.3\"},{\"kind\":\"dev\",\"name\":\"hex\",\"req\":\"^0.4\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"default_features\":false,\"name\":\"merlin\",\"optional\":true,\"req\":\"^3\"},{\"kind\":\"dev\",\"name\":\"rand\",\"req\":\"^0.8\"},{\"default_features\":false,\"name\":\"rand_core\",\"optional\":true,\"req\":\"^0.6.4\"},{\"default_features\":false,\"kind\":\"dev\",\"name\":\"rand_core\",\"req\":\"^0.6.4\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"serde\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"sha2\",\"req\":\"^0.10\"},{\"kind\":\"dev\",\"name\":\"sha3\",\"req\":\"^0.10\"},{\"default_features\":false,\"name\":\"signature\",\"optional\":true,\"req\":\">=2.0, 
<2.3\"},{\"default_features\":false,\"name\":\"subtle\",\"req\":\"^2.3.0\"},{\"kind\":\"dev\",\"name\":\"toml\",\"req\":\"^0.7\"},{\"default_features\":false,\"features\":[\"static_secrets\"],\"kind\":\"dev\",\"name\":\"x25519-dalek\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"optional\":true,\"req\":\"^1.5\"}],\"features\":{\"alloc\":[\"curve25519-dalek/alloc\",\"ed25519/alloc\",\"serde?/alloc\",\"zeroize/alloc\"],\"asm\":[\"sha2/asm\"],\"batch\":[\"alloc\",\"merlin\",\"rand_core\"],\"default\":[\"fast\",\"std\",\"zeroize\"],\"digest\":[\"signature/digest\"],\"fast\":[\"curve25519-dalek/precomputed-tables\"],\"hazmat\":[],\"legacy_compatibility\":[\"curve25519-dalek/legacy_compatibility\"],\"pem\":[\"alloc\",\"ed25519/pem\",\"pkcs8\"],\"pkcs8\":[\"ed25519/pkcs8\"],\"rand_core\":[\"dep:rand_core\"],\"serde\":[\"dep:serde\",\"ed25519/serde\"],\"std\":[\"alloc\",\"ed25519/std\",\"serde?/std\",\"sha2/std\"],\"zeroize\":[\"dep:zeroize\",\"curve25519-dalek/zeroize\"]}}",
"ed25519_2.2.3": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"bincode\",\"req\":\"^1\"},{\"features\":[\"rand_core\"],\"kind\":\"dev\",\"name\":\"ed25519-dalek\",\"req\":\"^2\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"name\":\"pkcs8\",\"optional\":true,\"req\":\"^0.10\"},{\"features\":[\"std\"],\"kind\":\"dev\",\"name\":\"rand_core\",\"req\":\"^0.6\"},{\"default_features\":false,\"features\":[\"signature\"],\"kind\":\"dev\",\"name\":\"ring-compat\",\"req\":\"^0.8\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1\"},{\"name\":\"serde_bytes\",\"optional\":true,\"req\":\"^0.11\"},{\"default_features\":false,\"name\":\"signature\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"optional\":true,\"req\":\"^1\"}],\"features\":{\"alloc\":[\"pkcs8?/alloc\"],\"default\":[\"std\"],\"pem\":[\"alloc\",\"pkcs8/pem\"],\"serde_bytes\":[\"serde\",\"dep:serde_bytes\"],\"std\":[\"pkcs8?/std\",\"signature/std\"]}}",
"either_1.15.0": "{\"dependencies\":[{\"default_features\":false,\"features\":[\"alloc\",\"derive\"],\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0.95\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0.0\"}],\"features\":{\"default\":[\"std\"],\"std\":[],\"use_std\":[\"std\"]}}",
"ena_0.14.3": "{\"dependencies\":[{\"name\":\"dogged\",\"optional\":true,\"req\":\"^0.2.0\"},{\"name\":\"log\",\"req\":\"^0.4\"}],\"features\":{\"bench\":[],\"persistent\":[\"dogged\"]}}",
"encode_unicode_1.0.0": "{\"dependencies\":[{\"default_features\":false,\"name\":\"ascii\",\"optional\":true,\"req\":\"^1.0.0\"},{\"kind\":\"dev\",\"name\":\"lazy_static\",\"req\":\"^1.0\",\"target\":\"cfg(unix)\"},{\"features\":[\"https-native\"],\"kind\":\"dev\",\"name\":\"minreq\",\"req\":\"^2.6\"}],\"features\":{\"default\":[\"std\"],\"std\":[]}}",

26
codex-rs/Cargo.lock generated
View File

@@ -1872,6 +1872,7 @@ dependencies = [
"ctor 0.6.3",
"dirs",
"dunce",
"ed25519-dalek",
"encoding_rs",
"env-flags",
"eventsource-stream",
@@ -3291,6 +3292,7 @@ dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
"rustc_version",
"subtle",
@@ -3771,6 +3773,30 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
[[package]]
name = "ed25519"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
dependencies = [
"pkcs8",
"signature",
]
[[package]]
name = "ed25519-dalek"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9"
dependencies = [
"curve25519-dalek",
"ed25519",
"serde",
"sha2",
"subtle",
"zeroize",
]
[[package]]
name = "either"
version = "1.15.0"

View File

@@ -183,6 +183,7 @@ diffy = "0.4.2"
dirs = "6"
dotenvy = "0.15.7"
dunce = "1.0.4"
ed25519-dalek = { version = "2.2.0", features = ["pkcs8", "pem"] }
encoding_rs = "0.8.35"
fd-lock = "4.0.4"
env-flags = "0.1.1"

View File

@@ -60,6 +60,7 @@ codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-
csv = { workspace = true }
dirs = { workspace = true }
dunce = { workspace = true }
ed25519-dalek = { workspace = true }
encoding_rs = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }

View File

@@ -497,6 +497,9 @@
"unified_exec": {
"type": "boolean"
},
"use_agent_identity": {
"type": "boolean"
},
"use_legacy_landlock": {
"type": "boolean"
},
@@ -2049,6 +2052,9 @@
"unified_exec": {
"type": "boolean"
},
"use_agent_identity": {
"type": "boolean"
},
"use_legacy_landlock": {
"type": "boolean"
},

View File

@@ -0,0 +1,517 @@
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use chrono::SecondsFormat;
use chrono::Utc;
use ed25519_dalek::Signature;
use ed25519_dalek::Signer;
use ed25519_dalek::SigningKey;
use ed25519_dalek::pkcs8::DecodePrivateKey;
use ed25519_dalek::pkcs8::EncodePrivateKey;
use reqwest::Client;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::json;
use tracing::info;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::CodexAuth;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::default_client::originator;
use crate::features::Feature;
use codex_secrets::SecretName;
use codex_secrets::SecretScope;
use codex_secrets::SecretsBackendKind;
use codex_secrets::SecretsManager;
/// Name under which the per-binding agent identity blob is stored in the secrets backend.
const AGENT_IDENTITY_SECRET_NAME: &str = "AGENT_IDENTITY";
/// Registers and caches an agent identity (Ed25519 keypair + runtime id) per
/// ChatGPT workspace/account binding, and registers per-thread agent tasks.
#[derive(Clone)]
pub(crate) struct AgentIdentityManager {
    // Source of the ChatGPT auth (token, account id, forced workspace id).
    auth_manager: Arc<AuthManager>,
    // Shared HTTP client used for the register endpoints.
    http_client: Client,
    // Local secrets store where registered identities are persisted.
    secrets: SecretsManager,
}
impl AgentIdentityManager {
    /// Builds a manager that shares the process-wide auth state and persists
    /// identities in the local secrets backend rooted at `codex_home`.
    pub(crate) fn new(auth_manager: Arc<AuthManager>, codex_home: PathBuf) -> Self {
        Self {
            auth_manager,
            http_client: build_reqwest_client(),
            secrets: SecretsManager::new(codex_home, SecretsBackendKind::Local),
        }
    }

    /// Returns an agent task id for `thread_id`, registering an agent identity
    /// first if necessary.
    ///
    /// Returns `Ok(None)` when the `UseAgentIdentity` feature is off, no auth
    /// is available, or the auth is not ChatGPT-based. If task registration
    /// fails with a stored identity, the identity is deleted and the
    /// register-identity + register-task sequence is retried exactly once.
    pub(crate) async fn ensure_thread_task(
        &self,
        config: &Config,
        thread_id: &str,
    ) -> Result<Option<String>> {
        if !config.features.enabled(Feature::UseAgentIdentity) {
            return Ok(None);
        }
        let Some(auth) = self.auth_manager.auth().await else {
            return Ok(None);
        };
        if !auth.is_chatgpt_auth() {
            return Ok(None);
        }
        // Identity is keyed by the forced workspace id when set, otherwise the
        // account id; with neither there is nothing to bind to.
        let binding_id = binding_id_for_auth(
            self.auth_manager.forced_chatgpt_workspace_id(),
            auth.get_account_id(),
        )
        .context("agent identity requires a ChatGPT workspace/account binding")?;
        let identity = match self.ensure_agent_identity(config, &auth, &binding_id).await {
            Ok(identity) => identity,
            Err(err) => {
                warn!(binding_id = %binding_id, "failed to ensure agent identity: {err:#}");
                return Err(err);
            }
        };
        match self
            .register_task(config, &auth, &identity, thread_id)
            .await
        {
            Ok(task_id) => Ok(Some(task_id)),
            Err(err) => {
                // The stored identity may be stale (e.g. revoked server-side):
                // drop it, re-register a fresh identity, and retry once.
                warn!(
                    binding_id = %binding_id,
                    "agent task registration failed, deleting stored identity and retrying once: {err:#}"
                );
                self.delete_stored_identity(&binding_id)?;
                let identity = self
                    .register_agent_identity(config, &auth, &binding_id)
                    .await
                    .context("re-registering agent identity after task failure")?;
                let task_id = self
                    .register_task(config, &auth, &identity, thread_id)
                    .await
                    .context("registering agent task after agent identity retry")?;
                Ok(Some(task_id))
            }
        }
    }

    /// Returns the stored identity for `binding_id`, or registers a new one.
    async fn ensure_agent_identity(
        &self,
        config: &Config,
        auth: &CodexAuth,
        binding_id: &str,
    ) -> Result<StoredAgentIdentity> {
        if let Some(identity) = self.load_stored_identity(binding_id)? {
            trace!(binding_id = %binding_id, "reusing stored agent identity");
            return Ok(identity);
        }
        self.register_agent_identity(config, auth, binding_id).await
    }

    /// Generates a fresh keypair, registers it with the agent register
    /// endpoint, persists the resulting identity, and returns it.
    async fn register_agent_identity(
        &self,
        config: &Config,
        auth: &CodexAuth,
        binding_id: &str,
    ) -> Result<StoredAgentIdentity> {
        let key_material = generate_key_material()?;
        let body = AgentRegisterRequest {
            agent_public_key: key_material.public_key_base64.clone(),
            abom: build_abom(),
            capabilities: vec!["codex_backend".to_string(), "connector_gateway".to_string()],
            metadata: json!({
                "originator": originator().value,
                "workspace_id": binding_id,
                "chatgpt_user_id": auth.get_chatgpt_user_id(),
            }),
            on_behalf_of: OnBehalfOf {
                workspace_id: binding_id.to_string(),
            },
        };
        let response: AgentRegisterResponse = self
            .post_json(agent_register_url(&config.chatgpt_base_url), auth, &body)
            .await
            .context("registering agent identity")?;
        let identity = StoredAgentIdentity {
            binding_id: binding_id.to_string(),
            agent_runtime_id: response.agent_runtime_id,
            private_key_pkcs8_base64: key_material.private_key_pkcs8_base64,
            public_key_base64: key_material.public_key_base64,
            registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
            abom: body.abom,
            metadata: body.metadata,
        };
        self.store_identity(&identity)?;
        info!(binding_id = %binding_id, "agent identity registration succeeded");
        Ok(identity)
    }

    /// Registers a task for `thread_id` using the identity's private key to
    /// sign a `agent_runtime_id:timestamp` payload; returns the task id.
    async fn register_task(
        &self,
        config: &Config,
        auth: &CodexAuth,
        identity: &StoredAgentIdentity,
        thread_id: &str,
    ) -> Result<String> {
        let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
        let payload = canonical_signing_payload(&identity.agent_runtime_id, &timestamp);
        let signature = sign_payload(&identity.private_key_pkcs8_base64, payload.as_bytes())?;
        let body = TaskRegisterRequest {
            agent_runtime_id: identity.agent_runtime_id.clone(),
            timestamp,
            signature,
            metadata: json!({
                "thread_id": thread_id,
            }),
        };
        let response: TaskRegisterResponse = self
            .post_json(task_register_url(&config.chatgpt_base_url), auth, &body)
            .await
            .context("registering agent task")?;
        let task_id = decrypt_task_id(response)?;
        info!(thread_id = %thread_id, "agent task registration succeeded");
        Ok(task_id)
    }

    /// POSTs `body` as JSON to `url` with bearer auth (plus the
    /// `ChatGPT-Account-ID` header when an account id is available) and
    /// decodes the JSON response. Non-2xx responses become errors that include
    /// the response body text.
    async fn post_json<TReq, TResp>(
        &self,
        url: String,
        auth: &CodexAuth,
        body: &TReq,
    ) -> Result<TResp>
    where
        TReq: Serialize + ?Sized,
        TResp: DeserializeOwned,
    {
        let token = auth
            .get_token()
            .context("loading ChatGPT access token for agent identity request")?;
        let mut request = self.http_client.post(url).bearer_auth(token).json(body);
        if let Some(account_id) = auth.get_account_id() {
            request = request.header("ChatGPT-Account-ID", account_id);
        }
        let response = request
            .send()
            .await
            .context("sending agent identity request")?;
        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("agent identity request failed with {status}: {body}");
        }
        response
            .json::<TResp>()
            .await
            .context("decoding agent identity response")
    }

    /// Loads the stored identity for `binding_id`, deleting and returning
    /// `None` for entries that fail to parse or whose binding id mismatches.
    fn load_stored_identity(&self, binding_id: &str) -> Result<Option<StoredAgentIdentity>> {
        let secret_name = secret_name()?;
        let secret_scope = secret_scope(binding_id)?;
        let Some(raw) = self.secrets.get(&secret_scope, &secret_name)? else {
            return Ok(None);
        };
        match serde_json::from_str::<StoredAgentIdentity>(&raw) {
            Ok(identity) if identity.binding_id == binding_id => Ok(Some(identity)),
            Ok(_) => {
                warn!(binding_id = %binding_id, "stored agent identity binding mismatch, deleting cached value");
                self.delete_stored_identity(binding_id)?;
                Ok(None)
            }
            Err(err) => {
                warn!(binding_id = %binding_id, "failed to parse stored agent identity, deleting cached value: {err:#}");
                self.delete_stored_identity(binding_id)?;
                Ok(None)
            }
        }
    }

    /// Serializes `identity` as JSON and persists it in the secrets store.
    fn store_identity(&self, identity: &StoredAgentIdentity) -> Result<()> {
        let secret_name = secret_name()?;
        let secret_scope = secret_scope(&identity.binding_id)?;
        let raw = serde_json::to_string(identity).context("serializing stored agent identity")?;
        self.secrets
            .set(&secret_scope, &secret_name, &raw)
            .context("persisting stored agent identity")
    }

    /// Deletes the stored identity for `binding_id`; the delete result value
    /// is intentionally ignored (only errors are propagated).
    fn delete_stored_identity(&self, binding_id: &str) -> Result<()> {
        let secret_name = secret_name()?;
        let secret_scope = secret_scope(binding_id)?;
        let _ = self
            .secrets
            .delete(&secret_scope, &secret_name)
            .context("deleting stored agent identity")?;
        Ok(())
    }
}
/// Identity material persisted (as JSON) in the secrets store after a
/// successful agent registration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct StoredAgentIdentity {
    // Workspace or account id this identity was registered for.
    binding_id: String,
    // Runtime id assigned by the register endpoint.
    agent_runtime_id: String,
    // Standard-base64 PKCS#8 DER encoding of the Ed25519 signing key.
    private_key_pkcs8_base64: String,
    // URL-safe (no pad) base64 of the Ed25519 verifying key bytes.
    public_key_base64: String,
    // RFC 3339 UTC timestamp (second precision) of registration.
    registered_at: String,
    // Copy of the ABOM sent at registration time.
    abom: AgentAbom,
    // Copy of the free-form metadata sent at registration time.
    metadata: serde_json::Value,
}
/// Agent "bill of materials" describing this build and where it runs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct AgentAbom {
    // Crate version (CARGO_PKG_VERSION).
    agent_version: String,
    // Originator value identifying the harness.
    agent_harness_id: String,
    // "<os_type>-<architecture>" string from os_info.
    running_location: String,
}
/// Request body for the agent register endpoint.
#[derive(Debug, Serialize)]
struct AgentRegisterRequest {
    // URL-safe base64 Ed25519 public key.
    agent_public_key: String,
    abom: AgentAbom,
    capabilities: Vec<String>,
    metadata: serde_json::Value,
    on_behalf_of: OnBehalfOf,
}
/// Identifies the workspace the agent identity is registered on behalf of.
#[derive(Debug, Serialize)]
struct OnBehalfOf {
    workspace_id: String,
}
/// Response from the agent register endpoint.
#[derive(Debug, Deserialize)]
struct AgentRegisterResponse {
    // Accepts `agent_runtime_id`, `agent_id`, or `runtime_identity`
    // as the field name in server responses.
    #[serde(alias = "agent_id", alias = "runtime_identity")]
    agent_runtime_id: String,
}
/// Request body for the task register endpoint.
#[derive(Debug, Serialize)]
struct TaskRegisterRequest {
    // Serialized on the wire as `agent_id`.
    #[serde(rename = "agent_id")]
    agent_runtime_id: String,
    // RFC 3339 timestamp that is part of the signed payload.
    timestamp: String,
    // URL-safe base64 Ed25519 signature over `agent_runtime_id:timestamp`.
    signature: String,
    metadata: serde_json::Value,
}
/// Response from the task register endpoint; the server may return either a
/// plain `task_id` or an `encrypted_task_id` envelope (see `decrypt_task_id`).
#[derive(Debug, Deserialize)]
struct TaskRegisterResponse {
    #[serde(default)]
    task_id: Option<String>,
    #[serde(default)]
    encrypted_task_id: Option<String>,
}
/// Freshly generated keypair, already encoded for transport/storage.
struct GeneratedKeyMaterial {
    // Standard-base64 PKCS#8 DER private key.
    private_key_pkcs8_base64: String,
    // URL-safe (no pad) base64 public key bytes.
    public_key_base64: String,
}
/// Picks the id the agent identity is bound to: a forced ChatGPT workspace id
/// takes precedence, otherwise the account id; `None` when neither is set.
fn binding_id_for_auth(
    forced_chatgpt_workspace_id: Option<String>,
    account_id: Option<String>,
) -> Option<String> {
    match forced_chatgpt_workspace_id {
        Some(workspace_id) => Some(workspace_id),
        None => account_id,
    }
}
/// Normalizes the ChatGPT base URL for the agent identity endpoints: strips a
/// trailing slash and appends `/backend-api` unless the URL already contains
/// `/backend-api` or `/api/codex`.
fn normalized_agent_identity_base_url(chatgpt_base_url: &str) -> String {
    let trimmed = chatgpt_base_url.trim_end_matches('/');
    let already_api_base = ["/backend-api", "/api/codex"]
        .iter()
        .any(|segment| trimmed.contains(segment));
    if already_api_base {
        trimmed.to_string()
    } else {
        format!("{trimmed}/backend-api")
    }
}
/// Full URL of the agent register endpoint for the given ChatGPT base URL.
fn agent_register_url(chatgpt_base_url: &str) -> String {
    let base = normalized_agent_identity_base_url(chatgpt_base_url);
    format!("{base}/agent/register")
}
/// Full URL of the task register endpoint for the given ChatGPT base URL.
fn task_register_url(chatgpt_base_url: &str) -> String {
    let base = normalized_agent_identity_base_url(chatgpt_base_url);
    format!("{base}/task/register")
}
/// Validated secret name under which agent identities are stored.
fn secret_name() -> Result<SecretName> {
    let name = SecretName::new(AGENT_IDENTITY_SECRET_NAME);
    name.context("building agent identity secret name")
}
/// Secret scope keyed by the binding id, so identities for different
/// workspaces/accounts never collide.
fn secret_scope(binding_id: &str) -> Result<SecretScope> {
    let scope_key = format!("agent-identity-{binding_id}");
    SecretScope::environment(scope_key).context("building agent identity secret scope")
}
/// Builds the agent "bill of materials" from the crate version, the
/// originator value, and the host OS/architecture reported by `os_info`.
fn build_abom() -> AgentAbom {
    let info = os_info::get();
    let architecture = info.architecture().unwrap_or("unknown");
    let running_location = format!("{}-{architecture}", info.os_type());
    AgentAbom {
        agent_version: env!("CARGO_PKG_VERSION").to_string(),
        agent_harness_id: originator().value,
        running_location,
    }
}
/// Generates a fresh Ed25519 keypair and returns it pre-encoded: the private
/// key as standard-base64 PKCS#8 DER, the public key as URL-safe base64 bytes.
fn generate_key_material() -> Result<GeneratedKeyMaterial> {
    // 32-byte seed drawn via `rand::random`.
    let seed: [u8; 32] = rand::random();
    let signing_key = SigningKey::from_bytes(&seed);
    let pkcs8_document = signing_key
        .to_pkcs8_der()
        .context("encoding agent identity private key")?;
    let private_key_pkcs8_base64 = BASE64_STANDARD.encode(pkcs8_document.as_bytes());
    let public_key_base64 = URL_SAFE_NO_PAD.encode(signing_key.verifying_key().as_bytes());
    Ok(GeneratedKeyMaterial {
        private_key_pkcs8_base64,
        public_key_base64,
    })
}
/// Canonical byte string signed for task registration: `"<id>:<timestamp>"`.
fn canonical_signing_payload(agent_runtime_id: &str, timestamp: &str) -> String {
    let mut payload = String::with_capacity(agent_runtime_id.len() + 1 + timestamp.len());
    payload.push_str(agent_runtime_id);
    payload.push(':');
    payload.push_str(timestamp);
    payload
}
/// Signs `payload` with the stored private key (standard-base64 PKCS#8 DER)
/// and returns the Ed25519 signature as URL-safe base64 without padding.
fn sign_payload(private_key_pkcs8_base64: &str, payload: &[u8]) -> Result<String> {
    let der = BASE64_STANDARD
        .decode(private_key_pkcs8_base64)
        .context("decoding agent identity private key")?;
    let key = SigningKey::from_pkcs8_der(&der).context("decoding agent identity private key")?;
    let signature: Signature = key.sign(payload);
    Ok(URL_SAFE_NO_PAD.encode(signature.to_bytes()))
}
/// Extracts the task id from a task-register response.
///
/// Order of precedence: a plain `task_id` field; an `encrypted_task_id` with a
/// `plaintext:` prefix; otherwise `encrypted_task_id` decoded as standard
/// base64, falling back to URL-safe base64, and interpreted as UTF-8.
fn decrypt_task_id(response: TaskRegisterResponse) -> Result<String> {
    if let Some(task_id) = response.task_id {
        return Ok(task_id);
    }
    let envelope = response
        .encrypted_task_id
        .context("task register response was missing both task_id and encrypted_task_id")?;
    match envelope.strip_prefix("plaintext:") {
        Some(plain) => Ok(plain.to_string()),
        None => {
            let decoded = BASE64_STANDARD
                .decode(&envelope)
                .or_else(|_| URL_SAFE_NO_PAD.decode(&envelope))
                .context("decoding encrypted task id envelope")?;
            String::from_utf8(decoded).context("decoding encrypted task id UTF-8 payload")
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use codex_keyring_store::tests::MockKeyringStore;
    use codex_secrets::SecretsManager;
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    // A forced workspace id wins over the account id when both are present.
    #[test]
    fn binding_id_prefers_forced_workspace() {
        let binding = binding_id_for_auth(
            Some("workspace-123".to_string()),
            Some("account-456".to_string()),
        );
        assert_eq!(binding, Some("workspace-123".to_string()));
    }

    // The signed payload format must stay byte-stable: "<id>:<timestamp>".
    #[test]
    fn signing_payload_is_stable() {
        assert_eq!(
            canonical_signing_payload("agent-123", "2026-03-16T12:34:56Z"),
            "agent-123:2026-03-16T12:34:56Z".to_string()
        );
    }

    // A plain task_id field takes precedence over encrypted_task_id.
    #[test]
    fn decrypt_task_id_prefers_plaintext_field() {
        let task_id = decrypt_task_id(TaskRegisterResponse {
            task_id: Some("task-123".to_string()),
            encrypted_task_id: Some(BASE64_STANDARD.encode("ignored")),
        })
        .expect("task id should decode");
        assert_eq!(task_id, "task-123".to_string());
    }

    // Without task_id, a standard-base64 encrypted_task_id is decoded.
    #[test]
    fn decrypt_task_id_decodes_base64_fallback() {
        let task_id = decrypt_task_id(TaskRegisterResponse {
            task_id: None,
            encrypted_task_id: Some(BASE64_STANDARD.encode("task-456")),
        })
        .expect("task id should decode");
        assert_eq!(task_id, "task-456".to_string());
    }

    // URL-safe base64 (no padding) is accepted as a fallback encoding.
    #[test]
    fn decrypt_task_id_decodes_urlsafe_base64_fallback() {
        let task_id = decrypt_task_id(TaskRegisterResponse {
            task_id: None,
            encrypted_task_id: Some(URL_SAFE_NO_PAD.encode("task-789")),
        })
        .expect("task id should decode");
        assert_eq!(task_id, "task-789".to_string());
    }

    // A serialized identity can be stored and re-read losslessly through the
    // secrets manager (backed by a mock keyring and a temp codex home).
    #[test]
    fn stored_identity_round_trips_in_secrets_manager() {
        let codex_home = tempfile::tempdir().expect("tempdir");
        let keyring = Arc::new(MockKeyringStore::default());
        let secrets = SecretsManager::new_with_keyring_store(
            codex_home.path().to_path_buf(),
            SecretsBackendKind::Local,
            keyring,
        );
        let identity = StoredAgentIdentity {
            binding_id: "workspace-123".to_string(),
            agent_runtime_id: "agent-123".to_string(),
            private_key_pkcs8_base64: "private".to_string(),
            public_key_base64: "public".to_string(),
            registered_at: "2026-03-16T12:34:56Z".to_string(),
            abom: build_abom(),
            metadata: json!({"workspace_id": "workspace-123"}),
        };
        let secret_name = secret_name().expect("secret name");
        let secret_scope = secret_scope("workspace-123").expect("secret scope");
        let serialized = serde_json::to_string(&identity).expect("serialize identity");
        secrets
            .set(&secret_scope, &secret_name, &serialized)
            .expect("set identity");
        let loaded = secrets
            .get(&secret_scope, &secret_name)
            .expect("get identity")
            .expect("missing identity");
        let decoded: StoredAgentIdentity = serde_json::from_str(&loaded).expect("decode identity");
        assert_eq!(decoded, identity);
    }

    // Different binding ids must map to different secret scopes.
    #[test]
    fn secret_scope_is_binding_specific() {
        let first = secret_scope("workspace-123").expect("first scope");
        let second = secret_scope("workspace-456").expect("second scope");
        assert_ne!(first, second);
    }
}

View File

@@ -111,6 +111,7 @@ use crate::util::emit_feedback_request_tags;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER: &str = "X-Openai-Internal-Codex-Task-Id";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
@@ -142,6 +143,7 @@ struct ModelClientState {
include_timing_metrics: bool,
beta_features_header: Option<String>,
disable_websockets: AtomicBool,
agent_task_id: StdMutex<Option<String>>,
cached_websocket_session: StdMutex<WebsocketSession>,
}
@@ -276,6 +278,7 @@ impl ModelClient {
include_timing_metrics,
beta_features_header,
disable_websockets: AtomicBool::new(false),
agent_task_id: StdMutex::new(None),
cached_websocket_session: StdMutex::new(WebsocketSession::default()),
}),
}
@@ -310,6 +313,23 @@ impl ModelClient {
.unwrap_or_else(std::sync::PoisonError::into_inner) = websocket_session;
}
/// Replaces the agent task id attached to outgoing requests and resets the
/// cached websocket session so subsequent requests are built with the new
/// header state. Recovers a poisoned lock rather than panicking.
pub fn set_agent_task_id(&self, task_id: Option<String>) {
    *self
        .state
        .agent_task_id
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner) = task_id;
    self.store_cached_websocket_session(WebsocketSession::default());
}
/// Returns a clone of the currently configured agent task id, if any.
/// Recovers a poisoned lock rather than panicking.
fn agent_task_id(&self) -> Option<String> {
    self.state
        .agent_task_id
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
/// Compacts the current conversation history using the Compact endpoint.
///
/// This is a unary call (no streaming) that returns a new list of
@@ -609,6 +629,7 @@ impl ModelClient {
self.state.beta_features_header.as_deref(),
turn_state,
turn_metadata_header.as_ref(),
self.agent_task_id().as_deref(),
);
if let Ok(header_value) = HeaderValue::from_str(&conversation_id) {
headers.insert("x-client-request-id", header_value);
@@ -733,12 +754,18 @@ impl ModelClientSession {
self.client.state.beta_features_header.as_deref(),
Some(&self.turn_state),
turn_metadata_header.as_ref(),
self.client.agent_task_id().as_deref(),
),
compression,
turn_state: Some(Arc::clone(&self.turn_state)),
}
}
/// Propagates the agent task id to the underlying client and resets this
/// session's websocket state so the next request reflects the new id.
pub(crate) fn set_agent_task_id(&mut self, task_id: Option<String>) {
    self.client.set_agent_task_id(task_id);
    self.websocket_session = WebsocketSession::default();
}
fn get_incremental_items(
&self,
request: &ResponsesApiRequest,
@@ -1337,6 +1364,7 @@ fn build_responses_headers(
beta_features_header: Option<&str>,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_metadata_header: Option<&HeaderValue>,
agent_task_id: Option<&str>,
) -> ApiHeaderMap {
let mut headers = ApiHeaderMap::new();
if let Some(value) = beta_features_header
@@ -1354,6 +1382,12 @@ fn build_responses_headers(
if let Some(header_value) = turn_metadata_header {
headers.insert(X_CODEX_TURN_METADATA_HEADER, header_value.clone());
}
if let Some(agent_task_id) = agent_task_id
&& let Ok(header_value) = HeaderValue::from_str(agent_task_id)
{
trace!("attaching agent task id to responses request");
headers.insert(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER, header_value);
}
headers
}

View File

@@ -12,6 +12,7 @@ use crate::SandboxState;
use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::agent_identity::AgentIdentityManager;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::AppInvocation;
use crate::analytics_client::InvocationType;
@@ -1792,6 +1793,10 @@ impl Session {
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
),
agent_identity_manager: AgentIdentityManager::new(
Arc::clone(&auth_manager),
config.codex_home.clone(),
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
@@ -2047,6 +2052,72 @@ impl Session {
state.merge_connector_selection(connector_ids)
}
/// Returns the agent task id recorded in the session state, if any.
pub(crate) async fn agent_task_id(&self) -> Option<String> {
    let state = self.state.lock().await;
    state.agent_task_id()
}
/// Ensures the session has an agent task id for this turn, returning it when
/// available.
///
/// With the feature disabled: clears any previously recorded task id from the
/// session state and the model client, refreshes the MCP servers if a task id
/// was removed, and returns `None`. With the feature enabled: reuses the
/// cached task id when present, otherwise runs the agent identity flow; on
/// failure it emits a Warning event and continues without a task.
pub(crate) async fn ensure_agent_task(&self, turn_context: &TurnContext) -> Option<String> {
    if !turn_context
        .config
        .features
        .enabled(Feature::UseAgentIdentity)
    {
        // Feature turned off: tear down any stale task id state.
        let had_task = {
            let mut state = self.state.lock().await;
            let had_task = state.agent_task_id().is_some();
            if had_task {
                state.set_agent_task_id(None);
            }
            had_task
        };
        self.services.model_client.set_agent_task_id(None);
        if had_task {
            // Headers changed (task id removed) — rebuild MCP servers.
            self.refresh_mcp_servers_for_agent_task(turn_context).await;
        }
        return None;
    }
    // Fast path: reuse the task id already recorded for this session.
    if let Some(task_id) = self.agent_task_id().await {
        self.services
            .model_client
            .set_agent_task_id(Some(task_id.clone()));
        return Some(task_id);
    }
    let task_id = match self
        .services
        .agent_identity_manager
        .ensure_thread_task(
            turn_context.config.as_ref(),
            &self.conversation_id.to_string(),
        )
        .await
    {
        Ok(task_id) => task_id,
        Err(err) => {
            // Best-effort: surface a warning and proceed without a task id.
            let message =
                format!("Agent identity flow failed; continuing without agent task: {err:#}");
            warn!("{message}");
            self.send_event(turn_context, EventMsg::Warning(WarningEvent { message }))
                .await;
            return None;
        }
    };
    // `ensure_thread_task` may legitimately return no task (e.g. non-ChatGPT auth).
    let task_id = task_id?;
    {
        let mut state = self.state.lock().await;
        state.set_agent_task_id(Some(task_id.clone()));
    }
    self.services
        .model_client
        .set_agent_task_id(Some(task_id.clone()));
    self.refresh_mcp_servers_for_agent_task(turn_context).await;
    Some(task_id)
}
// Returns the connector IDs currently selected for this session.
pub(crate) async fn get_connector_selection(&self) -> HashSet<String> {
let state = self.state.lock().await;
@@ -2059,6 +2130,20 @@ impl Session {
state.clear_connector_selection();
}
/// Rebuilds the MCP server set after the agent task id changed, so headers
/// derived from it (e.g. on the codex apps gateway) are up to date.
async fn refresh_mcp_servers_for_agent_task(&self, turn_context: &TurnContext) {
    let config = self.get_config().await;
    let mcp_servers = self
        .services
        .mcp_manager
        .configured_servers(config.as_ref());
    self.refresh_mcp_servers_inner(
        turn_context,
        mcp_servers,
        config.mcp_oauth_credentials_store_mode,
    )
    .await;
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
let is_subagent = {
@@ -4012,6 +4097,7 @@ impl Session {
store_mode: OAuthCredentialsStoreMode,
) {
let auth = self.services.auth_manager.auth().await;
let agent_task_id = self.agent_task_id().await;
let config = self.get_config().await;
let tool_plugin_provenance = self
.services
@@ -4022,6 +4108,7 @@ impl Session {
self.features.apps_enabled_for_auth(auth.as_ref()),
auth.as_ref(),
config.as_ref(),
agent_task_id.as_deref(),
);
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await;
let sandbox_state = SandboxState {
@@ -4749,10 +4836,12 @@ mod handlers {
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let auth = sess.services.auth_manager.auth().await;
let mcp_servers = sess
.services
.mcp_manager
.effective_servers(config, auth.as_ref());
let agent_task_id = sess.agent_task_id().await;
let mcp_servers = sess.services.mcp_manager.effective_servers_with_agent_task(
config,
auth.as_ref(),
agent_task_id.as_deref(),
);
let snapshot = collect_mcp_snapshot_from_manager(
&mcp_connection_manager,
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode)
@@ -5495,6 +5584,8 @@ pub(crate) async fn run_turn(
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
sess.ensure_agent_task(turn_context.as_ref()).await;
let loaded_plugins = sess
.services
.plugins_manager
@@ -5673,6 +5764,7 @@ pub(crate) async fn run_turn(
// one instance across retries within this turn.
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
client_session.set_agent_task_id(sess.agent_task_id().await);
loop {
if let Some(session_start_source) = sess.take_pending_session_start_source().await {

View File

@@ -2409,6 +2409,10 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
agent_identity_manager: crate::agent_identity::AgentIdentityManager::new(
Arc::clone(&auth_manager),
config.codex_home.clone(),
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
@@ -3199,6 +3203,10 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
agent_identity_manager: crate::agent_identity::AgentIdentityManager::new(
Arc::clone(&auth_manager),
config.codex_home.clone(),
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),

View File

@@ -186,7 +186,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
});
}
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config, None);
if mcp_servers.is_empty() {
return Ok(AccessibleConnectorsStatus {
connectors: Vec::new(),

View File

@@ -188,6 +188,8 @@ pub enum Feature {
ResponsesWebsockets,
/// Enable Responses API websocket v2 mode.
ResponsesWebsocketsV2,
/// Use the agent identity registration + per-thread task flow for ChatGPT auth.
UseAgentIdentity,
}
impl Feature {
@@ -869,6 +871,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::UseAgentIdentity,
key: "use_agent_identity",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
];
/// Push a warning event if any under-development features are enabled.

View File

@@ -5,6 +5,7 @@
// the TUI or the tracing stack).
#![deny(clippy::print_stdout, clippy::print_stderr)]
mod agent_identity;
mod analytics_client;
pub mod api_bridge;
mod apply_patch;
@@ -76,6 +77,7 @@ pub mod token_data;
mod truncate;
mod unified_exec;
pub mod windows_sandbox;
pub use client::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
pub use model_provider_info::DEFAULT_OLLAMA_PORT;

View File

@@ -15,9 +15,11 @@ use codex_protocol::mcp::Tool;
use codex_protocol::protocol::McpListToolsResponseEvent;
use codex_protocol::protocol::SandboxPolicy;
use serde_json::Value;
use tracing::trace;
use crate::AuthManager;
use crate::CodexAuth;
use crate::client::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
use crate::config::Config;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
@@ -110,7 +112,10 @@ fn codex_apps_mcp_bearer_token(auth: Option<&CodexAuth>) -> Option<String> {
}
}
fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<String, String>> {
fn codex_apps_mcp_http_headers(
auth: Option<&CodexAuth>,
agent_task_id: Option<&str>,
) -> Option<HashMap<String, String>> {
let mut headers = HashMap::new();
if let Some(token) = codex_apps_mcp_bearer_token(auth) {
headers.insert("Authorization".to_string(), format!("Bearer {token}"));
@@ -118,6 +123,13 @@ fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<Strin
if let Some(account_id) = auth.and_then(CodexAuth::get_account_id) {
headers.insert("ChatGPT-Account-ID".to_string(), account_id);
}
if let Some(agent_task_id) = agent_task_id {
trace!("attaching agent task id to codex apps gateway");
headers.insert(
X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER.to_string(),
agent_task_id.to_string(),
);
}
if headers.is_empty() {
None
} else {
@@ -151,12 +163,16 @@ pub(crate) fn codex_apps_mcp_url(config: &Config) -> String {
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
}
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {
fn codex_apps_mcp_server_config(
config: &Config,
auth: Option<&CodexAuth>,
agent_task_id: Option<&str>,
) -> McpServerConfig {
let bearer_token_env_var = codex_apps_mcp_bearer_token_env_var();
let http_headers = if bearer_token_env_var.is_some() {
None
} else {
codex_apps_mcp_http_headers(auth)
codex_apps_mcp_http_headers(auth, agent_task_id)
};
let url = codex_apps_mcp_url(config);
@@ -184,11 +200,12 @@ pub(crate) fn with_codex_apps_mcp(
connectors_enabled: bool,
auth: Option<&CodexAuth>,
config: &Config,
agent_task_id: Option<&str>,
) -> HashMap<String, McpServerConfig> {
if connectors_enabled {
servers.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
codex_apps_mcp_server_config(config, auth),
codex_apps_mcp_server_config(config, auth, agent_task_id),
);
} else {
servers.remove(CODEX_APPS_MCP_SERVER_NAME);
@@ -214,7 +231,16 @@ impl McpManager {
config: &Config,
auth: Option<&CodexAuth>,
) -> HashMap<String, McpServerConfig> {
effective_mcp_servers(config, auth, self.plugins_manager.as_ref())
self.effective_servers_with_agent_task(config, auth, None)
}
pub fn effective_servers_with_agent_task(
&self,
config: &Config,
auth: Option<&CodexAuth>,
agent_task_id: Option<&str>,
) -> HashMap<String, McpServerConfig> {
effective_mcp_servers(config, auth, self.plugins_manager.as_ref(), agent_task_id)
}
pub fn tool_plugin_provenance(&self, config: &Config) -> ToolPluginProvenance {
@@ -239,6 +265,7 @@ fn effective_mcp_servers(
config: &Config,
auth: Option<&CodexAuth>,
plugins_manager: &PluginsManager,
agent_task_id: Option<&str>,
) -> HashMap<String, McpServerConfig> {
let servers = configured_mcp_servers(config, plugins_manager);
with_codex_apps_mcp(
@@ -246,6 +273,7 @@ fn effective_mcp_servers(
config.features.apps_enabled_for_auth(auth),
auth,
config,
agent_task_id,
)
}

View File

@@ -159,7 +159,7 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
let mut servers = with_codex_apps_mcp(HashMap::new(), false, None, &config);
let mut servers = with_codex_apps_mcp(HashMap::new(), false, None, &config, None);
assert!(!servers.contains_key(CODEX_APPS_MCP_SERVER_NAME));
config
@@ -167,7 +167,7 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
.enable(Feature::Apps)
.expect("test config should allow apps");
servers = with_codex_apps_mcp(servers, true, None, &config);
servers = with_codex_apps_mcp(servers, true, None, &config, None);
let server = servers
.get(CODEX_APPS_MCP_SERVER_NAME)
.expect("codex apps should be present when apps is enabled");
@@ -179,6 +179,34 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
assert_eq!(url, "https://chatgpt.com/backend-api/wham/apps");
}
/// When an agent task id is supplied, the codex-apps MCP server config must
/// forward it as an HTTP header on the streamable-http transport.
#[test]
fn codex_apps_server_config_includes_agent_task_header() {
    // Apps-enabled test config pointed at the production ChatGPT base URL.
    let mut config = crate::config::test_config();
    config.chatgpt_base_url = "https://chatgpt.com".to_string();
    config
        .features
        .enable(Feature::Apps)
        .expect("test config should allow apps");

    let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
    let servers = with_codex_apps_mcp(HashMap::new(), true, Some(&auth), &config, Some("task-123"));
    let server = servers
        .get(CODEX_APPS_MCP_SERVER_NAME)
        .expect("codex apps should be present when apps is enabled");

    let McpServerTransportConfig::StreamableHttp { http_headers, .. } = &server.transport else {
        panic!("expected streamable http transport, got {:?}", server.transport);
    };
    let task_header = http_headers
        .as_ref()
        .and_then(|headers| headers.get(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER));
    assert_eq!(task_header, Some(&"task-123".to_string()));
}
#[tokio::test]
async fn effective_mcp_servers_include_plugins_without_overriding_user_config() {
let codex_home = tempfile::tempdir().expect("tempdir");

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::AuthManager;
use crate::RolloutRecorder;
use crate::agent::AgentControl;
use crate::agent_identity::AgentIdentityManager;
use crate::analytics_client::AnalyticsEventsClient;
use crate::client::ModelClient;
use crate::config::StartedNetworkProxy;
@@ -60,5 +61,6 @@ pub(crate) struct SessionServices {
pub(crate) state_db: Option<StateDbHandle>,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,
pub(crate) agent_identity_manager: AgentIdentityManager,
pub(crate) code_mode_service: CodeModeService,
}

View File

@@ -34,6 +34,7 @@ pub(crate) struct SessionState {
pub(crate) startup_regular_task: Option<JoinHandle<CodexResult<RegularTask>>>,
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
agent_task_id: Option<String>,
granted_permissions: Option<PermissionProfile>,
}
@@ -52,6 +53,7 @@ impl SessionState {
startup_regular_task: None,
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
agent_task_id: None,
granted_permissions: None,
}
}
@@ -207,6 +209,14 @@ impl SessionState {
self.pending_session_start_source.take()
}
pub(crate) fn agent_task_id(&self) -> Option<String> {
self.agent_task_id.clone()
}
pub(crate) fn set_agent_task_id(&mut self, task_id: Option<String>) {
self.agent_task_id = task_id;
}
pub(crate) fn record_granted_permissions(&mut self, permissions: PermissionProfile) {
self.granted_permissions =
merge_permission_profiles(self.granted_permissions.as_ref(), Some(&permissions));

View File

@@ -7,6 +7,7 @@ use codex_core::ModelProviderInfo;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::WireApi;
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_protocol::ThreadId;
@@ -247,6 +248,119 @@ async fn responses_stream_includes_subagent_header_on_other() {
);
}
/// End-to-end check that a task id set via `ModelClient::set_agent_task_id`
/// is sent as `X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER` on Responses API
/// streaming requests.
#[tokio::test]
async fn responses_stream_includes_agent_task_header() {
    core_test_support::skip_if_no_network!();
    let server = responses::start_mock_server().await;
    let response_body = responses::sse(vec![
        responses::ev_response_created("resp-1"),
        responses::ev_completed("resp-1"),
    ]);
    // The mock only matches requests carrying the task id header, so a
    // missing header surfaces as "no request recorded" in the final assert.
    let request_recorder = responses::mount_sse_once_match(
        &server,
        header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER, "task-123"),
        response_body,
    )
    .await;
    // Minimal Responses-API provider pointed at the mock server; retries are
    // disabled (Some(0)) so failures surface immediately.
    let provider = ModelProviderInfo {
        name: "mock".into(),
        base_url: Some(format!("{}/v1", server.uri())),
        env_key: None,
        env_key_instructions: None,
        experimental_bearer_token: None,
        wire_api: WireApi::Responses,
        query_params: None,
        http_headers: None,
        env_http_headers: None,
        request_max_retries: Some(0),
        stream_max_retries: Some(0),
        stream_idle_timeout_ms: Some(5_000),
        requires_openai_auth: false,
        supports_websockets: false,
    };
    let codex_home = TempDir::new().expect("failed to create TempDir");
    let mut config = load_default_config_for_test(&codex_home).await;
    config.model_provider_id = provider.name.clone();
    config.model_provider = provider.clone();
    let effort = config.model_reasoning_effort;
    let summary = config.model_reasoning_summary;
    let model = codex_core::test_support::get_model_offline(config.model.as_deref());
    config.model = Some(model.clone());
    let config = Arc::new(config);
    let conversation_id = ThreadId::new();
    let session_source = SessionSource::Exec;
    let model_info =
        codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
    let session_telemetry = SessionTelemetry::new(
        conversation_id,
        model.as_str(),
        model_info.slug.as_str(),
        None,
        Some("test@test.com".to_string()),
        Some(TelemetryAuthMode::Chatgpt),
        "test_originator".to_string(),
        false,
        "test".to_string(),
        session_source.clone(),
    );
    let client = ModelClient::new(
        None,
        conversation_id,
        provider,
        session_source,
        config.model_verbosity,
        false,
        false,
        false,
        None,
    );
    // The behavior under test: attach the task id at the client level before
    // opening a session.
    client.set_agent_task_id(Some("task-123".to_string()));
    let mut client_session = client.new_session();
    let mut prompt = Prompt::default();
    prompt.input = vec![ResponseItem::Message {
        id: None,
        role: "user".into(),
        content: vec![ContentItem::InputText {
            text: "hello".into(),
        }],
        end_turn: None,
        phase: None,
    }];
    let mut stream = client_session
        .stream(
            &prompt,
            &model_info,
            &session_telemetry,
            effort,
            summary.unwrap_or(model_info.default_reasoning_summary),
            None,
            None,
        )
        .await
        .expect("stream failed");
    // Drain until the turn completes so the request has been fully recorded.
    while let Some(event) = stream.next().await {
        if matches!(event, Ok(ResponseEvent::Completed { .. })) {
            break;
        }
    }
    let request = request_recorder.single_request();
    assert_eq!(
        request
            .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
            .as_deref(),
        Some("task-123")
    );
}
#[tokio::test]
async fn responses_respects_model_info_overrides_from_config() {
core_test_support::skip_if_no_network!();

View File

@@ -0,0 +1,481 @@
#![cfg(not(target_os = "windows"))]
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::Result;
use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use chrono::Utc;
use codex_app_server_protocol::AuthMode;
use codex_core::CodexAuth;
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::AuthDotJson;
use codex_core::auth::save_auth;
use codex_core::features::Feature;
use codex_core::token_data::IdTokenInfo;
use codex_core::token_data::TokenData;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use serde_json::Value;
use serde_json::json;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::path_regex;
/// A wiremock responder that serves a queued sequence of JSON bodies, one per
/// incoming request, in order.
#[derive(Clone)]
struct JsonSequenceResponder {
    bodies: Arc<Mutex<VecDeque<Value>>>,
}

impl JsonSequenceResponder {
    /// Queue `bodies` to be returned one per request, front to back.
    fn new(bodies: Vec<Value>) -> Self {
        let queue = VecDeque::from(bodies);
        Self {
            bodies: Arc::new(Mutex::new(queue)),
        }
    }
}

impl Respond for JsonSequenceResponder {
    fn respond(&self, _request: &Request) -> ResponseTemplate {
        // Recover from a poisoned lock so a panic elsewhere doesn't mask the
        // real failure; panic only when more requests arrive than bodies.
        let mut queue = self
            .bodies
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let body = queue.pop_front().expect("missing queued JSON response");
        ResponseTemplate::new(200).set_body_json(body)
    }
}
/// With `UseAgentIdentity` disabled (the default), a turn must not call the
/// agent/task registration endpoints nor attach the task id header.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn feature_off_skips_agent_identity_requests() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let sse_body = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
    let response_mock = mount_sse_once(&server, sse_body).await;

    let base_url = server.uri();
    let mut harness_builder = test_codex()
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config(move |config| {
            config.chatgpt_base_url = base_url;
            config.model_provider.supports_websockets = false;
        });
    let harness = harness_builder.build(&server).await?;
    harness
        .submit_turn_with_policy("hello", workspace_write_with_network())
        .await?;

    // Neither identity endpoint should have been touched.
    let requests = server.received_requests().await.unwrap_or_default();
    for endpoint in ["/backend-api/agent/register", "/backend-api/task/register"] {
        assert_eq!(count_requests_for_path(&requests, endpoint), 0);
    }
    // And the model request must not carry a task id header.
    let task_header = response_mock
        .single_request()
        .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER);
    assert_eq!(task_header, None);
    Ok(())
}
/// With the feature enabled, the first turn registers the agent and one task;
/// a second turn on the same thread reuses the existing task id instead of
/// re-registering.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_identity_registers_once_and_reuses_task_within_thread() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    // Only one task id is queued: a second /task/register call would panic
    // inside JsonSequenceResponder, so reuse is enforced structurally too.
    mount_agent_identity_endpoints(&server, vec!["task-1"]).await;
    let response_mock = mount_sse_sequence(
        &server,
        vec![
            sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
            sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
        ],
    )
    .await;
    let mut builder = test_codex()
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config({
            let base_url = server.uri();
            move |config| {
                config.chatgpt_base_url = base_url;
                config.model_provider.supports_websockets = false;
                config
                    .features
                    .enable(Feature::UseAgentIdentity)
                    .expect("test config should allow feature update");
            }
        });
    let test = builder.build(&server).await?;
    // Two turns on the same thread.
    test.submit_turn_with_policy("hello", workspace_write_with_network())
        .await?;
    test.submit_turn_with_policy("again", workspace_write_with_network())
        .await?;
    let requests = server.received_requests().await.unwrap_or_default();
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/agent/register"),
        1
    );
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/task/register"),
        1
    );
    assert_eq!(response_mock.requests().len(), 2);
    // Inspect the task registration payload: it must reference the registered
    // agent runtime id and the current thread, with non-empty timestamp and
    // signature fields.
    let task_request = request_json_body(
        requests
            .iter()
            .find(|request| request.url.path() == "/backend-api/task/register")
            .expect("missing task register request"),
    );
    assert_eq!(task_request["agent_id"], json!("agent-runtime-1"));
    assert_eq!(
        task_request["metadata"]["thread_id"],
        json!(test.session_configured.session_id.to_string())
    );
    assert!(
        !task_request["timestamp"]
            .as_str()
            .unwrap_or_default()
            .is_empty()
    );
    assert!(
        !task_request["signature"]
            .as_str()
            .unwrap_or_default()
            .is_empty()
    );
    // Both model requests carry the same (reused) task id header.
    for request in response_mock.requests() {
        assert_eq!(
            request
                .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
                .as_deref(),
            Some("task-1")
        );
    }
    Ok(())
}
/// Two separate threads sharing one CODEX_HOME reuse the stored agent
/// identity (a single /agent/register) but each registers its own task and
/// sends its own task id header.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn new_thread_reuses_agent_identity_but_gets_new_task() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    // Shared home directory so the second thread can find whatever identity
    // state the first thread persisted.
    let home = Arc::new(TempDir::new()?);
    mount_agent_identity_endpoints(&server, vec!["task-1", "task-2"]).await;
    let response_mock = mount_sse_sequence(
        &server,
        vec![
            sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
            sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
        ],
    )
    .await;
    let mut first_builder = test_codex()
        .with_home(Arc::clone(&home))
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config({
            let base_url = server.uri();
            move |config| {
                config.chatgpt_base_url = base_url;
                config.model_provider.supports_websockets = false;
                config
                    .features
                    .enable(Feature::UseAgentIdentity)
                    .expect("test config should allow feature update");
            }
        });
    let first = first_builder.build(&server).await?;
    first
        .submit_turn_with_policy("hello", workspace_write_with_network())
        .await?;
    // Second, independent thread over the same home.
    let mut second_builder = test_codex()
        .with_home(home)
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config({
            let base_url = server.uri();
            move |config| {
                config.chatgpt_base_url = base_url;
                config.model_provider.supports_websockets = false;
                config
                    .features
                    .enable(Feature::UseAgentIdentity)
                    .expect("test config should allow feature update");
            }
        });
    let second = second_builder.build(&server).await?;
    second
        .submit_turn_with_policy("hello from new thread", workspace_write_with_network())
        .await?;
    let requests = server.received_requests().await.unwrap_or_default();
    // One agent registration total, but one task registration per thread.
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/agent/register"),
        1
    );
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/task/register"),
        2
    );
    let response_requests = response_mock.requests();
    assert_eq!(response_requests.len(), 2);
    assert_eq!(
        response_requests[0]
            .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
            .as_deref(),
        Some("task-1")
    );
    assert_eq!(
        response_requests[1]
            .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
            .as_deref(),
        Some("task-2")
    );
    Ok(())
}
/// Switching the authenticated ChatGPT account must register a fresh agent
/// identity: two accounts over the same home yield two /agent/register and
/// two /task/register calls.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auth_account_change_registers_a_new_agent_identity() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let home = Arc::new(TempDir::new()?);
    mount_agent_identity_endpoints(&server, vec!["task-1", "task-2"]).await;
    // A permissive /responses mock (path regex, unlimited hits) since this
    // test only cares about the registration endpoints.
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_string(sse(vec![
                    ev_response_created("resp-1"),
                    ev_completed("resp-1"),
                ])),
        )
        .mount(&server)
        .await;
    let mut first_builder = test_codex()
        .with_home(Arc::clone(&home))
        .with_auth(chatgpt_auth_for_account(home.path(), "account-1")?)
        .with_config({
            let base_url = server.uri();
            move |config| {
                config.chatgpt_base_url = base_url;
                config.model_provider.supports_websockets = false;
                config
                    .features
                    .enable(Feature::UseAgentIdentity)
                    .expect("test config should allow feature update");
            }
        });
    let first = first_builder.build(&server).await?;
    first
        .submit_turn_with_policy("hello", workspace_write_with_network())
        .await?;
    // Same home, different account id in the stored auth.
    let mut second_builder = test_codex()
        .with_home(home)
        .with_auth(chatgpt_auth_for_account(first.home.path(), "account-2")?)
        .with_config({
            let base_url = server.uri();
            move |config| {
                config.chatgpt_base_url = base_url;
                config.model_provider.supports_websockets = false;
                config
                    .features
                    .enable(Feature::UseAgentIdentity)
                    .expect("test config should allow feature update");
            }
        });
    let second = second_builder.build(&server).await?;
    second
        .submit_turn_with_policy("hello from second account", workspace_write_with_network())
        .await?;
    let requests = server.received_requests().await.unwrap_or_default();
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/agent/register"),
        2
    );
    assert_eq!(
        count_requests_for_path(&requests, "/backend-api/task/register"),
        2
    );
    Ok(())
}
/// Without ChatGPT auth (default test auth), the agent identity flow is
/// bypassed even when the `UseAgentIdentity` feature is enabled.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn api_key_auth_bypasses_agent_identity_flow() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let sse_body = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
    let response_mock = mount_sse_once(&server, sse_body).await;

    // Note: no `.with_auth(...)` here — that is the point of the test.
    let base_url = server.uri();
    let mut harness_builder = test_codex().with_config(move |config| {
        config.chatgpt_base_url = base_url;
        config.model_provider.supports_websockets = false;
        config
            .features
            .enable(Feature::UseAgentIdentity)
            .expect("test config should allow feature update");
    });
    let harness = harness_builder.build(&server).await?;
    harness
        .submit_turn_with_policy("hello", workspace_write_with_network())
        .await?;

    let requests = server.received_requests().await.unwrap_or_default();
    for endpoint in ["/backend-api/agent/register", "/backend-api/task/register"] {
        assert_eq!(count_requests_for_path(&requests, endpoint), 0);
    }
    assert_eq!(
        response_mock
            .single_request()
            .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER),
        None
    );
    Ok(())
}
async fn mount_agent_identity_endpoints(server: &wiremock::MockServer, task_ids: Vec<&str>) {
Mock::given(method("POST"))
.and(path("/backend-api/agent/register"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"agent_runtime_id": "agent-runtime-1",
})))
.mount(server)
.await;
Mock::given(method("POST"))
.and(path("/backend-api/task/register"))
.respond_with(JsonSequenceResponder::new(
task_ids
.into_iter()
.map(|task_id| json!({ "task_id": task_id }))
.collect(),
))
.mount(server)
.await;
}
/// Number of recorded requests whose URL path equals `expected_path`.
fn count_requests_for_path(requests: &[wiremock::Request], expected_path: &str) -> usize {
    let mut count = 0;
    for request in requests {
        if request.url.path() == expected_path {
            count += 1;
        }
    }
    count
}
/// Parse a recorded request body as JSON, panicking with context on failure.
fn request_json_body(request: &wiremock::Request) -> Value {
    serde_json::from_slice(&request.body)
        .unwrap_or_else(|err| panic!("request body should be valid JSON: {err}"))
}
/// Workspace-write sandbox policy with network access enabled, shared by the
/// turns submitted in this file.
fn workspace_write_with_network() -> codex_protocol::protocol::SandboxPolicy {
    use codex_protocol::protocol::SandboxPolicy;
    SandboxPolicy::WorkspaceWrite {
        writable_roots: Vec::new(),
        read_only_access: Default::default(),
        network_access: true,
        exclude_tmpdir_env_var: true,
        exclude_slash_tmp: true,
    }
}
/// Persist a fake ChatGPT auth.json bound to `account_id` under `codex_home`
/// and load it back as a `CodexAuth`.
fn chatgpt_auth_for_account(codex_home: &std::path::Path, account_id: &str) -> Result<CodexAuth> {
    // Minimal token set whose id token embeds the requested account id.
    let mut id_token = IdTokenInfo::default();
    id_token.raw_jwt = fake_chatgpt_id_token(account_id)?;
    let tokens = TokenData {
        id_token,
        access_token: format!("access-token-{account_id}"),
        refresh_token: "refresh-token".to_string(),
        account_id: Some(account_id.to_string()),
    };
    let auth_dot_json = AuthDotJson {
        auth_mode: Some(AuthMode::Chatgpt),
        openai_api_key: None,
        tokens: Some(tokens),
        last_refresh: Some(Utc::now()),
    };
    save_auth(codex_home, &auth_dot_json, AuthCredentialsStoreMode::File)?;
    // Round-trip through storage so the returned auth matches production load.
    match CodexAuth::from_auth_storage(codex_home, AuthCredentialsStoreMode::File)? {
        Some(auth) => Ok(auth),
        None => anyhow::bail!("auth should load from storage"),
    }
}
/// Build an unsigned (alg = "none") JWT whose ChatGPT auth claims carry
/// `account_id`, for use as a fake id token in tests.
fn fake_chatgpt_id_token(account_id: &str) -> Result<String> {
    #[derive(serde::Serialize)]
    struct Header {
        alg: &'static str,
        typ: &'static str,
    }

    let header_bytes = serde_json::to_vec(&Header {
        alg: "none",
        typ: "JWT",
    })?;
    let payload_bytes = serde_json::to_vec(&json!({
        "email": "test@example.com",
        "https://api.openai.com/auth": {
            "chatgpt_user_id": "user-123",
            "chatgpt_account_id": account_id,
        },
    }))?;

    // JWT wire format: base64url(header).base64url(payload).base64url(sig).
    let segments = [
        URL_SAFE_NO_PAD.encode(header_bytes),
        URL_SAFE_NO_PAD.encode(payload_bytes),
        URL_SAFE_NO_PAD.encode("sig"),
    ];
    Ok(segments.join("."))
}

View File

@@ -6,6 +6,7 @@ use codex_core::ModelProviderInfo;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::WireApi;
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
use codex_core::features::Feature;
use codex_core::ws_version_from_features;
@@ -98,6 +99,34 @@ async fn responses_websocket_streams_request() {
server.shutdown().await;
}
/// A task id set on the client must be included as a header on the websocket
/// handshake.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_includes_agent_task_header() {
    skip_if_no_network!();
    let events = vec![ev_response_created("resp-1"), ev_completed("resp-1")];
    let server = start_websocket_server(vec![vec![events]]).await;

    let harness = websocket_harness(&server).await;
    harness
        .client
        .set_agent_task_id(Some("task-123".to_string()));

    let prompt = prompt_with_input(vec![message_item("hello")]);
    let mut session = harness.client.new_session();
    stream_until_complete(&mut session, &harness, &prompt).await;

    let observed = server
        .single_handshake()
        .header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER);
    assert_eq!(observed, Some("task-123".to_string()));
    server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_reuses_connection() {
skip_if_no_network!();

View File

@@ -56,6 +56,7 @@ pub static CODEX_ALIASES_TEMP_DIR: TestCodexAliasesGuard = unsafe {
#[cfg(not(target_os = "windows"))]
mod abort_tasks;
mod agent_identity;
mod agent_jobs;
mod agent_websocket;
mod apply_patch_cli;