From a98623511ba433154ec811fc63091617f5945438 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 6 May 2026 10:48:37 +0200 Subject: [PATCH] feat: add `session_id` (#20437) ## Summary Related to https://openai.slack.com/archives/C095U48JNL9/p1777537279707449 TLDR: We update the meaning of session ids and thread ids: * thread_id stays as now * session_id become a shared id between every thread under a /root thread (i.e. every sub-agent share the same session id) This PR introduces an explicit `SessionId` and threads it through the protocol/client boundary so `session_id` and `thread_id` can diverge when they need to, while preserving compatibility for older serialized `session_configured` events. --------- Co-authored-by: Codex --- .../analytics/src/analytics_client_tests.rs | 2 + codex-rs/analytics/src/client_tests.rs | 2 + .../codex_app_server_protocol.schemas.json | 10 + .../codex_app_server_protocol.v2.schemas.json | 10 + .../schema/json/v2/ThreadResumeResponse.json | 5 + .../schema/json/v2/ThreadStartResponse.json | 5 + .../typescript/v2/ThreadResumeResponse.ts | 5 +- .../typescript/v2/ThreadStartResponse.ts | 5 +- .../src/protocol/common.rs | 2 + .../src/protocol/v2/thread.rs | 6 + .../request_processors/thread_lifecycle.rs | 2 + .../request_processors/thread_processor.rs | 2 + .../tests/suite/v2/thread_resume.rs | 5 +- .../app-server/tests/suite/v2/thread_start.rs | 2 + codex-rs/codex-api/src/endpoint/responses.rs | 14 +- codex-rs/codex-api/src/lib.rs | 2 +- codex-rs/codex-api/src/requests/headers.rs | 7 +- codex-rs/codex-api/tests/clients.rs | 13 +- codex-rs/core/src/agent/control.rs | 13 ++ codex-rs/core/src/client.rs | 40 ++-- codex-rs/core/src/client_tests.rs | 8 +- codex-rs/core/src/codex_thread.rs | 2 +- codex-rs/core/src/session/session.rs | 31 ++- codex-rs/core/src/session/tests.rs | 203 ++++++++++++++++-- codex-rs/core/tests/responses_headers.rs | 23 +- codex-rs/core/tests/suite/approvals.rs | 2 +- codex-rs/core/tests/suite/client.rs | 37 ++-- .../core/tests/suite/client_websockets.rs | 25 ++- codex-rs/core/tests/suite/compact_remote.rs | 10 + codex-rs/core/tests/suite/fork_thread.rs | 2 +- codex-rs/core/tests/suite/items.rs | 10 +- codex-rs/core/tests/suite/model_switching.rs | 6 +- codex-rs/core/tests/suite/search_tool.rs | 2 +- codex-rs/core/tests/suite/sqlite_state.rs | 10 +- .../tests/suite/subagent_notifications.rs | 2 +- .../src/event_processor_with_jsonl_output.rs | 2 +- codex-rs/exec/src/lib.rs | 16 +- codex-rs/exec/src/lib_tests.rs | 9 + .../tests/event_processor_with_json_output.rs | 7 +- codex-rs/mcp-server/src/outgoing_message.rs | 13 +- codex-rs/memories/write/src/runtime.rs | 2 + codex-rs/memories/write/src/startup_tests.rs | 4 +- codex-rs/protocol/src/lib.rs | 2 + codex-rs/protocol/src/protocol.rs | 18 +- codex-rs/protocol/src/session_id.rs | 126 +++++++++++ codex-rs/protocol/src/thread_id.rs | 2 +- codex-rs/tui/src/app_server_session.rs | 1 + .../chatwidget/tests/composer_submission.rs | 36 ++-- .../tui/src/chatwidget/tests/exec_flow.rs | 4 +- .../src/chatwidget/tests/history_replay.rs | 20 +- .../tui/src/chatwidget/tests/plan_mode.rs | 4 +- 51 files changed, 638 insertions(+), 153 deletions(-) create mode 100644 codex-rs/protocol/src/session_id.rs diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 823dcace91..7194e32443 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -154,6 +154,7 @@ fn sample_thread_start_response( model: &str, ) -> ClientResponsePayload { ClientResponsePayload::ThreadStart(ThreadStartResponse { + session_id: format!("session-{thread_id}"), thread: sample_thread_with_metadata( thread_id, ephemeral, @@ -215,6 +216,7 @@ fn sample_thread_resume_response_with_source( thread_source: Option, ) -> ClientResponsePayload { ClientResponsePayload::ThreadResume(ThreadResumeResponse { + session_id: format!("session-{thread_id}"), thread: sample_thread_with_metadata(thread_id, ephemeral, source, thread_source), model: model.to_string(), model_provider: "openai".to_string(), diff --git a/codex-rs/analytics/src/client_tests.rs b/codex-rs/analytics/src/client_tests.rs index c36b5cf9a7..42d508602a 100644 --- a/codex-rs/analytics/src/client_tests.rs +++ b/codex-rs/analytics/src/client_tests.rs @@ -102,6 +102,7 @@ fn sample_permission_profile() -> AppServerPermissionProfile { fn sample_thread_start_response() -> ClientResponsePayload { ClientResponsePayload::ThreadStart(ThreadStartResponse { + session_id: "session-1".to_string(), thread: sample_thread("thread-1"), model: "gpt-5".to_string(), model_provider: "openai".to_string(), @@ -119,6 +120,7 @@ fn sample_thread_start_response() -> ClientResponsePayload { fn sample_thread_resume_response() -> ClientResponsePayload { ClientResponsePayload::ThreadResume(ThreadResumeResponse { + session_id: "session-2".to_string(), thread: sample_thread("thread-2"), model: "gpt-5".to_string(), model_provider: "openai".to_string(), diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 881380bf19..d6f184a6db 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17171,6 +17171,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/v2/Thread" } @@ -17494,6 +17499,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/v2/Thread" } diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 109294746d..1f252cbd23 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15057,6 +15057,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/Thread" } @@ -15380,6 +15385,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/Thread" } diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 81eff83400..b00fd513e7 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -2619,6 +2619,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/Thread" } diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index b2d5cbfa08..7451d08977 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -2619,6 +2619,11 @@ } ] }, + "sessionId": { + "default": "", + "description": "Session id shared by threads that belong to the same session tree.", + "type": "string" + }, "thread": { "$ref": "#/definitions/Thread" } diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeResponse.ts index f7627c07ae..cc4c2440f7 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeResponse.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeResponse.ts @@ -9,7 +9,10 @@ import type { AskForApproval } from "./AskForApproval"; import type { SandboxPolicy } from "./SandboxPolicy"; import type { Thread } from "./Thread"; -export type ThreadResumeResponse = {thread: Thread, model: string, modelProvider: string, serviceTier: ServiceTier | null, cwd: AbsolutePathBuf, /** +export type ThreadResumeResponse = {/** + * Session id shared by threads that belong to the same session tree. + */ +sessionId: string, thread: Thread, model: string, modelProvider: string, serviceTier: ServiceTier | null, cwd: AbsolutePathBuf, /** * Instruction source files currently loaded for this thread. */ instructionSources: Array, approvalPolicy: AskForApproval, /** diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartResponse.ts index ce28a4a1d7..962ed2437e 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartResponse.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartResponse.ts @@ -9,7 +9,10 @@ import type { AskForApproval } from "./AskForApproval"; import type { SandboxPolicy } from "./SandboxPolicy"; import type { Thread } from "./Thread"; -export type ThreadStartResponse = {thread: Thread, model: string, modelProvider: string, serviceTier: ServiceTier | null, cwd: AbsolutePathBuf, /** +export type ThreadStartResponse = {/** + * Session id shared by threads that belong to the same session tree. + */ +sessionId: string, thread: Thread, model: string, modelProvider: string, serviceTier: ServiceTier | null, cwd: AbsolutePathBuf, /** * Instruction source files currently loaded for this thread. */ instructionSources: Array, approvalPolicy: AskForApproval, /** diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index ae687a21b2..8fd267d014 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -2176,6 +2176,7 @@ mod tests { let response = ClientResponse::ThreadStart { request_id: RequestId::Integer(7), response: v2::ThreadStartResponse { + session_id: "67e55044-10b1-426f-9247-bb680e5fe0c7".to_string(), thread: v2::Thread { id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), forked_from_id: None, @@ -2217,6 +2218,7 @@ mod tests { "method": "thread/start", "id": 7, "response": { + "sessionId": "67e55044-10b1-426f-9247-bb680e5fe0c7", "thread": { "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "forkedFromId": null, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 578ef9193f..d8250987a6 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -189,6 +189,9 @@ pub struct MockExperimentalMethodResponse { #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct ThreadStartResponse { + /// Session id shared by threads that belong to the same session tree. + #[serde(default)] + pub session_id: String, pub thread: Thread, pub model: String, pub model_provider: String, @@ -304,6 +307,9 @@ pub struct ThreadResumeParams { #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct ThreadResumeResponse { + /// Session id shared by threads that belong to the same session tree. + #[serde(default)] + pub session_id: String, pub thread: Thread, pub model: String, pub model_provider: String, diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 4a677d91ab..bd93893b79 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -598,8 +598,10 @@ pub(super) async fn handle_pending_thread_resume_request( let sandbox = thread_response_sandbox_policy(&permission_profile, cwd.as_path()); let active_permission_profile = thread_response_active_permission_profile(active_permission_profile); + let session_id = conversation.session_configured().session_id.to_string(); let response = ThreadResumeResponse { + session_id, thread, model, model_provider: model_provider_id, diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 147c4cd721..5563bd56e9 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1148,6 +1148,7 @@ impl ThreadRequestProcessor { thread_response_active_permission_profile(config_snapshot.active_permission_profile); let response = ThreadStartResponse { + session_id: session_configured.session_id.to_string(), thread: thread.clone(), model: config_snapshot.model, model_provider: config_snapshot.model_provider_id, @@ -2476,6 +2477,7 @@ impl ThreadRequestProcessor { ); let response = ThreadResumeResponse { + session_id: session_configured.session_id.to_string(), thread, model: session_configured.model, model_provider: session_configured.model_provider_id, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 4014e6f975..175da83016 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -270,7 +270,10 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; - let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + let ThreadResumeResponse { + session_id, thread, .. + } = to_response::(resume_resp)?; + assert!(!session_id.is_empty(), "session id should not be empty"); assert_eq!(thread.thread_source, Some(ThreadSource::User)); let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 68fc818ec2..e300e562d9 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -121,10 +121,12 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { .await??; let resp_result = resp.result.clone(); let ThreadStartResponse { + session_id, thread, model_provider, .. } = to_response::(resp)?; + assert!(!session_id.is_empty(), "session id should not be empty"); assert!(!thread.id.is_empty(), "thread id should not be empty"); assert!( thread.preview.is_empty(), diff --git a/codex-rs/codex-api/src/endpoint/responses.rs b/codex-rs/codex-api/src/endpoint/responses.rs index 17b478d1fd..cc1be2846a 100644 --- a/codex-rs/codex-api/src/endpoint/responses.rs +++ b/codex-rs/codex-api/src/endpoint/responses.rs @@ -6,7 +6,7 @@ use crate::error::ApiError; use crate::provider::Provider; use crate::requests::Compression; use crate::requests::attach_item_ids; -use crate::requests::headers::build_conversation_headers; +use crate::requests::headers::build_session_headers; use crate::requests::headers::insert_header; use crate::requests::headers::subagent_header; use crate::sse::spawn_response_stream; @@ -30,7 +30,8 @@ pub struct ResponsesClient { #[derive(Default)] pub struct ResponsesOptions { - pub conversation_id: Option, + pub session_id: Option, + pub thread_id: Option, pub session_source: Option, pub extra_headers: HeaderMap, pub compression: Compression, @@ -72,7 +73,8 @@ impl ResponsesClient { options: ResponsesOptions, ) -> Result { let ResponsesOptions { - conversation_id, + session_id, + thread_id, session_source, extra_headers, compression, @@ -86,10 +88,10 @@ impl ResponsesClient { } let mut headers = extra_headers; - if let Some(ref conv_id) = conversation_id { - insert_header(&mut headers, "x-client-request-id", conv_id); + if let Some(ref thread_id) = thread_id { + insert_header(&mut headers, "x-client-request-id", thread_id); } - headers.extend(build_conversation_headers(conversation_id)); + headers.extend(build_session_headers(session_id, thread_id)); if let Some(subagent) = subagent_header(&session_source) { insert_header(&mut headers, "x-openai-subagent", &subagent); } diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 0b8aee266b..6913c6e8ca 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -10,7 +10,7 @@ pub(crate) mod requests; pub(crate) mod sse; pub(crate) mod telemetry; -pub use crate::requests::headers::build_conversation_headers; +pub use crate::requests::headers::build_session_headers; pub use codex_client::RequestTelemetry; pub use codex_client::ReqwestTransport; pub use codex_client::TransportError; diff --git a/codex-rs/codex-api/src/requests/headers.rs b/codex-rs/codex-api/src/requests/headers.rs index d1ab834109..d91d2a2bf1 100644 --- a/codex-rs/codex-api/src/requests/headers.rs +++ b/codex-rs/codex-api/src/requests/headers.rs @@ -2,11 +2,14 @@ use codex_protocol::protocol::SessionSource; use http::HeaderMap; use http::HeaderValue; -pub fn build_conversation_headers(conversation_id: Option) -> HeaderMap { +pub fn build_session_headers(session_id: Option, thread_id: Option) -> HeaderMap { let mut headers = HeaderMap::new(); - if let Some(id) = conversation_id { + if let Some(id) = session_id { insert_header(&mut headers, "session_id", &id); } + if let Some(id) = thread_id { + insert_header(&mut headers, "thread_id", &id); + } headers } diff --git a/codex-rs/codex-api/tests/clients.rs b/codex-rs/codex-api/tests/clients.rs index 218a99f9b2..a2a29ba16d 100644 --- a/codex-rs/codex-api/tests/clients.rs +++ b/codex-rs/codex-api/tests/clients.rs @@ -444,7 +444,8 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> { .stream_request( request, ResponsesOptions { - conversation_id: Some("sess_123".into()), + session_id: Some("sess_123".into()), + thread_id: Some("thread_123".into()), session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)), extra_headers, compression: Compression::None, @@ -461,6 +462,16 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> { req.headers.get("session_id").and_then(|v| v.to_str().ok()), Some("sess_123") ); + assert_eq!( + req.headers.get("thread_id").and_then(|v| v.to_str().ok()), + Some("thread_123") + ); + assert_eq!( + req.headers + .get("x-client-request-id") + .and_then(|v| v.to_str().ok()), + Some("thread_123") + ); assert_eq!( req.headers .get("x-openai-subagent") diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index e5967a6ed2..8c8114324c 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -14,6 +14,7 @@ use crate::thread_manager::ThreadManagerState; use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns; use codex_features::Feature; use codex_protocol::AgentPath; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; @@ -132,6 +133,9 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool { /// which keeps the registry scoped to that root thread rather than the entire `ThreadManager`. #[derive(Clone, Default)] pub(crate) struct AgentControl { + /// ID shared by the whole agent control session. This means every sub-agents from a common + /// root share the same session ID. + session_id: SessionId, /// Weak handle back to the global thread registry/state. /// This is `Weak` to avoid reference cycles and shadow persistence of the form /// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`. @@ -148,6 +152,15 @@ impl AgentControl { } } + pub(crate) fn with_session_id(mut self, session_id: SessionId) -> Self { + self.session_id = session_id; + self + } + + pub(crate) fn session_id(&self) -> SessionId { + self.session_id + } + /// Spawn a new agent thread and submit the initial prompt. #[cfg(test)] pub(crate) async fn spawn_agent( diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index d11fda47dc..00feb81a9f 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -58,7 +58,7 @@ use codex_api::SseTelemetry; use codex_api::TransportError; use codex_api::WebsocketTelemetry; use codex_api::auth_header_telemetry; -use codex_api::build_conversation_headers; +use codex_api::build_session_headers; use codex_api::create_text_param_for_request; use codex_api::response_create_client_metadata; use codex_app_server_protocol::AuthMode; @@ -70,6 +70,7 @@ use codex_login::default_client::build_reqwest_client; use codex_otel::SessionTelemetry; use codex_otel::current_span_w3c_trace_context; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::config_types::ServiceTier; @@ -152,7 +153,8 @@ pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration = /// configuration is per turn and is passed explicitly to streaming/unary methods. #[derive(Debug)] struct ModelClientState { - conversation_id: ThreadId, + session_id: SessionId, + thread_id: ThreadId, window_generation: AtomicU64, installation_id: String, provider: SharedModelProvider, @@ -190,7 +192,7 @@ impl RequestRouteTelemetry { /// A session-scoped client for model-provider API calls. /// /// This holds configuration and state that should be shared across turns within a Codex session -/// (auth, provider selection, conversation id, and transport fallback state). +/// (auth, provider selection, thread id, and transport fallback state). /// /// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns /// will also use HTTP for the remainder of the session. @@ -297,7 +299,8 @@ impl ModelClient { /// are passed to [`ModelClientSession::stream`] (and other turn-scoped methods) explicitly. pub fn new( auth_manager: Option>, - conversation_id: ThreadId, + session_id: SessionId, + thread_id: ThreadId, installation_id: String, provider_info: ModelProviderInfo, session_source: SessionSource, @@ -315,7 +318,8 @@ impl ModelClient { collect_auth_env_telemetry(model_provider.info(), codex_api_key_env_enabled); Self { state: Arc::new(ModelClientState { - conversation_id, + session_id, + thread_id, window_generation: AtomicU64::new(0), installation_id, provider: model_provider, @@ -360,9 +364,9 @@ impl ModelClient { } fn current_window_id(&self) -> String { - let conversation_id = self.state.conversation_id; + let thread_id = self.state.thread_id; let window_generation = self.state.window_generation.load(Ordering::Relaxed); - format!("{conversation_id}:{window_generation}") + format!("{thread_id}:{window_generation}") } fn take_cached_websocket_session(&self) -> WebsocketSession { @@ -475,9 +479,10 @@ impl ModelClient { /*turn_metadata_header*/ None, )); extra_headers.extend(self.build_responses_identity_headers()); - extra_headers.extend(build_conversation_headers(Some( - self.state.conversation_id.to_string(), - ))); + extra_headers.extend(build_session_headers( + Some(self.state.session_id.to_string()), + Some(self.state.thread_id.to_string()), + )); let trace_attempt = compaction_trace.start_attempt(&payload); let result = client .compact_input(&payload, extra_headers) @@ -696,7 +701,7 @@ impl ModelClient { &prompt.output_schema, prompt.output_schema_strict, ); - let prompt_cache_key = Some(self.state.conversation_id.to_string()); + let prompt_cache_key = Some(self.state.thread_id.to_string()); let request = ResponsesApiRequest { model: model_info.slug.clone(), instructions: instructions.clone(), @@ -850,16 +855,17 @@ impl ModelClient { turn_metadata_header: Option<&str>, ) -> ApiHeaderMap { let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header); - let conversation_id = self.state.conversation_id.to_string(); + let session_id = self.state.session_id.to_string(); + let thread_id = self.state.thread_id.to_string(); let mut headers = build_responses_headers( self.state.beta_features_header.as_deref(), turn_state, turn_metadata_header.as_ref(), ); - if let Ok(header_value) = HeaderValue::from_str(&conversation_id) { + if let Ok(header_value) = HeaderValue::from_str(&thread_id) { headers.insert("x-client-request-id", header_value); } - headers.extend(build_conversation_headers(Some(conversation_id))); + headers.extend(build_session_headers(Some(session_id), Some(thread_id))); headers.extend(self.build_responses_identity_headers()); headers.insert( OPENAI_BETA_HEADER, @@ -903,9 +909,11 @@ impl ModelClientSession { compression: Compression, ) -> ApiResponsesOptions { let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header); - let conversation_id = self.client.state.conversation_id.to_string(); + let session_id = self.client.state.session_id.to_string(); + let thread_id = self.client.state.thread_id.to_string(); ApiResponsesOptions { - conversation_id: Some(conversation_id), + session_id: Some(session_id), + thread_id: Some(thread_id), session_source: Some(self.client.state.session_source.clone()), extra_headers: { let mut headers = build_responses_headers( diff --git a/codex-rs/core/src/client_tests.rs b/codex-rs/core/src/client_tests.rs index 13c5603baa..2ba65d7c45 100644 --- a/codex-rs/core/src/client_tests.rs +++ b/codex-rs/core/src/client_tests.rs @@ -52,9 +52,11 @@ use tracing_subscriber::util::SubscriberInitExt; fn test_model_client(session_source: SessionSource) -> ModelClient { let provider = create_oss_provider_with_base_url("https://example.com/v1", WireApi::Responses); + let thread_id = ThreadId::new(); ModelClient::new( /*auth_manager*/ None, - ThreadId::new(), + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), provider, session_source, @@ -270,7 +272,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() { client.advance_window_generation(); let client_metadata = client.build_ws_client_metadata(Some(r#"{"turn_id":"turn-123"}"#)); - let conversation_id = client.state.conversation_id; + let thread_id = client.state.thread_id; assert_eq!( client_metadata, std::collections::HashMap::from([ @@ -280,7 +282,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() { ), ( X_CODEX_WINDOW_ID_HEADER.to_string(), - format!("{conversation_id}:1"), + format!("{thread_id}:1"), ), ( X_OPENAI_SUBAGENT_HEADER.to_string(), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 2f74bd48b4..508e8c2fac 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -391,7 +391,7 @@ impl CodexThread { self.rollout_path.clone() } - pub(crate) fn session_configured(&self) -> SessionConfiguredEvent { + pub fn session_configured(&self) -> SessionConfiguredEvent { self.session_configured.clone() } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 92a27b284f..3485713bc7 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1,5 +1,6 @@ use super::*; use crate::goals::GoalRuntimeState; +use codex_protocol::SessionId; use codex_protocol::permissions::FileSystemPath; use codex_protocol::permissions::FileSystemSpecialPath; use codex_protocol::protocol::ThreadSource; @@ -363,7 +364,7 @@ impl Session { } else { ThreadEventPersistenceMode::Limited }; - let conversation_id = match &initial_history { + let thread_id = match &initial_history { InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => { ThreadId::default() } @@ -394,7 +395,7 @@ impl Session { LiveThread::create( Arc::clone(&thread_store), CreateThreadParams { - thread_id: conversation_id, + thread_id, forked_from_id, source: session_source, thread_source: session_configuration.thread_source, @@ -508,7 +509,7 @@ impl Session { let trace_task_name = (!trace_agent_path.is_root()).then(|| trace_agent_path.name().to_string()); let trace_metadata = ThreadStartedTraceMetadata { - thread_id: conversation_id.to_string(), + thread_id: thread_id.to_string(), agent_path: trace_agent_path.to_string(), task_name: trace_task_name, nickname: session_configuration.session_source.get_nickname(), @@ -599,7 +600,7 @@ impl Session { auth_manager.codex_api_key_env_enabled(), ); let mut session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, session_model.as_str(), session_model.as_str(), account_id.clone(), @@ -615,7 +616,7 @@ impl Session { session_telemetry = session_telemetry.with_metrics_service_name(service_name); } let network_proxy_audit_metadata = NetworkProxyAuditMetadata { - conversation_id: Some(conversation_id.to_string()), + conversation_id: Some(thread_id.to_string()), app_version: Some(env!("CARGO_PKG_VERSION").to_string()), user_account_id: account_id, auth_mode: auth_mode.map(|mode| mode.to_string()), @@ -685,7 +686,7 @@ impl Session { } else { ShellSnapshot::start_snapshotting( config.codex_home.clone(), - conversation_id, + thread_id, session_configuration.cwd.clone(), &mut default_shell, session_telemetry.clone(), @@ -698,7 +699,7 @@ impl Session { tx }; let thread_name = - thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id) + thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, thread_id) .instrument(info_span!( "session_init.thread_name_lookup", otel.name = "session_init.thread_name_lookup", @@ -706,7 +707,7 @@ impl Session { .await; session_configuration.thread_name = thread_name.clone(); validate_config_lock_if_configured(&session_configuration).await?; - export_config_lock_if_configured(&session_configuration, conversation_id).await?; + export_config_lock_if_configured(&session_configuration, thread_id).await?; let state = SessionState::new(session_configuration.clone()); let managed_network_requirements_configured = config .config_layer_stack @@ -786,6 +787,12 @@ impl Session { config.analytics_enabled, ) }); + let session_id = if session_configuration.session_source.is_non_root_agent() { + agent_control.session_id() + } else { + SessionId::from(thread_id) + }; + let agent_control = agent_control.with_session_id(session_id); let services = SessionServices { // Initialize the MCP connection manager with an uninitialized // instance. It will be replaced with one created via @@ -830,7 +837,8 @@ impl Session { thread_store: Arc::clone(&thread_store), model_client: ModelClient::new( Some(Arc::clone(&auth_manager)), - conversation_id, + session_id, + thread_id, installation_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), @@ -850,7 +858,7 @@ impl Session { let (mailbox, mailbox_rx) = Mailbox::new(); let sess = Arc::new(Session { - conversation_id, + conversation_id: thread_id, tx_event: tx_event.clone(), agent_status, out_of_band_elicitation_paused, @@ -878,7 +886,8 @@ impl Session { let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id: conversation_id, + session_id, + thread_id, forked_from_id, thread_source: session_configuration.thread_source, thread_name: session_configuration.thread_name.clone(), diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 60c7278ea0..3384a09598 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -30,6 +30,7 @@ use codex_models_manager::model_info; use codex_models_manager::test_support::construct_model_info_offline_for_tests; use codex_models_manager::test_support::get_model_offline_for_tests; use codex_protocol::AgentPath; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::account::PlanType as AccountPlanType; use codex_protocol::config_types::ServiceTier; @@ -393,10 +394,12 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted } fn test_model_client_session() -> crate::client::ModelClientSession { + let thread_id = ThreadId::try_from("00000000-0000-4000-8000-000000000001") + .expect("test thread id should be valid"); crate::client::ModelClient::new( /*auth_manager*/ None, - ThreadId::try_from("00000000-0000-4000-8000-000000000001") - .expect("test thread id should be valid"), + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), ModelProviderInfo::create_openai_provider(/* base_url */ /*base_url*/ None), codex_protocol::protocol::SessionSource::Exec, @@ -3605,7 +3608,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let codex_home = tempfile::tempdir().expect("create temp dir"); let config = build_test_config(codex_home.path()).await; let config = Arc::new(config); - let conversation_id = ThreadId::default(); + let thread_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); let models_manager = models_manager_with_provider( config.codex_home.to_path_buf(), @@ -3671,7 +3674,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { &per_turn_config.to_models_manager_config(), ); let session_telemetry = session_telemetry( - conversation_id, + thread_id, config.as_ref(), &model_info, session_configuration.session_source.clone(), @@ -3743,7 +3746,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { )), model_client: ModelClient::new( Some(auth_manager.clone()), - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), session_configuration.provider.clone(), session_configuration.session_source.clone(), @@ -3772,7 +3776,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ); let turn_environments = turn_environments_for_tests(&environment, &session_configuration.cwd); let turn_context = Session::make_turn_context( - conversation_id, + thread_id, Some(Arc::clone(&auth_manager)), &session_telemetry, session_configuration.provider.clone(), @@ -3793,7 +3797,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let (mailbox, mailbox_rx) = crate::agent::Mailbox::new(); let session = Session { - conversation_id, + conversation_id: thread_id, tx_event, agent_status: agent_status_tx, out_of_band_elicitation_paused: watch::channel(false).0, @@ -3931,6 +3935,178 @@ async fn make_session_with_config_and_rx( Ok((session, rx_event, codex_home)) } +async fn make_session_with_history_source_and_agent_control_and_rx( + initial_history: InitialHistory, + session_source: SessionSource, + agent_control: AgentControl, +) -> anyhow::Result<(Arc, async_channel::Receiver)> { + let codex_home = tempfile::tempdir().expect("create temp dir"); + let mut config = build_test_config(codex_home.path()).await; + config.ephemeral = true; + let config = Arc::new(config); + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let models_manager = models_manager_with_provider( + config.codex_home.to_path_buf(), + auth_manager.clone(), + config.model_provider.clone(), + ); + let model = get_model_offline_for_tests(config.model.as_deref()); + let model_info = + construct_model_info_offline_for_tests(model.as_str(), &config.to_models_manager_config()); + let collaboration_mode = CollaborationMode { + mode: ModeKind::Default, + settings: Settings { + model, + reasoning_effort: config.model_reasoning_effort, + developer_instructions: None, + }, + }; + let default_environments = vec![TurnEnvironmentSelection { + environment_id: codex_exec_server::LOCAL_ENVIRONMENT_ID.to_string(), + cwd: config.cwd.clone(), + }]; + let session_configuration = SessionConfiguration { + provider: config.model_provider.clone(), + collaboration_mode, + model_reasoning_summary: config.model_reasoning_summary, + developer_instructions: config.developer_instructions.clone(), + user_instructions: config.user_instructions.clone(), + service_tier: None, + personality: config.personality, + base_instructions: config + .base_instructions + .clone() + .unwrap_or_else(|| model_info.get_model_instructions(config.personality)), + compact_prompt: config.compact_prompt.clone(), + approval_policy: config.permissions.approval_policy.clone(), + approvals_reviewer: config.approvals_reviewer, + permission_profile: config.permissions.permission_profile.clone(), + active_permission_profile: config.permissions.active_permission_profile(), + windows_sandbox_level: WindowsSandboxLevel::from_config(&config), + cwd: config.cwd.clone(), + codex_home: config.codex_home.clone(), + thread_name: None, + environments: default_environments, + original_config_do_not_use: Arc::clone(&config), + metrics_service_name: None, + app_server_client_name: None, + app_server_client_version: None, + session_source: session_source.clone(), + thread_source: None, + dynamic_tools: Vec::new(), + persist_extended_history: false, + inherited_shell_snapshot: None, + user_shell_override: None, + }; + + let (tx_event, rx_event) = async_channel::unbounded(); + let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); + let plugins_manager = Arc::new(PluginsManager::new(config.codex_home.to_path_buf())); + let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager))); + let skills_manager = Arc::new(SkillsManager::new( + config.codex_home.clone(), + /*bundled_skills_enabled*/ true, + )); + + let session = Session::new( + session_configuration, + Arc::clone(&config), + auth_manager, + models_manager, + Arc::new(ExecPolicyManager::default()), + tx_event, + agent_status_tx, + initial_history, + session_source, + skills_manager, + plugins_manager, + mcp_manager, + Arc::new(SkillsWatcher::noop()), + agent_control, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + /*analytics_events_client*/ None, + /*state_db*/ None, + Arc::new(codex_thread_store::LocalThreadStore::new( + codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + )), + codex_rollout_trace::ThreadTraceContext::disabled(), + ) + .await?; + + Ok((session, rx_event)) +} + +#[tokio::test] +async fn resumed_root_session_uses_thread_id_as_session_id() { + let thread_id = ThreadId::new(); + let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx( + InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Vec::new(), + rollout_path: None, + }), + SessionSource::Exec, + AgentControl::default(), + ) + .await + .expect("resume should succeed"); + + assert_eq!( + session.services.agent_control.session_id(), + SessionId::from(thread_id) + ); + + let event = rx_event.recv().await.expect("session configured event"); + let EventMsg::SessionConfigured(event) = event.msg else { + panic!("expected session configured event"); + }; + assert_eq!(event.session_id, SessionId::from(thread_id)); + assert_eq!(event.thread_id, thread_id); +} + +#[tokio::test] +async fn resumed_subagent_session_keeps_inherited_session_id() { + let parent_thread_id = ThreadId::new(); + let parent_session_id = SessionId::from(parent_thread_id); + let thread_id = ThreadId::new(); + let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: None, + }); + let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx( + InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Vec::new(), + rollout_path: None, + }), + session_source, + AgentControl::default().with_session_id(parent_session_id), + ) + .await + .expect("resume should succeed"); + + assert_eq!( + session.services.agent_control.session_id(), + parent_session_id + ); + + let event = rx_event.recv().await.expect("session configured event"); + let EventMsg::SessionConfigured(event) = event.msg else { + panic!("expected session configured event"); + }; + assert_eq!(event.session_id, parent_session_id); + assert_eq!(event.thread_id, thread_id); +} + #[tokio::test] async fn notify_request_permissions_response_ignores_unmatched_call_id() { let (session, _turn_context) = make_session_and_context().await; @@ -5117,7 +5293,7 @@ where let mut config = build_test_config(codex_home.as_path()).await; configure_config(&mut config); let config = Arc::new(config); - let conversation_id = ThreadId::default(); + let thread_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(auth); let models_manager = models_manager_with_provider( config.codex_home.to_path_buf(), @@ -5183,7 +5359,7 @@ where &per_turn_config.to_models_manager_config(), ); let session_telemetry = session_telemetry( - conversation_id, + thread_id, config.as_ref(), &model_info, session_configuration.session_source.clone(), @@ -5256,7 +5432,8 @@ where )), model_client: ModelClient::new( Some(Arc::clone(&auth_manager)), - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), session_configuration.provider.clone(), session_configuration.session_source.clone(), @@ -5285,7 +5462,7 @@ where ); let turn_environments = turn_environments_for_tests(&environment, &session_configuration.cwd); let turn_context = Arc::new(Session::make_turn_context( - conversation_id, + thread_id, Some(Arc::clone(&auth_manager)), &session_telemetry, session_configuration.provider.clone(), @@ -5306,7 +5483,7 @@ where let (mailbox, mailbox_rx) = crate::agent::Mailbox::new(); let session = Arc::new(Session { - conversation_id, + conversation_id: thread_id, tx_event, agent_status: agent_status_tx, out_of_band_elicitation_paused: watch::channel(false).0, @@ -7767,7 +7944,7 @@ async fn completed_goal_accounts_current_turn_tokens_before_tool_response() -> a ) .await?; let persisted_goal = state_db - .get_thread_goal(test.session_configured.session_id) + .get_thread_goal(test.session_configured.thread_id) .await? .expect("goal should be persisted"); assert_eq!( diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 893d38ed83..af99790a1f 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -80,13 +80,13 @@ async fn responses_stream_includes_subagent_header_on_review() { config.model = Some(model.clone()); let config = Arc::new(config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let auth_mode = TelemetryAuthMode::Chatgpt; let session_source = SessionSource::SubAgent(SubAgentSource::Review); let model_info = codex_core::test_support::construct_model_info_offline(model.as_str(), &config); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, model.as_str(), model_info.slug.as_str(), /*account_id*/ None, @@ -100,7 +100,8 @@ async fn responses_stream_includes_subagent_header_on_review() { let client = ModelClient::new( /*auth_manager*/ None, - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ TEST_INSTALLATION_ID.to_string(), provider.clone(), session_source, @@ -141,7 +142,7 @@ async fn responses_stream_includes_subagent_header_on_review() { } let request = request_recorder.single_request(); - let expected_window_id = format!("{conversation_id}:0"); + let expected_window_id = format!("{thread_id}:0"); assert_eq!( request.header("x-openai-subagent").as_deref(), Some("review") @@ -205,14 +206,14 @@ async fn responses_stream_includes_subagent_header_on_other() { config.model = Some(model.clone()); let config = Arc::new(config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let auth_mode = TelemetryAuthMode::Chatgpt; let session_source = SessionSource::SubAgent(SubAgentSource::Other("my-task".to_string())); let model_info = codex_core::test_support::construct_model_info_offline(model.as_str(), &config); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, model.as_str(), model_info.slug.as_str(), /*account_id*/ None, @@ -226,7 +227,8 @@ async fn responses_stream_includes_subagent_header_on_other() { let client = ModelClient::new( /*auth_manager*/ None, - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ TEST_INSTALLATION_ID.to_string(), provider.clone(), session_source, @@ -317,7 +319,7 @@ async fn responses_respects_model_info_overrides_from_config() { let model = config.model.clone().expect("model configured"); let config = Arc::new(config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let auth_mode = codex_core::test_support::auth_manager_from_auth(CodexAuth::from_api_key("Test API Key")) .auth_mode() @@ -327,7 +329,7 @@ async fn responses_respects_model_info_overrides_from_config() { let model_info = codex_core::test_support::construct_model_info_offline(model.as_str(), &config); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, model.as_str(), model_info.slug.as_str(), /*account_id*/ None, @@ -341,7 +343,8 @@ async fn responses_respects_model_info_overrides_from_config() { let client = ModelClient::new( /*auth_manager*/ None, - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ TEST_INSTALLATION_ID.to_string(), provider.clone(), session_source, diff --git a/codex-rs/core/tests/suite/approvals.rs b/codex-rs/core/tests/suite/approvals.rs index 96cc1f3a99..2538c850e3 100644 --- a/codex-rs/core/tests/suite/approvals.rs +++ b/codex-rs/core/tests/suite/approvals.rs @@ -807,7 +807,7 @@ async fn wait_for_spawned_thread(test: &TestCodex) -> Result> { let ids = test.thread_manager.list_thread_ids().await; if let Some(thread_id) = ids .iter() - .find(|id| **id != test.session_configured.session_id) + .find(|id| **id != test.session_configured.thread_id) { return test .thread_manager diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index dafde4ccea..6f1ab3d672 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -726,7 +726,7 @@ async fn resume_replays_image_tool_outputs_with_detail() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn includes_conversation_id_and_model_headers_in_request() { +async fn includes_session_id_thread_id_and_model_headers_in_request() { skip_if_no_network!(); // Mock server @@ -744,7 +744,8 @@ async fn includes_conversation_id_and_model_headers_in_request() { .await .expect("create new conversation"); let codex = test.codex.clone(); - let session_id = test.session_configured.session_id; + let expected_session_id = test.session_configured.session_id; + let expected_thread_id = test.session_configured.thread_id; codex .submit(Op::UserInput { @@ -764,6 +765,7 @@ async fn includes_conversation_id_and_model_headers_in_request() { let request = resp_mock.single_request(); assert_eq!(request.path(), "/v1/responses"); let request_session_id = request.header("session_id").expect("session_id header"); + let request_thread_id = request.header("thread_id").expect("thread_id header"); let request_authorization = request .header("authorization") .expect("authorization header"); @@ -772,10 +774,16 @@ async fn includes_conversation_id_and_model_headers_in_request() { let installation_id = std::fs::read_to_string(test.codex_home_path().join(INSTALLATION_ID_FILENAME)) .expect("read installation id"); + let thread_id_string = expected_thread_id.to_string(); - assert_eq!(request_session_id, session_id.to_string()); + assert_eq!(request_session_id, expected_session_id.to_string()); + assert_eq!(request_thread_id, thread_id_string.as_str()); assert_eq!(request_originator, originator().value); assert_eq!(request_authorization, "Bearer Test API Key"); + assert_eq!( + request_body["prompt_cache_key"].as_str(), + Some(thread_id_string.as_str()) + ); assert_eq!( request_body["client_metadata"]["x-codex-installation-id"].as_str(), Some(installation_id.as_str()) @@ -867,9 +875,9 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth let config = Arc::new(config); let model_info = codex_core::test_support::construct_model_info_offline(model.as_str(), &config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, model.as_str(), model_info.slug.as_str(), /*account_id*/ None, @@ -884,7 +892,8 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth Some(AuthManager::from_auth_for_testing(CodexAuth::from_api_key( "unused-api-key", ))), - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), provider, SessionSource::Exec, @@ -1000,7 +1009,8 @@ async fn chatgpt_auth_sends_correct_request() { .await .expect("create new conversation"); let codex = test.codex.clone(); - let thread_id = test.session_configured.session_id; + let expected_session_id = test.session_configured.session_id; + let expected_thread_id = test.session_configured.thread_id; codex .submit(Op::UserInput { @@ -1028,11 +1038,13 @@ async fn chatgpt_auth_sends_correct_request() { .expect("chatgpt-account-id header"); let request_body = request.body_json(); - let session_id = request.header("session_id").expect("session_id header"); + let request_session_id = request.header("session_id").expect("session_id header"); + let request_thread_id = request.header("thread_id").expect("thread_id header"); let installation_id = std::fs::read_to_string(test.codex_home_path().join(INSTALLATION_ID_FILENAME)) .expect("read installation id"); - assert_eq!(session_id, thread_id.to_string()); + assert_eq!(request_session_id, expected_session_id.to_string()); + assert_eq!(request_thread_id, expected_thread_id.to_string()); assert_eq!(request_originator, originator().value); assert_eq!(request_authorization, "Bearer Access Token"); @@ -2277,11 +2289,11 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { let config = Arc::new(config); let model_info = codex_core::test_support::construct_model_info_offline(model.as_str(), &config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let auth_manager = codex_core::test_support::auth_manager_from_auth(CodexAuth::from_api_key("Test API Key")); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, model.as_str(), model_info.slug.as_str(), /*account_id*/ None, @@ -2295,7 +2307,8 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { let client = ModelClient::new( /*auth_manager*/ None, - conversation_id, + thread_id.into(), + thread_id, /*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(), provider.clone(), SessionSource::Exec, diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index cdbb65aabd..7ef571af37 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -15,6 +15,7 @@ use codex_otel::MetricsConfig; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; use codex_otel::current_span_w3c_trace_context; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::account::PlanType; use codex_protocol::config_types::ReasoningSummary; @@ -87,7 +88,8 @@ fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTr struct WebsocketTestHarness { _codex_home: TempDir, client: ModelClient, - conversation_id: ThreadId, + session_id: SessionId, + thread_id: ThreadId, model_info: ModelInfo, effort: Option, summary: ReasoningSummary, @@ -125,7 +127,15 @@ async fn responses_websocket_streams_request() { ); assert_eq!( handshake.header(X_CLIENT_REQUEST_ID_HEADER), - Some(harness.conversation_id.to_string()) + Some(harness.thread_id.to_string()) + ); + assert_eq!( + handshake.header("session_id"), + Some(harness.session_id.to_string()) + ); + assert_eq!( + handshake.header("thread_id"), + Some(harness.thread_id.to_string()) ); assert_eq!( handshake.header(USER_AGENT_HEADER), @@ -1827,7 +1837,8 @@ async fn websocket_harness_with_provider_options( } let config = Arc::new(config); let model_info = codex_core::test_support::construct_model_info_offline(MODEL, &config); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); + let session_id = SessionId::new(); let auth_manager = codex_core::test_support::auth_manager_from_auth(CodexAuth::from_api_key("Test API Key")); let exporter = InMemoryMetricExporter::default(); @@ -1837,7 +1848,7 @@ async fn websocket_harness_with_provider_options( ) .expect("in-memory metrics client"); let session_telemetry = SessionTelemetry::new( - conversation_id, + thread_id, MODEL, model_info.slug.as_str(), /*account_id*/ None, @@ -1853,7 +1864,8 @@ async fn websocket_harness_with_provider_options( let summary = ReasoningSummary::Auto; let client = ModelClient::new( /*auth_manager*/ None, - conversation_id, + session_id, + thread_id, /*installation_id*/ TEST_INSTALLATION_ID.to_string(), provider.clone(), SessionSource::Exec, @@ -1866,7 +1878,8 @@ async fn websocket_harness_with_provider_options( WebsocketTestHarness { _codex_home: codex_home, client, - conversation_id, + session_id, + thread_id, model_info, effort, summary, diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 38671bd763..c0a2a89394 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -273,6 +273,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { .await?; let codex = harness.test().codex.clone(); let session_id = harness.test().session_configured.session_id.to_string(); + let thread_id = harness.test().session_configured.thread_id.to_string(); let responses_mock = responses::mount_sse_sequence( harness.server(), @@ -341,6 +342,10 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { compact_request.header("session_id").as_deref(), Some(session_id.as_str()) ); + assert_eq!( + compact_request.header("thread_id").as_deref(), + Some(thread_id.as_str()) + ); let compact_body = compact_request.body_json(); assert_eq!( compact_body.get("model").and_then(|v| v.as_str()), @@ -713,6 +718,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { .await?; let codex = harness.test().codex.clone(); let session_id = harness.test().session_configured.session_id.to_string(); + let thread_id = harness.test().session_configured.thread_id.to_string(); mount_sse_once( harness.server(), @@ -764,6 +770,10 @@ async fn remote_compact_runs_automatically() -> Result<()> { .as_deref(), Some(session_id.as_str()) ); + assert_eq!( + compact_mock.single_request().header("thread_id").as_deref(), + Some(thread_id.as_str()) + ); let follow_up_request = responses_mock.single_request(); let follow_up_body = follow_up_request.body_json().to_string(); assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY")); diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index 91d321b540..37456dce63 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -195,7 +195,7 @@ async fn fork_thread_from_history_does_not_require_source_rollout_path() { ForkSnapshot::Interrupted, test.config.clone(), InitialHistory::Resumed(ResumedHistory { - conversation_id: test.session_configured.session_id, + conversation_id: test.session_configured.thread_id, history: source_items.clone(), rollout_path: None, }), diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 9530675acd..65087a1fe7 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -358,7 +358,7 @@ async fn image_generation_call_event_is_emitted() -> anyhow::Result<()> { let call_id = "ig_image_saved_to_temp_dir_default"; let expected_saved_path = image_generation_artifact_path( config.codex_home.as_path(), - &session_configured.session_id.to_string(), + &session_configured.thread_id.to_string(), call_id, ); let _ = std::fs::remove_file(&expected_saved_path); @@ -444,7 +444,7 @@ async fn image_generation_call_event_is_emitted_when_image_save_fails() -> anyho } = test_codex().build(&server).await?; let expected_saved_path = image_generation_artifact_path( config.codex_home.as_path(), - &session_configured.session_id.to_string(), + &session_configured.thread_id.to_string(), "ig_invalid", ); let _ = std::fs::remove_file(&expected_saved_path); @@ -547,8 +547,8 @@ async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> { }) .await; - let session_id = session_configured.session_id.to_string(); - assert_eq!(delta_event.thread_id, session_id); + let thread_id = session_configured.thread_id.to_string(); + assert_eq!(delta_event.thread_id, thread_id); assert_eq!(delta_event.turn_id, started_turn_id); assert_eq!(delta_event.item_id, started_item.id); assert_eq!(delta_event.delta, "streamed response"); @@ -614,7 +614,7 @@ async fn plan_mode_emits_plan_item_from_proposed_plan_block() -> anyhow::Result< assert_eq!( plan_delta.thread_id, - session_configured.session_id.to_string() + session_configured.thread_id.to_string() ); assert_eq!(plan_delta.delta, "- Step 1\n- Step 2\n"); assert_eq!(plan_completed.text, "- Step 1\n- Step 2\n"); diff --git a/codex-rs/core/tests/suite/model_switching.rs b/codex-rs/core/tests/suite/model_switching.rs index b0df5050c7..e962b06d41 100644 --- a/codex-rs/core/tests/suite/model_switching.rs +++ b/codex-rs/core/tests/suite/model_switching.rs @@ -487,7 +487,7 @@ async fn generated_image_is_replayed_for_image_capable_models() -> Result<()> { let test = builder.build(&server).await?; let saved_path = image_generation_artifact_path( test.codex_home_path(), - &test.session_configured.session_id.to_string(), + &test.session_configured.thread_id.to_string(), "ig_123", ); let _ = std::fs::remove_file(&saved_path); @@ -601,7 +601,7 @@ async fn model_change_from_generated_image_to_text_preserves_prior_generated_ima let test = builder.build(&server).await?; let saved_path = image_generation_artifact_path( test.codex_home_path(), - &test.session_configured.session_id.to_string(), + &test.session_configured.thread_id.to_string(), "ig_123", ); let _ = std::fs::remove_file(&saved_path); @@ -717,7 +717,7 @@ async fn thread_rollback_after_generated_image_drops_entire_image_turn_history() let test = builder.build(&server).await?; let saved_path = image_generation_artifact_path( test.codex_home_path(), - &test.session_configured.session_id.to_string(), + &test.session_configured.thread_id.to_string(), "ig_rollback", ); let _ = std::fs::remove_file(&saved_path); diff --git a/codex-rs/core/tests/suite/search_tool.rs b/codex-rs/core/tests/suite/search_tool.rs index eda970d0b2..307d066928 100644 --- a/codex-rs/core/tests/suite/search_tool.rs +++ b/codex-rs/core/tests/suite/search_tool.rs @@ -596,7 +596,7 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() - ); assert_eq!( apps_tool_call.pointer("/params/_meta/x-codex-turn-metadata/session_id"), - Some(&json!(test.session_configured.session_id.to_string())) + Some(&json!(test.session_configured.thread_id.to_string())) ); assert!( apps_tool_call diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 20fa8a8f0e..b2a2778047 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -48,7 +48,7 @@ async fn new_thread_is_recorded_in_state_db() -> Result<()> { }); let test = builder.build(&server).await?; - let thread_id = test.session_configured.session_id; + let thread_id = test.session_configured.thread_id; let rollout_path = test.codex.rollout_path().expect("rollout path"); let db_path = codex_state::state_db_path(test.config.sqlite_home.as_path()); @@ -262,7 +262,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> { test.submit_turn("another message").await?; let db = test.codex.state_db().expect("state db enabled"); - let thread_id = test.session_configured.session_id; + let thread_id = test.session_configured.thread_id; let mut metadata = None; for _ in 0..100 { @@ -305,7 +305,7 @@ async fn web_search_marks_thread_memory_mode_polluted_when_configured() -> Resul }); let test = builder.build(&server).await?; let db = test.codex.state_db().expect("state db enabled"); - let thread_id = test.session_configured.session_id; + let thread_id = test.session_configured.thread_id; test.submit_turn("search the web").await?; @@ -397,7 +397,7 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result< }); let test = builder.build(&server).await?; let db = test.codex.state_db().expect("state db enabled"); - let thread_id = test.session_configured.session_id; + let thread_id = test.session_configured.thread_id; let cwd = test.cwd_path().to_path_buf(); let (sandbox_policy, permission_profile) = turn_permission_fields(PermissionProfile::read_only(), cwd.as_path()); @@ -478,7 +478,7 @@ async fn tool_call_logs_include_thread_id() -> Result<()> { }); let test = builder.build(&server).await?; let db = test.codex.state_db().expect("state db enabled"); - let expected_thread_id = test.session_configured.session_id.to_string(); + let expected_thread_id = test.session_configured.thread_id.to_string(); test.submit_turn("run a shell command").await?; diff --git a/codex-rs/core/tests/suite/subagent_notifications.rs b/codex-rs/core/tests/suite/subagent_notifications.rs index 3f457967c1..3a0c37acc7 100644 --- a/codex-rs/core/tests/suite/subagent_notifications.rs +++ b/codex-rs/core/tests/suite/subagent_notifications.rs @@ -116,7 +116,7 @@ async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result { let ids = test.thread_manager.list_thread_ids().await; if let Some(spawned_id) = ids .iter() - .find(|id| **id != test.session_configured.session_id) + .find(|id| **id != test.session_configured.thread_id) { return Ok(spawned_id.to_string()); } diff --git a/codex-rs/exec/src/event_processor_with_jsonl_output.rs b/codex-rs/exec/src/event_processor_with_jsonl_output.rs index 1641398ae6..045baacc7b 100644 --- a/codex-rs/exec/src/event_processor_with_jsonl_output.rs +++ b/codex-rs/exec/src/event_processor_with_jsonl_output.rs @@ -392,7 +392,7 @@ impl EventProcessorWithJsonOutput { pub fn thread_started_event(session_configured: &SessionConfiguredEvent) -> ThreadEvent { ThreadEvent::ThreadStarted(ThreadStartedEvent { - thread_id: session_configured.session_id.to_string(), + thread_id: session_configured.thread_id.to_string(), }) } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 0db63b8034..5ff19c744b 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -78,6 +78,8 @@ use codex_model_provider_info::LMSTUDIO_OSS_PROVIDER_ID; use codex_model_provider_info::OLLAMA_OSS_PROVIDER_ID; use codex_otel::set_parent_from_context; use codex_otel::traceparent_context_from_env; +use codex_protocol::SessionId; +use codex_protocol::ThreadId; use codex_protocol::config_types::SandboxMode; use codex_protocol::models::ActivePermissionProfile; use codex_protocol::models::ActivePermissionProfileModification; @@ -694,7 +696,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let session_configured = session_configured_from_thread_resume_response(&response, &config) .map_err(anyhow::Error::msg)?; - (session_configured.session_id, session_configured) + (session_configured.thread_id, session_configured) } else { let response: ThreadStartResponse = send_request_with_response( &client, @@ -709,7 +711,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let session_configured = session_configured_from_thread_start_response(&response, &config) .map_err(anyhow::Error::msg)?; - (session_configured.session_id, session_configured) + (session_configured.thread_id, session_configured) } } else { let response: ThreadStartResponse = send_request_with_response( @@ -724,7 +726,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { .map_err(anyhow::Error::msg)?; let session_configured = session_configured_from_thread_start_response(&response, &config) .map_err(anyhow::Error::msg)?; - (session_configured.session_id, session_configured) + (session_configured.thread_id, session_configured) }; let primary_thread_id_for_span = primary_thread_id.to_string(); @@ -1058,6 +1060,7 @@ fn session_configured_from_thread_start_response( config: &Config, ) -> Result { session_configured_from_thread_response( + &response.session_id, &response.thread.id, response.thread.name.clone(), response.thread.path.clone(), @@ -1082,6 +1085,7 @@ fn session_configured_from_thread_resume_response( config: &Config, ) -> Result { session_configured_from_thread_response( + &response.session_id, &response.thread.id, response.thread.name.clone(), response.thread.path.clone(), @@ -1115,6 +1119,7 @@ fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget { reason = "session mapping keeps explicit fields" )] fn session_configured_from_thread_response( + session_id: &str, thread_id: &str, thread_name: Option, rollout_path: Option, @@ -1128,11 +1133,14 @@ fn session_configured_from_thread_response( cwd: AbsolutePathBuf, reasoning_effort: Option, ) -> Result { - let session_id = codex_protocol::ThreadId::from_string(thread_id) + let session_id = SessionId::from_string(session_id) + .map_err(|err| format!("session id `{session_id}` is invalid: {err}"))?; + let thread_id = ThreadId::from_string(thread_id) .map_err(|err| format!("thread id `{thread_id}` is invalid: {err}"))?; Ok(SessionConfiguredEvent { session_id, + thread_id, forked_from_id: None, thread_source: None, thread_name, diff --git a/codex-rs/exec/src/lib_tests.rs b/codex-rs/exec/src/lib_tests.rs index b35f6e9bfa..ad528a194d 100644 --- a/codex-rs/exec/src/lib_tests.rs +++ b/codex-rs/exec/src/lib_tests.rs @@ -441,6 +441,14 @@ async fn session_configured_from_thread_response_uses_review_policy_from_respons let event = session_configured_from_thread_start_response(&response, &config) .expect("build bootstrap session configured event"); + assert_eq!( + event.session_id.to_string(), + "67e55044-10b1-426f-9247-bb680e5fe0c7" + ); + assert_eq!( + event.thread_id.to_string(), + "67e55044-10b1-426f-9247-bb680e5fe0c8" + ); assert_eq!(event.approvals_reviewer, ApprovalsReviewer::AutoReview); } @@ -465,6 +473,7 @@ async fn session_configured_from_thread_response_uses_permission_profile_from_re fn sample_thread_start_response() -> ThreadStartResponse { ThreadStartResponse { + session_id: "67e55044-10b1-426f-9247-bb680e5fe0c7".to_string(), thread: codex_app_server_protocol::Thread { id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), forked_from_id: None, diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index e066cb6ddc..2e6560be94 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -27,6 +27,7 @@ use codex_app_server_protocol::TurnPlanUpdatedNotification; use codex_app_server_protocol::TurnStartedNotification; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::WebSearchAction as ApiWebSearchAction; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::models::PermissionProfile; use codex_protocol::models::WebSearchAction; @@ -104,9 +105,11 @@ fn map_todo_items_preserves_text_and_completion_state() { #[test] fn session_configured_produces_thread_started_event() { + let thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8") + .expect("thread id should parse"); let session_configured = SessionConfiguredEvent { - session_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8") - .expect("thread id should parse"), + session_id: SessionId::from(thread_id), + thread_id, forked_from_id: None, thread_source: None, thread_name: None, diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index 1ab6b1a0af..c9ad91bbc6 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -296,7 +296,8 @@ mod tests { let event = Event { id: "1".to_string(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id: thread_id, + session_id: codex_protocol::SessionId::new(), + thread_id, forked_from_id: None, thread_source: None, thread_name: None, @@ -339,10 +340,11 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new()?; let session_configured_event = SessionConfiguredEvent { - session_id: conversation_id, + session_id: codex_protocol::SessionId::new(), + thread_id, forked_from_id: None, thread_source: None, thread_name: None, @@ -387,6 +389,7 @@ mod tests { "msg": { "type": "session_configured", "session_id": session_configured_event.session_id, + "thread_id": session_configured_event.thread_id, "model": "gpt-4o", "model_provider_id": "test-provider", "approval_policy": "never", @@ -411,7 +414,8 @@ mod tests { let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new()?; let session_configured_event = SessionConfiguredEvent { - session_id: thread_id, + session_id: codex_protocol::SessionId::new(), + thread_id, forked_from_id: None, thread_source: None, thread_name: None, @@ -457,6 +461,7 @@ mod tests { "msg": { "type": "session_configured", "session_id": session_configured_event.session_id, + "thread_id": session_configured_event.thread_id, "model": "gpt-4o", "model_provider_id": "test-provider", "approval_policy": "never", diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 5db8761058..2e2c664e64 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -15,6 +15,7 @@ use codex_login::auth_env_telemetry::collect_auth_env_telemetry; use codex_login::default_client::originator; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::ServiceTier; @@ -174,6 +175,7 @@ impl MemoryStartupContext { let session_source = self.thread.config_snapshot().await.session_source; let model_client = ModelClient::new( Some(Arc::clone(&self.auth_manager)), + SessionId::from(self.thread_id), self.thread_id, installation_id, config.model_provider.clone(), diff --git a/codex-rs/memories/write/src/startup_tests.rs b/codex-rs/memories/write/src/startup_tests.rs index d89b68825e..f80b891ddc 100644 --- a/codex-rs/memories/write/src/startup_tests.rs +++ b/codex-rs/memories/write/src/startup_tests.rs @@ -265,7 +265,7 @@ async fn memories_startup_phase1_uses_live_thread_service_tier() -> anyhow::Resu let context = crate::runtime::MemoryStartupContext::new( Arc::clone(&test.thread_manager), test.thread_manager.auth_manager(), - test.session_configured.session_id, + test.session_configured.thread_id, Arc::clone(&test.codex), &test.config, config_snapshot.session_source.clone(), @@ -317,7 +317,7 @@ async fn trigger_memories_startup(test: &TestCodex) { start_memories_startup_task( Arc::clone(&test.thread_manager), test.thread_manager.auth_manager(), - test.session_configured.session_id, + test.session_configured.thread_id, Arc::clone(&test.codex), Arc::new(config), &config_snapshot.session_source, diff --git a/codex-rs/protocol/src/lib.rs b/codex-rs/protocol/src/lib.rs index 175c92331f..a945b1a927 100644 --- a/codex-rs/protocol/src/lib.rs +++ b/codex-rs/protocol/src/lib.rs @@ -1,9 +1,11 @@ pub mod account; mod agent_path; pub mod auth; +mod session_id; mod thread_id; mod tool_name; pub use agent_path::AgentPath; +pub use session_id::SessionId; pub use thread_id::ThreadId; pub use tool_name::ToolName; pub mod approvals; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 09be839f93..017eca8c59 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -14,6 +14,7 @@ use std::time::Duration; use strum_macros::EnumIter; use crate::AgentPath; +use crate::SessionId; use crate::ThreadId; use crate::approvals::ElicitationRequestEvent; use crate::config_types::ApprovalsReviewer; @@ -3456,7 +3457,8 @@ pub struct SessionNetworkProxyRuntime { #[derive(Debug, Clone, Serialize, JsonSchema, TS)] pub struct SessionConfiguredEvent { - pub session_id: ThreadId, + pub session_id: SessionId, + pub thread_id: ThreadId, #[serde(skip_serializing_if = "Option::is_none")] pub forked_from_id: Option, /// Optional analytics source classification for this thread. @@ -3530,7 +3532,9 @@ impl<'de> Deserialize<'de> for SessionConfiguredEvent { { #[derive(Deserialize)] struct Wire { - session_id: ThreadId, + session_id: SessionId, + #[serde(default)] + thread_id: Option, forked_from_id: Option, #[serde(default)] thread_source: Option, @@ -3572,6 +3576,7 @@ impl<'de> Deserialize<'de> for SessionConfiguredEvent { Ok(Self { session_id: wire.session_id, + thread_id: wire.thread_id.unwrap_or_else(|| wire.session_id.into()), forked_from_id: wire.forked_from_id, thread_source: wire.thread_source, thread_name: wire.thread_name, @@ -5294,13 +5299,15 @@ mod tests { /// amount of nesting. #[test] fn serialize_event() -> Result<()> { - let conversation_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?; + let session_id = SessionId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c7")?; + let thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?; let rollout_file = NamedTempFile::new()?; let permission_profile = PermissionProfile::read_only(); let event = Event { id: "1234".to_string(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id: conversation_id, + session_id, + thread_id, forked_from_id: None, thread_source: None, thread_name: None, @@ -5325,7 +5332,8 @@ mod tests { "id": "1234", "msg": { "type": "session_configured", - "session_id": "67e55044-10b1-426f-9247-bb680e5fe0c8", + "session_id": "67e55044-10b1-426f-9247-bb680e5fe0c7", + "thread_id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "model": "codex-mini-latest", "model_provider_id": "openai", "approval_policy": "never", diff --git a/codex-rs/protocol/src/session_id.rs b/codex-rs/protocol/src/session_id.rs new file mode 100644 index 0000000000..ac22103d33 --- /dev/null +++ b/codex-rs/protocol/src/session_id.rs @@ -0,0 +1,126 @@ +use std::fmt::Display; + +use schemars::JsonSchema; +use schemars::r#gen::SchemaGenerator; +use schemars::schema::Schema; +use serde::Deserialize; +use serde::Serialize; +use ts_rs::TS; +use uuid::Uuid; + +use crate::ThreadId; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, TS, Hash)] +#[ts(type = "string")] +pub struct SessionId { + pub(crate) uuid: Uuid, +} + +impl SessionId { + pub fn new() -> Self { + Self { + uuid: Uuid::now_v7(), + } + } + + pub fn from_string(s: &str) -> Result { + Ok(Self { + uuid: Uuid::parse_str(s)?, + }) + } +} + +impl TryFrom<&str> for SessionId { + type Error = uuid::Error; + + fn try_from(value: &str) -> Result { + Self::from_string(value) + } +} + +impl TryFrom for SessionId { + type Error = uuid::Error; + + fn try_from(value: String) -> Result { + Self::from_string(value.as_str()) + } +} + +impl From for String { + fn from(value: SessionId) -> Self { + value.to_string() + } +} + +impl From for SessionId { + fn from(value: ThreadId) -> Self { + Self { uuid: value.uuid } + } +} + +impl From for ThreadId { + fn from(value: SessionId) -> Self { + ThreadId { uuid: value.uuid } + } +} + +impl Default for SessionId { + fn default() -> Self { + Self::new() + } +} + +impl Display for SessionId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.uuid, f) + } +} + +impl Serialize for SessionId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.collect_str(&self.uuid) + } +} + +impl<'de> Deserialize<'de> for SessionId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = String::deserialize(deserializer)?; + let uuid = Uuid::parse_str(&value).map_err(serde::de::Error::custom)?; + Ok(Self { uuid }) + } +} + +impl JsonSchema for SessionId { + fn schema_name() -> String { + "SessionId".to_string() + } + + fn json_schema(generator: &mut SchemaGenerator) -> Schema { + ::json_schema(generator) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_session_id_default_is_not_zeroes() { + let id = SessionId::default(); + assert_ne!(id.uuid, Uuid::nil()); + } + + #[test] + fn converts_to_and_from_thread_id() { + let thread_id = ThreadId::new(); + let session_id = SessionId::from(thread_id); + + assert_eq!(ThreadId::from(session_id), thread_id); + } +} diff --git a/codex-rs/protocol/src/thread_id.rs b/codex-rs/protocol/src/thread_id.rs index 8d6d96eff8..d6e9a8825e 100644 --- a/codex-rs/protocol/src/thread_id.rs +++ b/codex-rs/protocol/src/thread_id.rs @@ -11,7 +11,7 @@ use uuid::Uuid; #[derive(Debug, Clone, Copy, PartialEq, Eq, TS, Hash)] #[ts(type = "string")] pub struct ThreadId { - uuid: Uuid, + pub(crate) uuid: Uuid, } impl ThreadId { diff --git a/codex-rs/tui/src/app_server_session.rs b/codex-rs/tui/src/app_server_session.rs index 9aafe9aed3..12f2fdcf62 100644 --- a/codex-rs/tui/src/app_server_session.rs +++ b/codex-rs/tui/src/app_server_session.rs @@ -1813,6 +1813,7 @@ mod tests { let forked_from_id = ThreadId::new(); let read_only_profile = PermissionProfile::read_only(); let response = ThreadResumeResponse { + session_id: ThreadId::new().to_string(), thread: codex_app_server_protocol::Thread { id: thread_id.to_string(), forked_from_id: Some(forked_from_id.to_string()), diff --git a/codex-rs/tui/src/chatwidget/tests/composer_submission.rs b/codex-rs/tui/src/chatwidget/tests/composer_submission.rs index 76b2b29bc0..0fa4567d9f 100644 --- a/codex-rs/tui/src/chatwidget/tests/composer_submission.rs +++ b/codex-rs/tui/src/chatwidget/tests/composer_submission.rs @@ -12,10 +12,10 @@ use pretty_assertions::assert_eq; async fn submission_preserves_text_elements_and_local_images() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -95,7 +95,7 @@ async fn submission_preserves_text_elements_and_local_images() { async fn submission_includes_configured_permission_profile() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let expected_permission_profile: PermissionProfile = AppServerPermissionProfile::Managed { network: PermissionProfileNetworkPermissions { enabled: false }, @@ -119,7 +119,7 @@ async fn submission_includes_configured_permission_profile() { } .into(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -161,7 +161,7 @@ async fn submission_includes_configured_permission_profile() { async fn submission_keeps_profile_when_legacy_projection_is_external() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let expected_permission_profile: PermissionProfile = AppServerPermissionProfile::Managed { network: PermissionProfileNetworkPermissions { enabled: false }, @@ -169,7 +169,7 @@ async fn submission_keeps_profile_when_legacy_projection_is_external() { } .into(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -208,10 +208,10 @@ async fn submission_keeps_profile_when_legacy_projection_is_external() { async fn submission_with_remote_and_local_images_keeps_local_placeholder_numbering() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -302,10 +302,10 @@ async fn submission_with_remote_and_local_images_keeps_local_placeholder_numberi async fn enter_with_only_remote_images_submits_user_turn() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -366,10 +366,10 @@ async fn enter_with_only_remote_images_submits_user_turn() { async fn shift_enter_with_only_remote_images_does_not_submit_user_turn() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -405,10 +405,10 @@ async fn shift_enter_with_only_remote_images_does_not_submit_user_turn() { async fn enter_with_only_remote_images_does_not_submit_when_modal_is_active() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -444,10 +444,10 @@ async fn enter_with_only_remote_images_does_not_submit_when_modal_is_active() { async fn enter_with_only_remote_images_does_not_submit_when_input_disabled() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -486,10 +486,10 @@ async fn enter_with_only_remote_images_does_not_submit_when_input_disabled() { async fn submission_prefers_selected_duplicate_skill_path() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, diff --git a/codex-rs/tui/src/chatwidget/tests/exec_flow.rs b/codex-rs/tui/src/chatwidget/tests/exec_flow.rs index c9789be14d..40ccadb7c3 100644 --- a/codex-rs/tui/src/chatwidget/tests/exec_flow.rs +++ b/codex-rs/tui/src/chatwidget/tests/exec_flow.rs @@ -941,10 +941,10 @@ async fn user_shell_command_renders_output_not_exploring() { #[tokio::test] async fn bang_shell_enter_while_task_running_submits_run_user_shell_command() { let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, diff --git a/codex-rs/tui/src/chatwidget/tests/history_replay.rs b/codex-rs/tui/src/chatwidget/tests/history_replay.rs index 8c31175998..93e8e62f07 100644 --- a/codex-rs/tui/src/chatwidget/tests/history_replay.rs +++ b/codex-rs/tui/src/chatwidget/tests/history_replay.rs @@ -14,10 +14,10 @@ use pretty_assertions::assert_eq; async fn resumed_initial_messages_render_history() { let (mut chat, mut rx, _ops) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -85,10 +85,10 @@ async fn replayed_user_message_preserves_text_elements_and_local_images() { )]; let local_images = vec![PathBuf::from("/tmp/replay.png")]; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -154,10 +154,10 @@ async fn replayed_user_message_preserves_remote_image_urls() { let message = "replayed with remote image".to_string(); let remote_image_urls = vec!["https://example.com/image.png".to_string()]; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -350,10 +350,10 @@ async fn replayed_user_message_with_only_remote_images_renders_history_cell() { let remote_image_urls = vec!["https://example.com/remote-only.png".to_string()]; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, @@ -405,10 +405,10 @@ async fn replayed_user_message_with_only_local_images_renders_history_cell() { let local_images = [PathBuf::from("/tmp/replay-local-only.png")]; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None, diff --git a/codex-rs/tui/src/chatwidget/tests/plan_mode.rs b/codex-rs/tui/src/chatwidget/tests/plan_mode.rs index 8af07a6aa9..5fb10b8087 100644 --- a/codex-rs/tui/src/chatwidget/tests/plan_mode.rs +++ b/codex-rs/tui/src/chatwidget/tests/plan_mode.rs @@ -1202,10 +1202,10 @@ async fn submit_user_message_queues_while_compaction_turn_is_running() { #[tokio::test(flavor = "multi_thread")] async fn submit_user_message_emits_structured_plugin_mentions_from_bindings() { let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new().unwrap(); let configured = crate::session_state::ThreadSessionState { - thread_id: conversation_id, + thread_id, forked_from_id: None, fork_parent_title: None, thread_name: None,