mirror of
https://github.com/openai/codex.git
synced 2026-06-01 19:02:59 +00:00
Register agent tasks behind use_agent_identity (#17387)
## Summary Stack PR3 for feature-gated agent identity support. This PR adds per-thread agent task registration behind `features.use_agent_identity`. Tasks are minted on the first real user turn and cached in thread runtime state for later turns. ## Stack - PR1: https://github.com/openai/codex/pull/17385 - add `features.use_agent_identity` - PR2: https://github.com/openai/codex/pull/17386 - register agent identities when enabled - PR3: https://github.com/openai/codex/pull/17387 - this PR, original task registration slice - PR3.1: https://github.com/openai/codex/pull/17978 - persist and prewarm registered tasks per thread - PR4: https://github.com/openai/codex/pull/17980 - use `AgentAssertion` downstream when enabled ## Validation Covered as part of the local stack validation pass: - `just fmt` - `cargo test -p codex-core --lib agent_identity` - `cargo test -p codex-core --lib agent_assertion` - `cargo test -p codex-core --lib websocket_agent_task` - `cargo test -p codex-api api_bridge` - `cargo build -p codex-cli --bin codex` ## Notes The full local app-server E2E path is still being debugged after PR creation. The current branch stack is directionally ready for review while that follow-up continues.
This commit is contained in:
@@ -27,6 +27,10 @@ use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
mod task_registration;
|
||||
|
||||
pub(crate) use task_registration::RegisteredAgentTask;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
const AGENT_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
@@ -119,32 +123,58 @@ impl AgentIdentityManager {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(auth) = self.auth_manager.auth().await else {
|
||||
debug!("skipping agent identity registration because no auth is available");
|
||||
let Some((auth, binding)) = self.current_auth_binding().await else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let Some(binding) =
|
||||
AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id())
|
||||
else {
|
||||
debug!("skipping agent identity registration because ChatGPT auth is unavailable");
|
||||
return Ok(None);
|
||||
};
|
||||
self.ensure_registered_identity_for_binding(&auth, &binding)
|
||||
.await
|
||||
.map(Some)
|
||||
}
|
||||
|
||||
async fn ensure_registered_identity_for_binding(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
binding: &AgentIdentityBinding,
|
||||
) -> Result<StoredAgentIdentity> {
|
||||
let _guard = self.ensure_lock.lock().await;
|
||||
|
||||
if let Some(stored_identity) = self.load_stored_identity(&auth, &binding)? {
|
||||
if let Some(stored_identity) = self.load_stored_identity(auth, binding)? {
|
||||
info!(
|
||||
agent_runtime_id = %stored_identity.agent_runtime_id,
|
||||
binding_id = %binding.binding_id,
|
||||
"reusing stored agent identity"
|
||||
);
|
||||
return Ok(Some(stored_identity));
|
||||
return Ok(stored_identity);
|
||||
}
|
||||
|
||||
let stored_identity = self.register_agent_identity(&binding).await?;
|
||||
self.store_identity(&auth, &stored_identity)?;
|
||||
Ok(Some(stored_identity))
|
||||
let stored_identity = self.register_agent_identity(binding).await?;
|
||||
self.store_identity(auth, &stored_identity)?;
|
||||
Ok(stored_identity)
|
||||
}
|
||||
|
||||
pub(crate) async fn task_matches_current_binding(&self, task: &RegisteredAgentTask) -> bool {
|
||||
if !self.feature_enabled {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.current_auth_binding()
|
||||
.await
|
||||
.is_some_and(|(_, binding)| task.matches_binding(&binding))
|
||||
}
|
||||
|
||||
async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> {
|
||||
let Some(auth) = self.auth_manager.auth().await else {
|
||||
debug!("skipping agent identity flow because no auth is available");
|
||||
return None;
|
||||
};
|
||||
|
||||
let binding =
|
||||
AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id());
|
||||
if binding.is_none() {
|
||||
debug!("skipping agent identity flow because ChatGPT auth is unavailable");
|
||||
}
|
||||
binding.map(|binding| (auth, binding))
|
||||
}
|
||||
|
||||
async fn register_agent_identity(
|
||||
@@ -351,12 +381,11 @@ impl StoredAgentIdentity {
|
||||
}
|
||||
|
||||
fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool {
|
||||
self.binding_id == binding.binding_id
|
||||
&& self.chatgpt_account_id == binding.chatgpt_account_id
|
||||
&& match binding.chatgpt_user_id.as_deref() {
|
||||
Some(chatgpt_user_id) => self.chatgpt_user_id.as_deref() == Some(chatgpt_user_id),
|
||||
None => true,
|
||||
}
|
||||
binding.matches_parts(
|
||||
&self.binding_id,
|
||||
&self.chatgpt_account_id,
|
||||
self.chatgpt_user_id.as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
fn validate_key_material(&self) -> Result<()> {
|
||||
@@ -375,6 +404,20 @@ impl StoredAgentIdentity {
|
||||
}
|
||||
|
||||
impl AgentIdentityBinding {
|
||||
fn matches_parts(
|
||||
&self,
|
||||
binding_id: &str,
|
||||
chatgpt_account_id: &str,
|
||||
chatgpt_user_id: Option<&str>,
|
||||
) -> bool {
|
||||
binding_id == self.binding_id
|
||||
&& chatgpt_account_id == self.chatgpt_account_id
|
||||
&& match self.chatgpt_user_id.as_deref() {
|
||||
Some(expected_user_id) => chatgpt_user_id == Some(expected_user_id),
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn from_auth(auth: &CodexAuth, forced_workspace_id: Option<String>) -> Option<Self> {
|
||||
if !auth.is_chatgpt_auth() {
|
||||
return None;
|
||||
|
||||
470
codex-rs/core/src/agent_identity/task_registration.rs
Normal file
470
codex-rs/core/src/agent_identity/task_registration.rs
Normal file
@@ -0,0 +1,470 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use crypto_box::SecretKey as Curve25519SecretKey;
|
||||
use ed25519_dalek::Signer as _;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use sha2::Digest as _;
|
||||
use sha2::Sha512;
|
||||
use tracing::info;
|
||||
|
||||
use super::*;
|
||||
|
||||
const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub(crate) struct RegisteredAgentTask {
|
||||
pub(crate) binding_id: String,
|
||||
pub(crate) chatgpt_account_id: String,
|
||||
pub(crate) chatgpt_user_id: Option<String>,
|
||||
pub(crate) agent_runtime_id: String,
|
||||
pub(crate) task_id: String,
|
||||
pub(crate) registered_at: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RegisterTaskRequest {
|
||||
signature: String,
|
||||
timestamp: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegisterTaskResponse {
|
||||
encrypted_task_id: String,
|
||||
}
|
||||
|
||||
impl AgentIdentityManager {
|
||||
pub(crate) async fn register_task(&self) -> Result<Option<RegisteredAgentTask>> {
|
||||
if !self.feature_enabled {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some((auth, binding)) = self.current_auth_binding().await else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.register_task_for_binding(auth, binding).await
|
||||
}
|
||||
|
||||
async fn register_task_for_binding(
|
||||
&self,
|
||||
auth: CodexAuth,
|
||||
binding: AgentIdentityBinding,
|
||||
) -> Result<Option<RegisteredAgentTask>> {
|
||||
let stored_identity = self
|
||||
.ensure_registered_identity_for_binding(&auth, &binding)
|
||||
.await?;
|
||||
|
||||
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
|
||||
let request_body = RegisterTaskRequest {
|
||||
signature: sign_task_registration_payload(&stored_identity, ×tamp)?,
|
||||
timestamp,
|
||||
};
|
||||
|
||||
let client = create_client();
|
||||
let url =
|
||||
agent_task_registration_url(&self.chatgpt_base_url, &stored_identity.agent_runtime_id);
|
||||
let human_biscuit = self.mint_human_biscuit(&binding, "POST", &url).await?;
|
||||
let response = client
|
||||
.post(&url)
|
||||
.header("X-OpenAI-Authorization", human_biscuit)
|
||||
.json(&request_body)
|
||||
.timeout(AGENT_TASK_REGISTRATION_TIMEOUT)
|
||||
.send()
|
||||
.await
|
||||
.with_context(|| format!("failed to send agent task registration request to {url}"))?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let response_body = response
|
||||
.json::<RegisterTaskResponse>()
|
||||
.await
|
||||
.with_context(|| format!("failed to parse agent task response from {url}"))?;
|
||||
let registered_task = RegisteredAgentTask {
|
||||
binding_id: stored_identity.binding_id.clone(),
|
||||
chatgpt_account_id: stored_identity.chatgpt_account_id.clone(),
|
||||
chatgpt_user_id: stored_identity.chatgpt_user_id.clone(),
|
||||
agent_runtime_id: stored_identity.agent_runtime_id.clone(),
|
||||
task_id: decrypt_task_id_response(
|
||||
&stored_identity,
|
||||
&response_body.encrypted_task_id,
|
||||
)?,
|
||||
registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
};
|
||||
info!(
|
||||
agent_runtime_id = %registered_task.agent_runtime_id,
|
||||
task_id = %registered_task.task_id,
|
||||
"registered agent task"
|
||||
);
|
||||
return Ok(Some(registered_task));
|
||||
}
|
||||
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("agent task registration failed with status {status} from {url}: {body}")
|
||||
}
|
||||
}
|
||||
|
||||
impl RegisteredAgentTask {
|
||||
pub(super) fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool {
|
||||
binding.matches_parts(
|
||||
&self.binding_id,
|
||||
&self.chatgpt_account_id,
|
||||
self.chatgpt_user_id.as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn has_same_binding(&self, other: &Self) -> bool {
|
||||
self.binding_id == other.binding_id
|
||||
&& self.chatgpt_account_id == other.chatgpt_account_id
|
||||
&& self.chatgpt_user_id == other.chatgpt_user_id
|
||||
}
|
||||
}
|
||||
|
||||
fn sign_task_registration_payload(
|
||||
stored_identity: &StoredAgentIdentity,
|
||||
timestamp: &str,
|
||||
) -> Result<String> {
|
||||
let signing_key = stored_identity.signing_key()?;
|
||||
let payload = format!("{}:{timestamp}", stored_identity.agent_runtime_id);
|
||||
Ok(BASE64_STANDARD.encode(signing_key.sign(payload.as_bytes()).to_bytes()))
|
||||
}
|
||||
|
||||
fn decrypt_task_id_response(
|
||||
stored_identity: &StoredAgentIdentity,
|
||||
encrypted_task_id: &str,
|
||||
) -> Result<String> {
|
||||
let signing_key = stored_identity.signing_key()?;
|
||||
let ciphertext = BASE64_STANDARD
|
||||
.decode(encrypted_task_id)
|
||||
.context("encrypted task id is not valid base64")?;
|
||||
let plaintext = curve25519_secret_key_from_signing_key(&signing_key)
|
||||
.unseal(&ciphertext)
|
||||
.map_err(|_| anyhow::anyhow!("failed to decrypt encrypted task id"))?;
|
||||
String::from_utf8(plaintext).context("decrypted task id is not valid UTF-8")
|
||||
}
|
||||
|
||||
fn curve25519_secret_key_from_signing_key(signing_key: &SigningKey) -> Curve25519SecretKey {
|
||||
let digest = Sha512::digest(signing_key.to_bytes());
|
||||
let mut secret_key = [0u8; 32];
|
||||
secret_key.copy_from_slice(&digest[..32]);
|
||||
secret_key[0] &= 248;
|
||||
secret_key[31] &= 127;
|
||||
secret_key[31] |= 64;
|
||||
Curve25519SecretKey::from(secret_key)
|
||||
}
|
||||
|
||||
fn agent_task_registration_url(chatgpt_base_url: &str, agent_runtime_id: &str) -> String {
|
||||
let trimmed = chatgpt_base_url.trim_end_matches('/');
|
||||
format!("{trimmed}/v1/agent/{agent_runtime_id}/task/register")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
|
||||
use codex_app_server_protocol::AuthMode as ApiAuthMode;
|
||||
use codex_login::AuthCredentialsStoreMode;
|
||||
use codex_login::AuthDotJson;
|
||||
use codex_login::save_auth;
|
||||
use codex_login::token_data::IdTokenInfo;
|
||||
use codex_login::token_data::TokenData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_task_skips_when_feature_is_disabled() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(make_chatgpt_auth("account-123", Some("user-123")));
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ false,
|
||||
"https://chatgpt.com/backend-api/".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
|
||||
assert_eq!(manager.register_task().await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_task_skips_for_api_key_auth() {
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test-key"));
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ true,
|
||||
"https://chatgpt.com/backend-api/".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
|
||||
assert_eq!(manager.register_task().await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_task_registers_and_decrypts_plaintext_task_id() {
|
||||
let server = MockServer::start().await;
|
||||
let chatgpt_base_url = server.uri();
|
||||
mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await;
|
||||
let auth = make_chatgpt_auth("account-123", Some("user-123"));
|
||||
let auth_manager = AuthManager::from_auth_for_testing(auth.clone());
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ true,
|
||||
chatgpt_base_url,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let stored_identity = seed_stored_identity(&manager, &auth, "agent-123", "account-123");
|
||||
let encrypted_task_id =
|
||||
encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/agent/agent-123/task/register"))
|
||||
.and(header("x-openai-authorization", "human-biscuit"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"encrypted_task_id": encrypted_task_id,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let task = manager
|
||||
.register_task()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("task should be registered");
|
||||
|
||||
assert_eq!(
|
||||
task,
|
||||
RegisteredAgentTask {
|
||||
binding_id: "chatgpt-account-account-123".to_string(),
|
||||
chatgpt_account_id: "account-123".to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: "agent-123".to_string(),
|
||||
task_id: "task_123".to_string(),
|
||||
registered_at: task.registered_at.clone(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_task_uses_chatgpt_base_url() {
|
||||
let server = MockServer::start().await;
|
||||
let chatgpt_base_url = format!("{}/backend-api", server.uri());
|
||||
mount_human_biscuit(&server, &chatgpt_base_url, "agent-fallback").await;
|
||||
let auth = make_chatgpt_auth("account-123", Some("user-123"));
|
||||
let auth_manager = AuthManager::from_auth_for_testing(auth.clone());
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ true,
|
||||
chatgpt_base_url,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let stored_identity =
|
||||
seed_stored_identity(&manager, &auth, "agent-fallback", "account-123");
|
||||
let encrypted_task_id = encrypt_task_id_for_identity(&stored_identity, "task_fallback")
|
||||
.expect("task ciphertext");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/v1/agent/agent-fallback/task/register"))
|
||||
.and(header("x-openai-authorization", "human-biscuit"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"encrypted_task_id": encrypted_task_id,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let task = manager
|
||||
.register_task()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("task should be registered");
|
||||
|
||||
assert_eq!(task.agent_runtime_id, "agent-fallback");
|
||||
assert_eq!(task.task_id, "task_fallback");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_task_for_binding_keeps_one_auth_snapshot() {
|
||||
let server = MockServer::start().await;
|
||||
let chatgpt_base_url = server.uri();
|
||||
mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await;
|
||||
let binding_auth = make_chatgpt_auth("account-123", Some("user-123"));
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456")));
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ true,
|
||||
chatgpt_base_url,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let stored_identity =
|
||||
seed_stored_identity(&manager, &binding_auth, "agent-123", "account-123");
|
||||
let encrypted_task_id =
|
||||
encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext");
|
||||
let binding =
|
||||
AgentIdentityBinding::from_auth(&binding_auth, /*forced_workspace_id*/ None)
|
||||
.expect("binding");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/agent/agent-123/task/register"))
|
||||
.and(header("x-openai-authorization", "human-biscuit"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"encrypted_task_id": encrypted_task_id,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let task = manager
|
||||
.register_task_for_binding(binding_auth, binding)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("task should be registered");
|
||||
|
||||
assert_eq!(
|
||||
task,
|
||||
RegisteredAgentTask {
|
||||
binding_id: "chatgpt-account-account-123".to_string(),
|
||||
chatgpt_account_id: "account-123".to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: "agent-123".to_string(),
|
||||
task_id: "task_123".to_string(),
|
||||
registered_at: task.registered_at.clone(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_matches_current_binding_rejects_stale_auth_binding() {
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456")));
|
||||
let manager = AgentIdentityManager::new_for_tests(
|
||||
auth_manager,
|
||||
/*feature_enabled*/ true,
|
||||
"https://chatgpt.com/backend-api/".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let task = RegisteredAgentTask {
|
||||
binding_id: "chatgpt-account-account-123".to_string(),
|
||||
chatgpt_account_id: "account-123".to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: "agent-123".to_string(),
|
||||
task_id: "task_123".to_string(),
|
||||
registered_at: "2026-03-23T12:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
assert!(!manager.task_matches_current_binding(&task).await);
|
||||
}
|
||||
|
||||
async fn mount_human_biscuit(
|
||||
server: &MockServer,
|
||||
chatgpt_base_url: &str,
|
||||
agent_runtime_id: &str,
|
||||
) {
|
||||
let biscuit_url = agent_identity_biscuit_url(chatgpt_base_url);
|
||||
let biscuit_path = reqwest::Url::parse(&biscuit_url)
|
||||
.expect("biscuit URL parses")
|
||||
.path()
|
||||
.to_string();
|
||||
let target_url = agent_task_registration_url(chatgpt_base_url, agent_runtime_id);
|
||||
Mock::given(method("GET"))
|
||||
.and(path(biscuit_path))
|
||||
.and(header("authorization", "Bearer access-token-account-123"))
|
||||
.and(header("x-original-method", "POST"))
|
||||
.and(header("x-original-url", target_url))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200).insert_header("x-openai-authorization", "human-biscuit"),
|
||||
)
|
||||
.expect(1)
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn seed_stored_identity(
|
||||
manager: &AgentIdentityManager,
|
||||
auth: &CodexAuth,
|
||||
agent_runtime_id: &str,
|
||||
account_id: &str,
|
||||
) -> StoredAgentIdentity {
|
||||
let key_material = generate_agent_key_material().expect("key material");
|
||||
let binding =
|
||||
AgentIdentityBinding::from_auth(auth, /*forced_workspace_id*/ None).expect("binding");
|
||||
let stored_identity = StoredAgentIdentity {
|
||||
binding_id: binding.binding_id,
|
||||
chatgpt_account_id: account_id.to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: agent_runtime_id.to_string(),
|
||||
private_key_pkcs8_base64: key_material.private_key_pkcs8_base64,
|
||||
public_key_ssh: key_material.public_key_ssh,
|
||||
registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
abom: manager.abom.clone(),
|
||||
};
|
||||
manager
|
||||
.store_identity(auth, &stored_identity)
|
||||
.expect("store identity");
|
||||
let persisted = auth
|
||||
.get_agent_identity(account_id)
|
||||
.expect("persisted identity");
|
||||
assert_eq!(persisted.agent_runtime_id, agent_runtime_id);
|
||||
stored_identity
|
||||
}
|
||||
|
||||
fn encrypt_task_id_for_identity(
|
||||
stored_identity: &StoredAgentIdentity,
|
||||
task_id: &str,
|
||||
) -> Result<String> {
|
||||
let mut rng = crypto_box::aead::OsRng;
|
||||
let public_key =
|
||||
curve25519_secret_key_from_signing_key(&stored_identity.signing_key()?).public_key();
|
||||
let ciphertext = public_key
|
||||
.seal(&mut rng, task_id.as_bytes())
|
||||
.map_err(|_| anyhow::anyhow!("failed to encrypt test task id"))?;
|
||||
Ok(BASE64_STANDARD.encode(ciphertext))
|
||||
}
|
||||
|
||||
fn make_chatgpt_auth(account_id: &str, user_id: Option<&str>) -> CodexAuth {
|
||||
let tempdir = tempfile::tempdir().expect("tempdir");
|
||||
let auth_json = AuthDotJson {
|
||||
auth_mode: Some(ApiAuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(TokenData {
|
||||
id_token: IdTokenInfo {
|
||||
email: None,
|
||||
chatgpt_plan_type: None,
|
||||
chatgpt_user_id: user_id.map(ToOwned::to_owned),
|
||||
chatgpt_account_id: Some(account_id.to_string()),
|
||||
chatgpt_account_is_fedramp: false,
|
||||
raw_jwt: fake_id_token(account_id, user_id),
|
||||
},
|
||||
access_token: format!("access-token-{account_id}"),
|
||||
refresh_token: "refresh-token".to_string(),
|
||||
account_id: Some(account_id.to_string()),
|
||||
}),
|
||||
last_refresh: Some(Utc::now()),
|
||||
agent_identity: None,
|
||||
};
|
||||
save_auth(tempdir.path(), &auth_json, AuthCredentialsStoreMode::File).expect("save auth");
|
||||
CodexAuth::from_auth_storage(tempdir.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("load auth")
|
||||
.expect("auth")
|
||||
}
|
||||
|
||||
fn fake_id_token(account_id: &str, user_id: Option<&str>) -> String {
|
||||
let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
|
||||
let payload = serde_json::json!({
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_user_id": user_id,
|
||||
"chatgpt_account_id": account_id,
|
||||
}
|
||||
});
|
||||
let payload = URL_SAFE_NO_PAD.encode(payload.to_string());
|
||||
format!("{header}.{payload}.signature")
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ use crate::agent::MailboxReceiver;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::agent_identity::AgentIdentityManager;
|
||||
use crate::agent_identity::RegisteredAgentTask;
|
||||
use crate::apps::render_apps_section;
|
||||
use crate::commit_attribution::commit_message_trailer_instruction;
|
||||
use crate::compact;
|
||||
@@ -1577,7 +1578,7 @@ impl Session {
|
||||
async fn fail_agent_identity_registration(self: &Arc<Self>, error: anyhow::Error) {
|
||||
warn!(error = %error, "agent identity registration failed");
|
||||
let message = format!(
|
||||
"Agent identity registration failed. Codex cannot continue while `features.use_agent_identity` is enabled: {error}"
|
||||
"Agent identity registration failed while `features.use_agent_identity` is enabled: {error}"
|
||||
);
|
||||
self.send_event_raw(Event {
|
||||
id: self.next_internal_sub_id(),
|
||||
@@ -1587,7 +1588,90 @@ impl Session {
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
handlers::shutdown(self, self.next_internal_sub_id()).await;
|
||||
}
|
||||
|
||||
async fn cached_agent_task_for_current_binding(&self) -> Option<RegisteredAgentTask> {
|
||||
let agent_task = {
|
||||
let state = self.state.lock().await;
|
||||
state.agent_task()
|
||||
}?;
|
||||
|
||||
if self
|
||||
.services
|
||||
.agent_identity_manager
|
||||
.task_matches_current_binding(&agent_task)
|
||||
.await
|
||||
{
|
||||
debug!(
|
||||
agent_runtime_id = %agent_task.agent_runtime_id,
|
||||
task_id = %agent_task.task_id,
|
||||
"reusing cached agent task"
|
||||
);
|
||||
return Some(agent_task);
|
||||
}
|
||||
|
||||
debug!(
|
||||
agent_runtime_id = %agent_task.agent_runtime_id,
|
||||
task_id = %agent_task.task_id,
|
||||
"discarding cached agent task because auth binding changed"
|
||||
);
|
||||
let mut state = self.state.lock().await;
|
||||
if state.agent_task().as_ref() == Some(&agent_task) {
|
||||
state.clear_agent_task();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn ensure_agent_task_registered(&self) -> anyhow::Result<Option<RegisteredAgentTask>> {
|
||||
if let Some(agent_task) = self.cached_agent_task_for_current_binding().await {
|
||||
return Ok(Some(agent_task));
|
||||
}
|
||||
|
||||
for _ in 0..2 {
|
||||
let Some(agent_task) = self.services.agent_identity_manager.register_task().await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if !self
|
||||
.services
|
||||
.agent_identity_manager
|
||||
.task_matches_current_binding(&agent_task)
|
||||
.await
|
||||
{
|
||||
debug!(
|
||||
agent_runtime_id = %agent_task.agent_runtime_id,
|
||||
task_id = %agent_task.task_id,
|
||||
"discarding newly registered agent task because auth binding changed"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
if let Some(existing_agent_task) = state.agent_task() {
|
||||
if existing_agent_task.has_same_binding(&agent_task) {
|
||||
return Ok(Some(existing_agent_task));
|
||||
}
|
||||
debug!(
|
||||
agent_runtime_id = %existing_agent_task.agent_runtime_id,
|
||||
task_id = %existing_agent_task.task_id,
|
||||
"replacing cached agent task because auth binding changed"
|
||||
);
|
||||
}
|
||||
state.set_agent_task(agent_task.clone());
|
||||
}
|
||||
|
||||
info!(
|
||||
thread_id = %self.conversation_id,
|
||||
agent_runtime_id = %agent_task.agent_runtime_id,
|
||||
task_id = %agent_task.task_id,
|
||||
"registered agent task for thread"
|
||||
);
|
||||
return Ok(Some(agent_task));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -5168,6 +5252,20 @@ pub(crate) async fn run_turn(
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
if let Err(error) = sess.ensure_agent_task_registered().await {
|
||||
warn!(error = %error, "agent task registration failed");
|
||||
sess.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message: format!(
|
||||
"Agent task registration failed. Please try again; Codex will attempt to register the task again on the next turn: {error}"
|
||||
),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
|
||||
if !skill_items.is_empty() {
|
||||
sess.record_conversation_items(&turn_context, &skill_items)
|
||||
|
||||
@@ -4182,7 +4182,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> (
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fail_agent_identity_registration_emits_error_and_shutdown() {
|
||||
async fn fail_agent_identity_registration_emits_error_without_shutdown() {
|
||||
let (session, _turn_context, rx_event) = make_session_and_context_with_rx().await;
|
||||
|
||||
session
|
||||
@@ -4200,21 +4200,14 @@ async fn fail_agent_identity_registration_emits_error_and_shutdown() {
|
||||
}) => {
|
||||
assert_eq!(
|
||||
message,
|
||||
"Agent identity registration failed. Codex cannot continue while `features.use_agent_identity` is enabled: registration exploded".to_string()
|
||||
"Agent identity registration failed while `features.use_agent_identity` is enabled: registration exploded".to_string()
|
||||
);
|
||||
assert_eq!(codex_error_info, Some(CodexErrorInfo::Other));
|
||||
}
|
||||
other => panic!("expected error event, got {other:?}"),
|
||||
}
|
||||
|
||||
let shutdown_event = timeout(Duration::from_secs(1), rx_event.recv())
|
||||
.await
|
||||
.expect("shutdown event should arrive")
|
||||
.expect("shutdown event should be readable");
|
||||
match shutdown_event.msg {
|
||||
EventMsg::ShutdownComplete => {}
|
||||
other => panic!("expected shutdown event, got {other:?}"),
|
||||
}
|
||||
assert!(rx_event.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -6,6 +6,7 @@ use codex_sandboxing::policy_transforms::merge_permission_profiles;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::agent_identity::RegisteredAgentTask;
|
||||
use crate::codex::PreviousTurnSettings;
|
||||
use crate::codex::SessionConfiguration;
|
||||
use crate::context_manager::ContextManager;
|
||||
@@ -30,6 +31,7 @@ pub(crate) struct SessionState {
|
||||
previous_turn_settings: Option<PreviousTurnSettings>,
|
||||
/// Startup prewarmed session prepared during session initialization.
|
||||
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
|
||||
pub(crate) agent_task: Option<RegisteredAgentTask>,
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
@@ -49,6 +51,7 @@ impl SessionState {
|
||||
mcp_dependency_prompted: HashSet::new(),
|
||||
previous_turn_settings: None,
|
||||
startup_prewarm: None,
|
||||
agent_task: None,
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
granted_permissions: None,
|
||||
@@ -186,6 +189,18 @@ impl SessionState {
|
||||
self.startup_prewarm.take()
|
||||
}
|
||||
|
||||
pub(crate) fn agent_task(&self) -> Option<RegisteredAgentTask> {
|
||||
self.agent_task.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_agent_task(&mut self, agent_task: RegisteredAgentTask) {
|
||||
self.agent_task = Some(agent_task);
|
||||
}
|
||||
|
||||
pub(crate) fn clear_agent_task(&mut self) {
|
||||
self.agent_task = None;
|
||||
}
|
||||
|
||||
// Adds connector IDs to the active set and returns the merged selection.
|
||||
pub(crate) fn merge_connector_selection<I>(&mut self, connector_ids: I) -> HashSet<String>
|
||||
where
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::agent_identity::RegisteredAgentTask;
|
||||
use crate::codex::make_session_configuration_for_tests;
|
||||
use codex_protocol::protocol::CreditsSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
@@ -33,6 +34,43 @@ async fn clear_connector_selection_removes_entries() {
|
||||
assert_eq!(state.get_connector_selection(), HashSet::new());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_agent_task_persists_plaintext_task_for_session_reuse() {
|
||||
let session_configuration = make_session_configuration_for_tests().await;
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
let agent_task = RegisteredAgentTask {
|
||||
binding_id: "chatgpt-account-account-123".to_string(),
|
||||
chatgpt_account_id: "account-123".to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: "agent_123".to_string(),
|
||||
task_id: "task_123".to_string(),
|
||||
registered_at: "2026-03-23T12:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
state.set_agent_task(agent_task.clone());
|
||||
|
||||
assert_eq!(state.agent_task(), Some(agent_task));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn clear_agent_task_removes_cached_task() {
|
||||
let session_configuration = make_session_configuration_for_tests().await;
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
let agent_task = RegisteredAgentTask {
|
||||
binding_id: "chatgpt-account-account-123".to_string(),
|
||||
chatgpt_account_id: "account-123".to_string(),
|
||||
chatgpt_user_id: Some("user-123".to_string()),
|
||||
agent_runtime_id: "agent_123".to_string(),
|
||||
task_id: "task_123".to_string(),
|
||||
registered_at: "2026-03-23T12:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
state.set_agent_task(agent_task);
|
||||
state.clear_agent_task();
|
||||
|
||||
assert_eq!(state.agent_task(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() {
|
||||
let session_configuration = make_session_configuration_for_tests().await;
|
||||
|
||||
Reference in New Issue
Block a user