mirror of
https://github.com/openai/codex.git
synced 2026-03-19 05:03:51 +00:00
Compare commits
1 Commits
starr/exec
...
dev/adrian
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7f6f4173a |
2
MODULE.bazel.lock
generated
2
MODULE.bazel.lock
generated
@@ -787,6 +787,8 @@
|
||||
"dupe_0.9.1": "{\"dependencies\":[{\"name\":\"dupe_derive\",\"req\":\"=0.9.1\"}],\"features\":{}}",
|
||||
"dupe_derive_0.9.1": "{\"dependencies\":[{\"name\":\"proc-macro2\",\"req\":\"^1.0\"},{\"name\":\"quote\",\"req\":\"^1.0.3\"},{\"features\":[\"extra-traits\"],\"name\":\"syn\",\"req\":\"^2\"}],\"features\":{}}",
|
||||
"dyn-clone_1.0.20": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"rustversion\",\"req\":\"^1.0\"},{\"features\":[\"diff\"],\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0.66\"}],\"features\":{}}",
|
||||
"ed25519-dalek_2.2.0": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"bincode\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"blake2\",\"req\":\"^0.10\"},{\"features\":[\"html_reports\"],\"kind\":\"dev\",\"name\":\"criterion\",\"req\":\"^0.5\"},{\"default_features\":false,\"features\":[\"digest\"],\"name\":\"curve25519-dalek\",\"req\":\"^4\"},{\"default_features\":false,\"features\":[\"digest\",\"rand_core\"],\"kind\":\"dev\",\"name\":\"curve25519-dalek\",\"req\":\"^4\"},{\"default_features\":false,\"name\":\"ed25519\",\"req\":\">=2.2, <2.3\"},{\"kind\":\"dev\",\"name\":\"hex\",\"req\":\"^0.4\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"default_features\":false,\"name\":\"merlin\",\"optional\":true,\"req\":\"^3\"},{\"kind\":\"dev\",\"name\":\"rand\",\"req\":\"^0.8\"},{\"default_features\":false,\"name\":\"rand_core\",\"optional\":true,\"req\":\"^0.6.4\"},{\"default_features\":false,\"kind\":\"dev\",\"name\":\"rand_core\",\"req\":\"^0.6.4\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"serde\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"sha2\",\"req\":\"^0.10\"},{\"kind\":\"dev\",\"name\":\"sha3\",\"req\":\"^0.10\"},{\"default_features\":false,\"name\":\"signature\",\"optional\":true,\"req\":\">=2.0, <2.3\"},{\"default_features\":false,\"name\":\"subtle\",\"req\":\"^2.3.0\"},{\"kind\":\"dev\",\"name\":\"toml\",\"req\":\"^0.7\"},{\"default_features\":false,\"features\":[\"static_secrets\"],\"kind\":\"dev\",\"name\":\"x25519-dalek\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"optional\":true,\"req\":\"^1.5\"}],\"features\":{\"alloc\":[\"curve25519-dalek/alloc\",\"ed25519/alloc\",\"serde?/alloc\",\"zeroize/alloc\"],\"asm\":[\"sha2/asm\"],\"batch\":[\"alloc\",\"merlin\",\"rand_core\"],\"default\":[\"fast\",\"std\",\"zeroize\"],\"digest\":[\"signature/digest\"],\"fast\":[\"curve25519-dalek/precomputed-tables\"],\"hazmat\":[],\"legacy_compatibility\":[\"curve25519-dalek/legacy_compatibility\"],\"pem\":[\"alloc\",\"ed25519/pem\",\"pkcs8\"],\"pkcs8\":[\"ed25519/pkcs8\"],\"rand_core\":[\"dep:rand_core\"],\"serde\":[\"dep:serde\",\"ed25519/serde\"],\"std\":[\"alloc\",\"ed25519/std\",\"serde?/std\",\"sha2/std\"],\"zeroize\":[\"dep:zeroize\",\"curve25519-dalek/zeroize\"]}}",
|
||||
"ed25519_2.2.3": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"bincode\",\"req\":\"^1\"},{\"features\":[\"rand_core\"],\"kind\":\"dev\",\"name\":\"ed25519-dalek\",\"req\":\"^2\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"name\":\"pkcs8\",\"optional\":true,\"req\":\"^0.10\"},{\"features\":[\"std\"],\"kind\":\"dev\",\"name\":\"rand_core\",\"req\":\"^0.6\"},{\"default_features\":false,\"features\":[\"signature\"],\"kind\":\"dev\",\"name\":\"ring-compat\",\"req\":\"^0.8\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1\"},{\"name\":\"serde_bytes\",\"optional\":true,\"req\":\"^0.11\"},{\"default_features\":false,\"name\":\"signature\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"optional\":true,\"req\":\"^1\"}],\"features\":{\"alloc\":[\"pkcs8?/alloc\"],\"default\":[\"std\"],\"pem\":[\"alloc\",\"pkcs8/pem\"],\"serde_bytes\":[\"serde\",\"dep:serde_bytes\"],\"std\":[\"pkcs8?/std\",\"signature/std\"]}}",
|
||||
"either_1.15.0": "{\"dependencies\":[{\"default_features\":false,\"features\":[\"alloc\",\"derive\"],\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0.95\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0.0\"}],\"features\":{\"default\":[\"std\"],\"std\":[],\"use_std\":[\"std\"]}}",
|
||||
"ena_0.14.3": "{\"dependencies\":[{\"name\":\"dogged\",\"optional\":true,\"req\":\"^0.2.0\"},{\"name\":\"log\",\"req\":\"^0.4\"}],\"features\":{\"bench\":[],\"persistent\":[\"dogged\"]}}",
|
||||
"encode_unicode_1.0.0": "{\"dependencies\":[{\"default_features\":false,\"name\":\"ascii\",\"optional\":true,\"req\":\"^1.0.0\"},{\"kind\":\"dev\",\"name\":\"lazy_static\",\"req\":\"^1.0\",\"target\":\"cfg(unix)\"},{\"features\":[\"https-native\"],\"kind\":\"dev\",\"name\":\"minreq\",\"req\":\"^2.6\"}],\"features\":{\"default\":[\"std\"],\"std\":[]}}",
|
||||
|
||||
26
codex-rs/Cargo.lock
generated
26
codex-rs/Cargo.lock
generated
@@ -1872,6 +1872,7 @@ dependencies = [
|
||||
"ctor 0.6.3",
|
||||
"dirs",
|
||||
"dunce",
|
||||
"ed25519-dalek",
|
||||
"encoding_rs",
|
||||
"env-flags",
|
||||
"eventsource-stream",
|
||||
@@ -3291,6 +3292,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"curve25519-dalek-derive",
|
||||
"digest",
|
||||
"fiat-crypto",
|
||||
"rustc_version",
|
||||
"subtle",
|
||||
@@ -3771,6 +3773,30 @@ version = "1.0.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
||||
|
||||
[[package]]
|
||||
name = "ed25519"
|
||||
version = "2.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
|
||||
dependencies = [
|
||||
"pkcs8",
|
||||
"signature",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ed25519-dalek"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9"
|
||||
dependencies = [
|
||||
"curve25519-dalek",
|
||||
"ed25519",
|
||||
"serde",
|
||||
"sha2",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.15.0"
|
||||
|
||||
@@ -183,6 +183,7 @@ diffy = "0.4.2"
|
||||
dirs = "6"
|
||||
dotenvy = "0.15.7"
|
||||
dunce = "1.0.4"
|
||||
ed25519-dalek = { version = "2.2.0", features = ["pkcs8", "pem"] }
|
||||
encoding_rs = "0.8.35"
|
||||
fd-lock = "4.0.4"
|
||||
env-flags = "0.1.1"
|
||||
|
||||
@@ -60,6 +60,7 @@ codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-
|
||||
csv = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
dunce = { workspace = true }
|
||||
ed25519-dalek = { workspace = true }
|
||||
encoding_rs = { workspace = true }
|
||||
env-flags = { workspace = true }
|
||||
eventsource-stream = { workspace = true }
|
||||
|
||||
@@ -497,6 +497,9 @@
|
||||
"unified_exec": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"use_agent_identity": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"use_legacy_landlock": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -2049,6 +2052,9 @@
|
||||
"unified_exec": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"use_agent_identity": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"use_legacy_landlock": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
517
codex-rs/core/src/agent_identity.rs
Normal file
517
codex-rs/core/src/agent_identity.rs
Normal file
@@ -0,0 +1,517 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
use ed25519_dalek::Signature;
|
||||
use ed25519_dalek::Signer;
|
||||
use ed25519_dalek::SigningKey;
|
||||
use ed25519_dalek::pkcs8::DecodePrivateKey;
|
||||
use ed25519_dalek::pkcs8::EncodePrivateKey;
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::json;
|
||||
use tracing::info;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::CodexAuth;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::default_client::originator;
|
||||
use crate::features::Feature;
|
||||
use codex_secrets::SecretName;
|
||||
use codex_secrets::SecretScope;
|
||||
use codex_secrets::SecretsBackendKind;
|
||||
use codex_secrets::SecretsManager;
|
||||
|
||||
const AGENT_IDENTITY_SECRET_NAME: &str = "AGENT_IDENTITY";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AgentIdentityManager {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
http_client: Client,
|
||||
secrets: SecretsManager,
|
||||
}
|
||||
|
||||
impl AgentIdentityManager {
|
||||
pub(crate) fn new(auth_manager: Arc<AuthManager>, codex_home: PathBuf) -> Self {
|
||||
Self {
|
||||
auth_manager,
|
||||
http_client: build_reqwest_client(),
|
||||
secrets: SecretsManager::new(codex_home, SecretsBackendKind::Local),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn ensure_thread_task(
|
||||
&self,
|
||||
config: &Config,
|
||||
thread_id: &str,
|
||||
) -> Result<Option<String>> {
|
||||
if !config.features.enabled(Feature::UseAgentIdentity) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(auth) = self.auth_manager.auth().await else {
|
||||
return Ok(None);
|
||||
};
|
||||
if !auth.is_chatgpt_auth() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let binding_id = binding_id_for_auth(
|
||||
self.auth_manager.forced_chatgpt_workspace_id(),
|
||||
auth.get_account_id(),
|
||||
)
|
||||
.context("agent identity requires a ChatGPT workspace/account binding")?;
|
||||
|
||||
let identity = match self.ensure_agent_identity(config, &auth, &binding_id).await {
|
||||
Ok(identity) => identity,
|
||||
Err(err) => {
|
||||
warn!(binding_id = %binding_id, "failed to ensure agent identity: {err:#}");
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match self
|
||||
.register_task(config, &auth, &identity, thread_id)
|
||||
.await
|
||||
{
|
||||
Ok(task_id) => Ok(Some(task_id)),
|
||||
Err(err) => {
|
||||
warn!(
|
||||
binding_id = %binding_id,
|
||||
"agent task registration failed, deleting stored identity and retrying once: {err:#}"
|
||||
);
|
||||
self.delete_stored_identity(&binding_id)?;
|
||||
let identity = self
|
||||
.register_agent_identity(config, &auth, &binding_id)
|
||||
.await
|
||||
.context("re-registering agent identity after task failure")?;
|
||||
let task_id = self
|
||||
.register_task(config, &auth, &identity, thread_id)
|
||||
.await
|
||||
.context("registering agent task after agent identity retry")?;
|
||||
Ok(Some(task_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_agent_identity(
|
||||
&self,
|
||||
config: &Config,
|
||||
auth: &CodexAuth,
|
||||
binding_id: &str,
|
||||
) -> Result<StoredAgentIdentity> {
|
||||
if let Some(identity) = self.load_stored_identity(binding_id)? {
|
||||
trace!(binding_id = %binding_id, "reusing stored agent identity");
|
||||
return Ok(identity);
|
||||
}
|
||||
|
||||
self.register_agent_identity(config, auth, binding_id).await
|
||||
}
|
||||
|
||||
async fn register_agent_identity(
|
||||
&self,
|
||||
config: &Config,
|
||||
auth: &CodexAuth,
|
||||
binding_id: &str,
|
||||
) -> Result<StoredAgentIdentity> {
|
||||
let key_material = generate_key_material()?;
|
||||
let body = AgentRegisterRequest {
|
||||
agent_public_key: key_material.public_key_base64.clone(),
|
||||
abom: build_abom(),
|
||||
capabilities: vec!["codex_backend".to_string(), "connector_gateway".to_string()],
|
||||
metadata: json!({
|
||||
"originator": originator().value,
|
||||
"workspace_id": binding_id,
|
||||
"chatgpt_user_id": auth.get_chatgpt_user_id(),
|
||||
}),
|
||||
on_behalf_of: OnBehalfOf {
|
||||
workspace_id: binding_id.to_string(),
|
||||
},
|
||||
};
|
||||
let response: AgentRegisterResponse = self
|
||||
.post_json(agent_register_url(&config.chatgpt_base_url), auth, &body)
|
||||
.await
|
||||
.context("registering agent identity")?;
|
||||
|
||||
let identity = StoredAgentIdentity {
|
||||
binding_id: binding_id.to_string(),
|
||||
agent_runtime_id: response.agent_runtime_id,
|
||||
private_key_pkcs8_base64: key_material.private_key_pkcs8_base64,
|
||||
public_key_base64: key_material.public_key_base64,
|
||||
registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
abom: body.abom,
|
||||
metadata: body.metadata,
|
||||
};
|
||||
self.store_identity(&identity)?;
|
||||
info!(binding_id = %binding_id, "agent identity registration succeeded");
|
||||
Ok(identity)
|
||||
}
|
||||
|
||||
async fn register_task(
|
||||
&self,
|
||||
config: &Config,
|
||||
auth: &CodexAuth,
|
||||
identity: &StoredAgentIdentity,
|
||||
thread_id: &str,
|
||||
) -> Result<String> {
|
||||
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
|
||||
let payload = canonical_signing_payload(&identity.agent_runtime_id, ×tamp);
|
||||
let signature = sign_payload(&identity.private_key_pkcs8_base64, payload.as_bytes())?;
|
||||
let body = TaskRegisterRequest {
|
||||
agent_runtime_id: identity.agent_runtime_id.clone(),
|
||||
timestamp,
|
||||
signature,
|
||||
metadata: json!({
|
||||
"thread_id": thread_id,
|
||||
}),
|
||||
};
|
||||
let response: TaskRegisterResponse = self
|
||||
.post_json(task_register_url(&config.chatgpt_base_url), auth, &body)
|
||||
.await
|
||||
.context("registering agent task")?;
|
||||
let task_id = decrypt_task_id(response)?;
|
||||
info!(thread_id = %thread_id, "agent task registration succeeded");
|
||||
Ok(task_id)
|
||||
}
|
||||
|
||||
async fn post_json<TReq, TResp>(
|
||||
&self,
|
||||
url: String,
|
||||
auth: &CodexAuth,
|
||||
body: &TReq,
|
||||
) -> Result<TResp>
|
||||
where
|
||||
TReq: Serialize + ?Sized,
|
||||
TResp: DeserializeOwned,
|
||||
{
|
||||
let token = auth
|
||||
.get_token()
|
||||
.context("loading ChatGPT access token for agent identity request")?;
|
||||
let mut request = self.http_client.post(url).bearer_auth(token).json(body);
|
||||
if let Some(account_id) = auth.get_account_id() {
|
||||
request = request.header("ChatGPT-Account-ID", account_id);
|
||||
}
|
||||
let response = request
|
||||
.send()
|
||||
.await
|
||||
.context("sending agent identity request")?;
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("agent identity request failed with {status}: {body}");
|
||||
}
|
||||
response
|
||||
.json::<TResp>()
|
||||
.await
|
||||
.context("decoding agent identity response")
|
||||
}
|
||||
|
||||
fn load_stored_identity(&self, binding_id: &str) -> Result<Option<StoredAgentIdentity>> {
|
||||
let secret_name = secret_name()?;
|
||||
let secret_scope = secret_scope(binding_id)?;
|
||||
let Some(raw) = self.secrets.get(&secret_scope, &secret_name)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
match serde_json::from_str::<StoredAgentIdentity>(&raw) {
|
||||
Ok(identity) if identity.binding_id == binding_id => Ok(Some(identity)),
|
||||
Ok(_) => {
|
||||
warn!(binding_id = %binding_id, "stored agent identity binding mismatch, deleting cached value");
|
||||
self.delete_stored_identity(binding_id)?;
|
||||
Ok(None)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(binding_id = %binding_id, "failed to parse stored agent identity, deleting cached value: {err:#}");
|
||||
self.delete_stored_identity(binding_id)?;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn store_identity(&self, identity: &StoredAgentIdentity) -> Result<()> {
|
||||
let secret_name = secret_name()?;
|
||||
let secret_scope = secret_scope(&identity.binding_id)?;
|
||||
let raw = serde_json::to_string(identity).context("serializing stored agent identity")?;
|
||||
self.secrets
|
||||
.set(&secret_scope, &secret_name, &raw)
|
||||
.context("persisting stored agent identity")
|
||||
}
|
||||
|
||||
fn delete_stored_identity(&self, binding_id: &str) -> Result<()> {
|
||||
let secret_name = secret_name()?;
|
||||
let secret_scope = secret_scope(binding_id)?;
|
||||
let _ = self
|
||||
.secrets
|
||||
.delete(&secret_scope, &secret_name)
|
||||
.context("deleting stored agent identity")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
struct StoredAgentIdentity {
|
||||
binding_id: String,
|
||||
agent_runtime_id: String,
|
||||
private_key_pkcs8_base64: String,
|
||||
public_key_base64: String,
|
||||
registered_at: String,
|
||||
abom: AgentAbom,
|
||||
metadata: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
struct AgentAbom {
|
||||
agent_version: String,
|
||||
agent_harness_id: String,
|
||||
running_location: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct AgentRegisterRequest {
|
||||
agent_public_key: String,
|
||||
abom: AgentAbom,
|
||||
capabilities: Vec<String>,
|
||||
metadata: serde_json::Value,
|
||||
on_behalf_of: OnBehalfOf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct OnBehalfOf {
|
||||
workspace_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AgentRegisterResponse {
|
||||
#[serde(alias = "agent_id", alias = "runtime_identity")]
|
||||
agent_runtime_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct TaskRegisterRequest {
|
||||
#[serde(rename = "agent_id")]
|
||||
agent_runtime_id: String,
|
||||
timestamp: String,
|
||||
signature: String,
|
||||
metadata: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct TaskRegisterResponse {
|
||||
#[serde(default)]
|
||||
task_id: Option<String>,
|
||||
#[serde(default)]
|
||||
encrypted_task_id: Option<String>,
|
||||
}
|
||||
|
||||
struct GeneratedKeyMaterial {
|
||||
private_key_pkcs8_base64: String,
|
||||
public_key_base64: String,
|
||||
}
|
||||
|
||||
fn binding_id_for_auth(
|
||||
forced_chatgpt_workspace_id: Option<String>,
|
||||
account_id: Option<String>,
|
||||
) -> Option<String> {
|
||||
forced_chatgpt_workspace_id.or(account_id)
|
||||
}
|
||||
|
||||
fn normalized_agent_identity_base_url(chatgpt_base_url: &str) -> String {
|
||||
let base_url = chatgpt_base_url.trim_end_matches('/');
|
||||
if base_url.contains("/backend-api") || base_url.contains("/api/codex") {
|
||||
base_url.to_string()
|
||||
} else {
|
||||
format!("{base_url}/backend-api")
|
||||
}
|
||||
}
|
||||
|
||||
fn agent_register_url(chatgpt_base_url: &str) -> String {
|
||||
format!(
|
||||
"{}/agent/register",
|
||||
normalized_agent_identity_base_url(chatgpt_base_url)
|
||||
)
|
||||
}
|
||||
|
||||
fn task_register_url(chatgpt_base_url: &str) -> String {
|
||||
format!(
|
||||
"{}/task/register",
|
||||
normalized_agent_identity_base_url(chatgpt_base_url)
|
||||
)
|
||||
}
|
||||
|
||||
fn secret_name() -> Result<SecretName> {
|
||||
SecretName::new(AGENT_IDENTITY_SECRET_NAME).context("building agent identity secret name")
|
||||
}
|
||||
|
||||
fn secret_scope(binding_id: &str) -> Result<SecretScope> {
|
||||
SecretScope::environment(format!("agent-identity-{binding_id}"))
|
||||
.context("building agent identity secret scope")
|
||||
}
|
||||
|
||||
fn build_abom() -> AgentAbom {
|
||||
let os_info = os_info::get();
|
||||
AgentAbom {
|
||||
agent_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
agent_harness_id: originator().value,
|
||||
running_location: format!(
|
||||
"{}-{}",
|
||||
os_info.os_type(),
|
||||
os_info.architecture().unwrap_or("unknown")
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_key_material() -> Result<GeneratedKeyMaterial> {
|
||||
let secret_key = rand::random::<[u8; 32]>();
|
||||
let signing_key = SigningKey::from_bytes(&secret_key);
|
||||
let private_key_pkcs8_base64 = BASE64_STANDARD.encode(
|
||||
signing_key
|
||||
.to_pkcs8_der()
|
||||
.context("encoding agent identity private key")?
|
||||
.as_bytes(),
|
||||
);
|
||||
let public_key_base64 = URL_SAFE_NO_PAD.encode(signing_key.verifying_key().as_bytes());
|
||||
Ok(GeneratedKeyMaterial {
|
||||
private_key_pkcs8_base64,
|
||||
public_key_base64,
|
||||
})
|
||||
}
|
||||
|
||||
fn canonical_signing_payload(agent_runtime_id: &str, timestamp: &str) -> String {
|
||||
format!("{agent_runtime_id}:{timestamp}")
|
||||
}
|
||||
|
||||
fn sign_payload(private_key_pkcs8_base64: &str, payload: &[u8]) -> Result<String> {
|
||||
let private_key_pkcs8_der = BASE64_STANDARD
|
||||
.decode(private_key_pkcs8_base64)
|
||||
.context("decoding agent identity private key")?;
|
||||
let signing_key = SigningKey::from_pkcs8_der(&private_key_pkcs8_der)
|
||||
.context("decoding agent identity private key")?;
|
||||
let signature: Signature = signing_key.sign(payload);
|
||||
Ok(URL_SAFE_NO_PAD.encode(signature.to_bytes()))
|
||||
}
|
||||
|
||||
fn decrypt_task_id(response: TaskRegisterResponse) -> Result<String> {
|
||||
if let Some(task_id) = response.task_id {
|
||||
return Ok(task_id);
|
||||
}
|
||||
|
||||
let encrypted_task_id = response
|
||||
.encrypted_task_id
|
||||
.context("task register response was missing both task_id and encrypted_task_id")?;
|
||||
|
||||
if let Some(task_id) = encrypted_task_id.strip_prefix("plaintext:") {
|
||||
return Ok(task_id.to_string());
|
||||
}
|
||||
|
||||
let decoded = BASE64_STANDARD
|
||||
.decode(&encrypted_task_id)
|
||||
.or_else(|_| URL_SAFE_NO_PAD.decode(&encrypted_task_id))
|
||||
.context("decoding encrypted task id envelope")?;
|
||||
String::from_utf8(decoded).context("decoding encrypted task id UTF-8 payload")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_keyring_store::tests::MockKeyringStore;
|
||||
use codex_secrets::SecretsManager;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn binding_id_prefers_forced_workspace() {
|
||||
let binding = binding_id_for_auth(
|
||||
Some("workspace-123".to_string()),
|
||||
Some("account-456".to_string()),
|
||||
);
|
||||
assert_eq!(binding, Some("workspace-123".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn signing_payload_is_stable() {
|
||||
assert_eq!(
|
||||
canonical_signing_payload("agent-123", "2026-03-16T12:34:56Z"),
|
||||
"agent-123:2026-03-16T12:34:56Z".to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decrypt_task_id_prefers_plaintext_field() {
|
||||
let task_id = decrypt_task_id(TaskRegisterResponse {
|
||||
task_id: Some("task-123".to_string()),
|
||||
encrypted_task_id: Some(BASE64_STANDARD.encode("ignored")),
|
||||
})
|
||||
.expect("task id should decode");
|
||||
assert_eq!(task_id, "task-123".to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decrypt_task_id_decodes_base64_fallback() {
|
||||
let task_id = decrypt_task_id(TaskRegisterResponse {
|
||||
task_id: None,
|
||||
encrypted_task_id: Some(BASE64_STANDARD.encode("task-456")),
|
||||
})
|
||||
.expect("task id should decode");
|
||||
assert_eq!(task_id, "task-456".to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decrypt_task_id_decodes_urlsafe_base64_fallback() {
|
||||
let task_id = decrypt_task_id(TaskRegisterResponse {
|
||||
task_id: None,
|
||||
encrypted_task_id: Some(URL_SAFE_NO_PAD.encode("task-789")),
|
||||
})
|
||||
.expect("task id should decode");
|
||||
assert_eq!(task_id, "task-789".to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_identity_round_trips_in_secrets_manager() {
|
||||
let codex_home = tempfile::tempdir().expect("tempdir");
|
||||
let keyring = Arc::new(MockKeyringStore::default());
|
||||
let secrets = SecretsManager::new_with_keyring_store(
|
||||
codex_home.path().to_path_buf(),
|
||||
SecretsBackendKind::Local,
|
||||
keyring,
|
||||
);
|
||||
let identity = StoredAgentIdentity {
|
||||
binding_id: "workspace-123".to_string(),
|
||||
agent_runtime_id: "agent-123".to_string(),
|
||||
private_key_pkcs8_base64: "private".to_string(),
|
||||
public_key_base64: "public".to_string(),
|
||||
registered_at: "2026-03-16T12:34:56Z".to_string(),
|
||||
abom: build_abom(),
|
||||
metadata: json!({"workspace_id": "workspace-123"}),
|
||||
};
|
||||
|
||||
let secret_name = secret_name().expect("secret name");
|
||||
let secret_scope = secret_scope("workspace-123").expect("secret scope");
|
||||
let serialized = serde_json::to_string(&identity).expect("serialize identity");
|
||||
secrets
|
||||
.set(&secret_scope, &secret_name, &serialized)
|
||||
.expect("set identity");
|
||||
let loaded = secrets
|
||||
.get(&secret_scope, &secret_name)
|
||||
.expect("get identity")
|
||||
.expect("missing identity");
|
||||
let decoded: StoredAgentIdentity = serde_json::from_str(&loaded).expect("decode identity");
|
||||
assert_eq!(decoded, identity);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn secret_scope_is_binding_specific() {
|
||||
let first = secret_scope("workspace-123").expect("first scope");
|
||||
let second = secret_scope("workspace-456").expect("second scope");
|
||||
assert_ne!(first, second);
|
||||
}
|
||||
}
|
||||
@@ -111,6 +111,7 @@ use crate::util::emit_feedback_request_tags;
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER: &str = "X-Openai-Internal-Codex-Task-Id";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
@@ -142,6 +143,7 @@ struct ModelClientState {
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
disable_websockets: AtomicBool,
|
||||
agent_task_id: StdMutex<Option<String>>,
|
||||
cached_websocket_session: StdMutex<WebsocketSession>,
|
||||
}
|
||||
|
||||
@@ -276,6 +278,7 @@ impl ModelClient {
|
||||
include_timing_metrics,
|
||||
beta_features_header,
|
||||
disable_websockets: AtomicBool::new(false),
|
||||
agent_task_id: StdMutex::new(None),
|
||||
cached_websocket_session: StdMutex::new(WebsocketSession::default()),
|
||||
}),
|
||||
}
|
||||
@@ -310,6 +313,23 @@ impl ModelClient {
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner) = websocket_session;
|
||||
}
|
||||
|
||||
pub fn set_agent_task_id(&self, task_id: Option<String>) {
|
||||
*self
|
||||
.state
|
||||
.agent_task_id
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner) = task_id;
|
||||
self.store_cached_websocket_session(WebsocketSession::default());
|
||||
}
|
||||
|
||||
fn agent_task_id(&self) -> Option<String> {
|
||||
self.state
|
||||
.agent_task_id
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Compacts the current conversation history using the Compact endpoint.
|
||||
///
|
||||
/// This is a unary call (no streaming) that returns a new list of
|
||||
@@ -609,6 +629,7 @@ impl ModelClient {
|
||||
self.state.beta_features_header.as_deref(),
|
||||
turn_state,
|
||||
turn_metadata_header.as_ref(),
|
||||
self.agent_task_id().as_deref(),
|
||||
);
|
||||
if let Ok(header_value) = HeaderValue::from_str(&conversation_id) {
|
||||
headers.insert("x-client-request-id", header_value);
|
||||
@@ -733,12 +754,18 @@ impl ModelClientSession {
|
||||
self.client.state.beta_features_header.as_deref(),
|
||||
Some(&self.turn_state),
|
||||
turn_metadata_header.as_ref(),
|
||||
self.client.agent_task_id().as_deref(),
|
||||
),
|
||||
compression,
|
||||
turn_state: Some(Arc::clone(&self.turn_state)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_agent_task_id(&mut self, task_id: Option<String>) {
|
||||
self.client.set_agent_task_id(task_id);
|
||||
self.websocket_session = WebsocketSession::default();
|
||||
}
|
||||
|
||||
fn get_incremental_items(
|
||||
&self,
|
||||
request: &ResponsesApiRequest,
|
||||
@@ -1337,6 +1364,7 @@ fn build_responses_headers(
|
||||
beta_features_header: Option<&str>,
|
||||
turn_state: Option<&Arc<OnceLock<String>>>,
|
||||
turn_metadata_header: Option<&HeaderValue>,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> ApiHeaderMap {
|
||||
let mut headers = ApiHeaderMap::new();
|
||||
if let Some(value) = beta_features_header
|
||||
@@ -1354,6 +1382,12 @@ fn build_responses_headers(
|
||||
if let Some(header_value) = turn_metadata_header {
|
||||
headers.insert(X_CODEX_TURN_METADATA_HEADER, header_value.clone());
|
||||
}
|
||||
if let Some(agent_task_id) = agent_task_id
|
||||
&& let Ok(header_value) = HeaderValue::from_str(agent_task_id)
|
||||
{
|
||||
trace!("attaching agent task id to responses request");
|
||||
headers.insert(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER, header_value);
|
||||
}
|
||||
headers
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::SandboxState;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent_identity::AgentIdentityManager;
|
||||
use crate::analytics_client::AnalyticsEventsClient;
|
||||
use crate::analytics_client::AppInvocation;
|
||||
use crate::analytics_client::InvocationType;
|
||||
@@ -1792,6 +1793,10 @@ impl Session {
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Self::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
agent_identity_manager: AgentIdentityManager::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.codex_home.clone(),
|
||||
),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
@@ -2047,6 +2052,72 @@ impl Session {
|
||||
state.merge_connector_selection(connector_ids)
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_task_id(&self) -> Option<String> {
|
||||
let state = self.state.lock().await;
|
||||
state.agent_task_id()
|
||||
}
|
||||
|
||||
pub(crate) async fn ensure_agent_task(&self, turn_context: &TurnContext) -> Option<String> {
|
||||
if !turn_context
|
||||
.config
|
||||
.features
|
||||
.enabled(Feature::UseAgentIdentity)
|
||||
{
|
||||
let had_task = {
|
||||
let mut state = self.state.lock().await;
|
||||
let had_task = state.agent_task_id().is_some();
|
||||
if had_task {
|
||||
state.set_agent_task_id(None);
|
||||
}
|
||||
had_task
|
||||
};
|
||||
self.services.model_client.set_agent_task_id(None);
|
||||
if had_task {
|
||||
self.refresh_mcp_servers_for_agent_task(turn_context).await;
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
if let Some(task_id) = self.agent_task_id().await {
|
||||
self.services
|
||||
.model_client
|
||||
.set_agent_task_id(Some(task_id.clone()));
|
||||
return Some(task_id);
|
||||
}
|
||||
|
||||
let task_id = match self
|
||||
.services
|
||||
.agent_identity_manager
|
||||
.ensure_thread_task(
|
||||
turn_context.config.as_ref(),
|
||||
&self.conversation_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(task_id) => task_id,
|
||||
Err(err) => {
|
||||
let message =
|
||||
format!("Agent identity flow failed; continuing without agent task: {err:#}");
|
||||
warn!("{message}");
|
||||
self.send_event(turn_context, EventMsg::Warning(WarningEvent { message }))
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let task_id = task_id?;
|
||||
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_agent_task_id(Some(task_id.clone()));
|
||||
}
|
||||
self.services
|
||||
.model_client
|
||||
.set_agent_task_id(Some(task_id.clone()));
|
||||
self.refresh_mcp_servers_for_agent_task(turn_context).await;
|
||||
Some(task_id)
|
||||
}
|
||||
|
||||
// Returns the connector IDs currently selected for this session.
|
||||
pub(crate) async fn get_connector_selection(&self) -> HashSet<String> {
|
||||
let state = self.state.lock().await;
|
||||
@@ -2059,6 +2130,20 @@ impl Session {
|
||||
state.clear_connector_selection();
|
||||
}
|
||||
|
||||
async fn refresh_mcp_servers_for_agent_task(&self, turn_context: &TurnContext) {
|
||||
let config = self.get_config().await;
|
||||
let mcp_servers = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.configured_servers(config.as_ref());
|
||||
self.refresh_mcp_servers_inner(
|
||||
turn_context,
|
||||
mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn record_initial_history(&self, conversation_history: InitialHistory) {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
let is_subagent = {
|
||||
@@ -4012,6 +4097,7 @@ impl Session {
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
) {
|
||||
let auth = self.services.auth_manager.auth().await;
|
||||
let agent_task_id = self.agent_task_id().await;
|
||||
let config = self.get_config().await;
|
||||
let tool_plugin_provenance = self
|
||||
.services
|
||||
@@ -4022,6 +4108,7 @@ impl Session {
|
||||
self.features.apps_enabled_for_auth(auth.as_ref()),
|
||||
auth.as_ref(),
|
||||
config.as_ref(),
|
||||
agent_task_id.as_deref(),
|
||||
);
|
||||
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await;
|
||||
let sandbox_state = SandboxState {
|
||||
@@ -4749,10 +4836,12 @@ mod handlers {
|
||||
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
|
||||
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
|
||||
let auth = sess.services.auth_manager.auth().await;
|
||||
let mcp_servers = sess
|
||||
.services
|
||||
.mcp_manager
|
||||
.effective_servers(config, auth.as_ref());
|
||||
let agent_task_id = sess.agent_task_id().await;
|
||||
let mcp_servers = sess.services.mcp_manager.effective_servers_with_agent_task(
|
||||
config,
|
||||
auth.as_ref(),
|
||||
agent_task_id.as_deref(),
|
||||
);
|
||||
let snapshot = collect_mcp_snapshot_from_manager(
|
||||
&mcp_connection_manager,
|
||||
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode)
|
||||
@@ -5495,6 +5584,8 @@ pub(crate) async fn run_turn(
|
||||
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
|
||||
.await;
|
||||
|
||||
sess.ensure_agent_task(turn_context.as_ref()).await;
|
||||
|
||||
let loaded_plugins = sess
|
||||
.services
|
||||
.plugins_manager
|
||||
@@ -5673,6 +5764,7 @@ pub(crate) async fn run_turn(
|
||||
// one instance across retries within this turn.
|
||||
let mut client_session =
|
||||
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
|
||||
client_session.set_agent_task_id(sess.agent_task_id().await);
|
||||
|
||||
loop {
|
||||
if let Some(session_start_source) = sess.take_pending_session_start_source().await {
|
||||
|
||||
@@ -2409,6 +2409,10 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Session::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
agent_identity_manager: crate::agent_identity::AgentIdentityManager::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.codex_home.clone(),
|
||||
),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
@@ -3199,6 +3203,10 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Session::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
agent_identity_manager: crate::agent_identity::AgentIdentityManager::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.codex_home.clone(),
|
||||
),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
|
||||
@@ -186,7 +186,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
});
|
||||
}
|
||||
|
||||
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
|
||||
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config, None);
|
||||
if mcp_servers.is_empty() {
|
||||
return Ok(AccessibleConnectorsStatus {
|
||||
connectors: Vec::new(),
|
||||
|
||||
@@ -188,6 +188,8 @@ pub enum Feature {
|
||||
ResponsesWebsockets,
|
||||
/// Enable Responses API websocket v2 mode.
|
||||
ResponsesWebsocketsV2,
|
||||
/// Use the agent identity registration + per-thread task flow for ChatGPT auth.
|
||||
UseAgentIdentity,
|
||||
}
|
||||
|
||||
impl Feature {
|
||||
@@ -869,6 +871,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::UseAgentIdentity,
|
||||
key: "use_agent_identity",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
];
|
||||
|
||||
/// Push a warning event if any under-development features are enabled.
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
// the TUI or the tracing stack).
|
||||
#![deny(clippy::print_stdout, clippy::print_stderr)]
|
||||
|
||||
mod agent_identity;
|
||||
mod analytics_client;
|
||||
pub mod api_bridge;
|
||||
mod apply_patch;
|
||||
@@ -76,6 +77,7 @@ pub mod token_data;
|
||||
mod truncate;
|
||||
mod unified_exec;
|
||||
pub mod windows_sandbox;
|
||||
pub use client::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
|
||||
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
|
||||
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
|
||||
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
|
||||
|
||||
@@ -15,9 +15,11 @@ use codex_protocol::mcp::Tool;
|
||||
use codex_protocol::protocol::McpListToolsResponseEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use serde_json::Value;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::CodexAuth;
|
||||
use crate::client::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
|
||||
use crate::config::Config;
|
||||
use crate::config::types::McpServerConfig;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
@@ -110,7 +112,10 @@ fn codex_apps_mcp_bearer_token(auth: Option<&CodexAuth>) -> Option<String> {
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<String, String>> {
|
||||
fn codex_apps_mcp_http_headers(
|
||||
auth: Option<&CodexAuth>,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> Option<HashMap<String, String>> {
|
||||
let mut headers = HashMap::new();
|
||||
if let Some(token) = codex_apps_mcp_bearer_token(auth) {
|
||||
headers.insert("Authorization".to_string(), format!("Bearer {token}"));
|
||||
@@ -118,6 +123,13 @@ fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<Strin
|
||||
if let Some(account_id) = auth.and_then(CodexAuth::get_account_id) {
|
||||
headers.insert("ChatGPT-Account-ID".to_string(), account_id);
|
||||
}
|
||||
if let Some(agent_task_id) = agent_task_id {
|
||||
trace!("attaching agent task id to codex apps gateway");
|
||||
headers.insert(
|
||||
X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER.to_string(),
|
||||
agent_task_id.to_string(),
|
||||
);
|
||||
}
|
||||
if headers.is_empty() {
|
||||
None
|
||||
} else {
|
||||
@@ -151,12 +163,16 @@ pub(crate) fn codex_apps_mcp_url(config: &Config) -> String {
|
||||
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {
|
||||
fn codex_apps_mcp_server_config(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> McpServerConfig {
|
||||
let bearer_token_env_var = codex_apps_mcp_bearer_token_env_var();
|
||||
let http_headers = if bearer_token_env_var.is_some() {
|
||||
None
|
||||
} else {
|
||||
codex_apps_mcp_http_headers(auth)
|
||||
codex_apps_mcp_http_headers(auth, agent_task_id)
|
||||
};
|
||||
let url = codex_apps_mcp_url(config);
|
||||
|
||||
@@ -184,11 +200,12 @@ pub(crate) fn with_codex_apps_mcp(
|
||||
connectors_enabled: bool,
|
||||
auth: Option<&CodexAuth>,
|
||||
config: &Config,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> HashMap<String, McpServerConfig> {
|
||||
if connectors_enabled {
|
||||
servers.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
codex_apps_mcp_server_config(config, auth),
|
||||
codex_apps_mcp_server_config(config, auth, agent_task_id),
|
||||
);
|
||||
} else {
|
||||
servers.remove(CODEX_APPS_MCP_SERVER_NAME);
|
||||
@@ -214,7 +231,16 @@ impl McpManager {
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
) -> HashMap<String, McpServerConfig> {
|
||||
effective_mcp_servers(config, auth, self.plugins_manager.as_ref())
|
||||
self.effective_servers_with_agent_task(config, auth, None)
|
||||
}
|
||||
|
||||
pub fn effective_servers_with_agent_task(
|
||||
&self,
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> HashMap<String, McpServerConfig> {
|
||||
effective_mcp_servers(config, auth, self.plugins_manager.as_ref(), agent_task_id)
|
||||
}
|
||||
|
||||
pub fn tool_plugin_provenance(&self, config: &Config) -> ToolPluginProvenance {
|
||||
@@ -239,6 +265,7 @@ fn effective_mcp_servers(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
plugins_manager: &PluginsManager,
|
||||
agent_task_id: Option<&str>,
|
||||
) -> HashMap<String, McpServerConfig> {
|
||||
let servers = configured_mcp_servers(config, plugins_manager);
|
||||
with_codex_apps_mcp(
|
||||
@@ -246,6 +273,7 @@ fn effective_mcp_servers(
|
||||
config.features.apps_enabled_for_auth(auth),
|
||||
auth,
|
||||
config,
|
||||
agent_task_id,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -159,7 +159,7 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
|
||||
let mut config = crate::config::test_config();
|
||||
config.chatgpt_base_url = "https://chatgpt.com".to_string();
|
||||
|
||||
let mut servers = with_codex_apps_mcp(HashMap::new(), false, None, &config);
|
||||
let mut servers = with_codex_apps_mcp(HashMap::new(), false, None, &config, None);
|
||||
assert!(!servers.contains_key(CODEX_APPS_MCP_SERVER_NAME));
|
||||
|
||||
config
|
||||
@@ -167,7 +167,7 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow apps");
|
||||
|
||||
servers = with_codex_apps_mcp(servers, true, None, &config);
|
||||
servers = with_codex_apps_mcp(servers, true, None, &config, None);
|
||||
let server = servers
|
||||
.get(CODEX_APPS_MCP_SERVER_NAME)
|
||||
.expect("codex apps should be present when apps is enabled");
|
||||
@@ -179,6 +179,34 @@ fn codex_apps_server_config_uses_legacy_codex_apps_path() {
|
||||
assert_eq!(url, "https://chatgpt.com/backend-api/wham/apps");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_server_config_includes_agent_task_header() {
|
||||
let mut config = crate::config::test_config();
|
||||
config.chatgpt_base_url = "https://chatgpt.com".to_string();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow apps");
|
||||
|
||||
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
|
||||
let servers = with_codex_apps_mcp(HashMap::new(), true, Some(&auth), &config, Some("task-123"));
|
||||
let server = servers
|
||||
.get(CODEX_APPS_MCP_SERVER_NAME)
|
||||
.expect("codex apps should be present when apps is enabled");
|
||||
|
||||
match &server.transport {
|
||||
McpServerTransportConfig::StreamableHttp { http_headers, .. } => {
|
||||
assert_eq!(
|
||||
http_headers
|
||||
.as_ref()
|
||||
.and_then(|headers| headers.get(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)),
|
||||
Some(&"task-123".to_string())
|
||||
);
|
||||
}
|
||||
other => panic!("expected streamable http transport, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn effective_mcp_servers_include_plugins_without_overriding_user_config() {
|
||||
let codex_home = tempfile::tempdir().expect("tempdir");
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||
use crate::AuthManager;
|
||||
use crate::RolloutRecorder;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::agent_identity::AgentIdentityManager;
|
||||
use crate::analytics_client::AnalyticsEventsClient;
|
||||
use crate::client::ModelClient;
|
||||
use crate::config::StartedNetworkProxy;
|
||||
@@ -60,5 +61,6 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
/// Session-scoped model client shared across turns.
|
||||
pub(crate) model_client: ModelClient,
|
||||
pub(crate) agent_identity_manager: AgentIdentityManager,
|
||||
pub(crate) code_mode_service: CodeModeService,
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ pub(crate) struct SessionState {
|
||||
pub(crate) startup_regular_task: Option<JoinHandle<CodexResult<RegularTask>>>,
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
|
||||
agent_task_id: Option<String>,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
}
|
||||
|
||||
@@ -52,6 +53,7 @@ impl SessionState {
|
||||
startup_regular_task: None,
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
agent_task_id: None,
|
||||
granted_permissions: None,
|
||||
}
|
||||
}
|
||||
@@ -207,6 +209,14 @@ impl SessionState {
|
||||
self.pending_session_start_source.take()
|
||||
}
|
||||
|
||||
pub(crate) fn agent_task_id(&self) -> Option<String> {
|
||||
self.agent_task_id.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_agent_task_id(&mut self, task_id: Option<String>) {
|
||||
self.agent_task_id = task_id;
|
||||
}
|
||||
|
||||
pub(crate) fn record_granted_permissions(&mut self, permissions: PermissionProfile) {
|
||||
self.granted_permissions =
|
||||
merge_permission_profiles(self.granted_permissions.as_ref(), Some(&permissions));
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_core::ModelProviderInfo;
|
||||
use codex_core::Prompt;
|
||||
use codex_core::ResponseEvent;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -247,6 +248,119 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_includes_agent_task_header() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
|
||||
let request_recorder = responses::mount_sse_once_match(
|
||||
&server,
|
||||
header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER, "task-123"),
|
||||
response_body,
|
||||
)
|
||||
.await;
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "mock".into(),
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model_provider_id = provider.name.clone();
|
||||
config.model_provider = provider.clone();
|
||||
let effort = config.model_reasoning_effort;
|
||||
let summary = config.model_reasoning_summary;
|
||||
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
|
||||
config.model = Some(model.clone());
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ThreadId::new();
|
||||
let session_source = SessionSource::Exec;
|
||||
let model_info =
|
||||
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
|
||||
let session_telemetry = SessionTelemetry::new(
|
||||
conversation_id,
|
||||
model.as_str(),
|
||||
model_info.slug.as_str(),
|
||||
None,
|
||||
Some("test@test.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"test_originator".to_string(),
|
||||
false,
|
||||
"test".to_string(),
|
||||
session_source.clone(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
None,
|
||||
conversation_id,
|
||||
provider,
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
);
|
||||
client.set_agent_task_id(Some("task-123".to_string()));
|
||||
let mut client_session = client.new_session();
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
prompt.input = vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".into(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}];
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&session_telemetry,
|
||||
effort,
|
||||
summary.unwrap_or(model_info.default_reasoning_summary),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
assert_eq!(
|
||||
request
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
|
||||
.as_deref(),
|
||||
Some("task-123")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_respects_model_info_overrides_from_config() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
481
codex-rs/core/tests/suite/agent_identity.rs
Normal file
481
codex-rs/core/tests/suite/agent_identity.rs
Normal file
@@ -0,0 +1,481 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::Result;
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
|
||||
use codex_core::auth::AuthCredentialsStoreMode;
|
||||
use codex_core::auth::AuthDotJson;
|
||||
use codex_core::auth::save_auth;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::token_data::IdTokenInfo;
|
||||
use codex_core::token_data::TokenData;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::Mock;
|
||||
use wiremock::Request;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct JsonSequenceResponder {
|
||||
bodies: Arc<Mutex<VecDeque<Value>>>,
|
||||
}
|
||||
|
||||
impl JsonSequenceResponder {
|
||||
fn new(bodies: Vec<Value>) -> Self {
|
||||
Self {
|
||||
bodies: Arc::new(Mutex::new(VecDeque::from(bodies))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Respond for JsonSequenceResponder {
|
||||
fn respond(&self, _request: &Request) -> ResponseTemplate {
|
||||
let body = self
|
||||
.bodies
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.pop_front()
|
||||
.unwrap_or_else(|| panic!("missing queued JSON response"));
|
||||
ResponseTemplate::new(200).set_body_json(body)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn feature_off_skips_agent_identity_requests() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
}
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn_with_policy("hello", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/agent/register"),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/task/register"),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
response_mock
|
||||
.single_request()
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER),
|
||||
None
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn agent_identity_registers_once_and_reuses_task_within_thread() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_agent_identity_endpoints(&server, vec!["task-1"]).await;
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn_with_policy("hello", workspace_write_with_network())
|
||||
.await?;
|
||||
test.submit_turn_with_policy("again", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/agent/register"),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/task/register"),
|
||||
1
|
||||
);
|
||||
assert_eq!(response_mock.requests().len(), 2);
|
||||
|
||||
let task_request = request_json_body(
|
||||
requests
|
||||
.iter()
|
||||
.find(|request| request.url.path() == "/backend-api/task/register")
|
||||
.expect("missing task register request"),
|
||||
);
|
||||
assert_eq!(task_request["agent_id"], json!("agent-runtime-1"));
|
||||
assert_eq!(
|
||||
task_request["metadata"]["thread_id"],
|
||||
json!(test.session_configured.session_id.to_string())
|
||||
);
|
||||
assert!(
|
||||
!task_request["timestamp"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
);
|
||||
assert!(
|
||||
!task_request["signature"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
for request in response_mock.requests() {
|
||||
assert_eq!(
|
||||
request
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
|
||||
.as_deref(),
|
||||
Some("task-1")
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn new_thread_reuses_agent_identity_but_gets_new_task() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let home = Arc::new(TempDir::new()?);
|
||||
mount_agent_identity_endpoints(&server, vec!["task-1", "task-2"]).await;
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut first_builder = test_codex()
|
||||
.with_home(Arc::clone(&home))
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let first = first_builder.build(&server).await?;
|
||||
first
|
||||
.submit_turn_with_policy("hello", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let mut second_builder = test_codex()
|
||||
.with_home(home)
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let second = second_builder.build(&server).await?;
|
||||
second
|
||||
.submit_turn_with_policy("hello from new thread", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/agent/register"),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/task/register"),
|
||||
2
|
||||
);
|
||||
|
||||
let response_requests = response_mock.requests();
|
||||
assert_eq!(response_requests.len(), 2);
|
||||
assert_eq!(
|
||||
response_requests[0]
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
|
||||
.as_deref(),
|
||||
Some("task-1")
|
||||
);
|
||||
assert_eq!(
|
||||
response_requests[1]
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER)
|
||||
.as_deref(),
|
||||
Some("task-2")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auth_account_change_registers_a_new_agent_identity() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let home = Arc::new(TempDir::new()?);
|
||||
mount_agent_identity_endpoints(&server, vec!["task-1", "task-2"]).await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_string(sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
])),
|
||||
)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let mut first_builder = test_codex()
|
||||
.with_home(Arc::clone(&home))
|
||||
.with_auth(chatgpt_auth_for_account(home.path(), "account-1")?)
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let first = first_builder.build(&server).await?;
|
||||
first
|
||||
.submit_turn_with_policy("hello", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let mut second_builder = test_codex()
|
||||
.with_home(home)
|
||||
.with_auth(chatgpt_auth_for_account(first.home.path(), "account-2")?)
|
||||
.with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let second = second_builder.build(&server).await?;
|
||||
second
|
||||
.submit_turn_with_policy("hello from second account", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/agent/register"),
|
||||
2
|
||||
);
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/task/register"),
|
||||
2
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn api_key_auth_bypasses_agent_identity_flow() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let base_url = server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = base_url;
|
||||
config.model_provider.supports_websockets = false;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UseAgentIdentity)
|
||||
.expect("test config should allow feature update");
|
||||
}
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn_with_policy("hello", workspace_write_with_network())
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/agent/register"),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
count_requests_for_path(&requests, "/backend-api/task/register"),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
response_mock
|
||||
.single_request()
|
||||
.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER),
|
||||
None
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mount_agent_identity_endpoints(server: &wiremock::MockServer, task_ids: Vec<&str>) {
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/agent/register"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"agent_runtime_id": "agent-runtime-1",
|
||||
})))
|
||||
.mount(server)
|
||||
.await;
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/task/register"))
|
||||
.respond_with(JsonSequenceResponder::new(
|
||||
task_ids
|
||||
.into_iter()
|
||||
.map(|task_id| json!({ "task_id": task_id }))
|
||||
.collect(),
|
||||
))
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn count_requests_for_path(requests: &[wiremock::Request], expected_path: &str) -> usize {
|
||||
requests
|
||||
.iter()
|
||||
.filter(|request| request.url.path() == expected_path)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn request_json_body(request: &wiremock::Request) -> Value {
|
||||
match serde_json::from_slice(&request.body) {
|
||||
Ok(body) => body,
|
||||
Err(err) => panic!("request body should be valid JSON: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn workspace_write_with_network() -> codex_protocol::protocol::SandboxPolicy {
|
||||
codex_protocol::protocol::SandboxPolicy::WorkspaceWrite {
|
||||
writable_roots: vec![],
|
||||
read_only_access: Default::default(),
|
||||
network_access: true,
|
||||
exclude_tmpdir_env_var: true,
|
||||
exclude_slash_tmp: true,
|
||||
}
|
||||
}
|
||||
|
||||
fn chatgpt_auth_for_account(codex_home: &std::path::Path, account_id: &str) -> Result<CodexAuth> {
|
||||
let mut id_token = IdTokenInfo::default();
|
||||
id_token.raw_jwt = fake_chatgpt_id_token(account_id)?;
|
||||
let auth_dot_json = AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(TokenData {
|
||||
id_token,
|
||||
access_token: format!("access-token-{account_id}"),
|
||||
refresh_token: "refresh-token".to_string(),
|
||||
account_id: Some(account_id.to_string()),
|
||||
}),
|
||||
last_refresh: Some(Utc::now()),
|
||||
};
|
||||
save_auth(codex_home, &auth_dot_json, AuthCredentialsStoreMode::File)?;
|
||||
let Some(auth) = CodexAuth::from_auth_storage(codex_home, AuthCredentialsStoreMode::File)?
|
||||
else {
|
||||
anyhow::bail!("auth should load from storage");
|
||||
};
|
||||
Ok(auth)
|
||||
}
|
||||
|
||||
fn fake_chatgpt_id_token(account_id: &str) -> Result<String> {
|
||||
#[derive(serde::Serialize)]
|
||||
struct Header {
|
||||
alg: &'static str,
|
||||
typ: &'static str,
|
||||
}
|
||||
|
||||
let header = Header {
|
||||
alg: "none",
|
||||
typ: "JWT",
|
||||
};
|
||||
let payload = json!({
|
||||
"email": "test@example.com",
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_user_id": "user-123",
|
||||
"chatgpt_account_id": account_id,
|
||||
},
|
||||
});
|
||||
|
||||
let header_bytes = serde_json::to_vec(&header)?;
|
||||
let payload_bytes = serde_json::to_vec(&payload)?;
|
||||
let header_b64 = URL_SAFE_NO_PAD.encode(header_bytes);
|
||||
let payload_b64 = URL_SAFE_NO_PAD.encode(payload_bytes);
|
||||
let signature_b64 = URL_SAFE_NO_PAD.encode("sig");
|
||||
Ok(format!("{header_b64}.{payload_b64}.{signature_b64}"))
|
||||
}
|
||||
@@ -6,6 +6,7 @@ use codex_core::ModelProviderInfo;
|
||||
use codex_core::Prompt;
|
||||
use codex_core::ResponseEvent;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER;
|
||||
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::ws_version_from_features;
|
||||
@@ -98,6 +99,34 @@ async fn responses_websocket_streams_request() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_includes_agent_task_header() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
harness
|
||||
.client
|
||||
.set_agent_task_id(Some("task-123".to_string()));
|
||||
let mut client_session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
stream_until_complete(&mut client_session, &harness, &prompt).await;
|
||||
|
||||
let handshake = server.single_handshake();
|
||||
assert_eq!(
|
||||
handshake.header(X_OPENAI_INTERNAL_CODEX_TASK_ID_HEADER),
|
||||
Some("task-123".to_string())
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_preconnect_reuses_connection() {
|
||||
skip_if_no_network!();
|
||||
|
||||
@@ -56,6 +56,7 @@ pub static CODEX_ALIASES_TEMP_DIR: TestCodexAliasesGuard = unsafe {
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
mod abort_tasks;
|
||||
mod agent_identity;
|
||||
mod agent_jobs;
|
||||
mod agent_websocket;
|
||||
mod apply_patch_cli;
|
||||
|
||||
Reference in New Issue
Block a user