From b08376a7274bde7f63264deb16d3ababa5a0d6fa Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 4 May 2026 12:06:46 +0100 Subject: [PATCH] session_id = thread_id --- codex-rs/core/src/agent/control.rs | 5 + codex-rs/core/src/session/session.rs | 8 +- codex-rs/core/src/session/tests.rs | 165 +++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 11391147cc..f3b5f2e3ab 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -152,6 +152,11 @@ impl AgentControl { } } + pub(crate) fn with_session_id(mut self, session_id: SessionId) -> Self { + self.session_id = session_id; + self + } + pub(crate) fn session_id(&self) -> SessionId { self.session_id } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index d12927c684..2989e13096 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1,5 +1,6 @@ use super::*; use crate::goals::GoalRuntimeState; +use codex_protocol::SessionId; use codex_protocol::permissions::FileSystemPath; use codex_protocol::permissions::FileSystemSpecialPath; use tokio::sync::Semaphore; @@ -802,7 +803,12 @@ impl Session { config.analytics_enabled, ) }); - let session_id = agent_control.session_id(); + let session_id = if session_configuration.session_source.is_non_root_agent() { + agent_control.session_id() + } else { + SessionId::from(thread_id) + }; + let agent_control = agent_control.with_session_id(session_id); let services = SessionServices { // Initialize the MCP connection manager with an uninitialized // instance. It will be replaced with one created via diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index b82f9ff938..0406699449 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -30,6 +30,7 @@ use codex_models_manager::model_info; use codex_models_manager::test_support::construct_model_info_offline_for_tests; use codex_models_manager::test_support::get_model_offline_for_tests; use codex_protocol::AgentPath; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::account::PlanType as AccountPlanType; use codex_protocol::config_types::ServiceTier; @@ -3730,6 +3731,170 @@ async fn make_session_with_config_and_rx( Ok((session, rx_event)) } +async fn make_session_with_history_source_and_agent_control_and_rx( + initial_history: InitialHistory, + session_source: SessionSource, + agent_control: AgentControl, +) -> anyhow::Result<(Arc, async_channel::Receiver)> { + let codex_home = tempfile::tempdir().expect("create temp dir"); + let mut config = build_test_config(codex_home.path()).await; + config.ephemeral = true; + let config = Arc::new(config); + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let models_manager = models_manager_with_provider( + config.codex_home.to_path_buf(), + auth_manager.clone(), + config.model_provider.clone(), + ); + let model = get_model_offline_for_tests(config.model.as_deref()); + let model_info = + construct_model_info_offline_for_tests(model.as_str(), &config.to_models_manager_config()); + let collaboration_mode = CollaborationMode { + mode: ModeKind::Default, + settings: Settings { + model, + reasoning_effort: config.model_reasoning_effort, + developer_instructions: None, + }, + }; + let default_environments = vec![TurnEnvironmentSelection { + environment_id: codex_exec_server::LOCAL_ENVIRONMENT_ID.to_string(), + cwd: config.cwd.clone(), + }]; + let session_configuration = SessionConfiguration { + provider: config.model_provider.clone(), + collaboration_mode, + model_reasoning_summary: config.model_reasoning_summary, + developer_instructions: config.developer_instructions.clone(), + user_instructions: config.user_instructions.clone(), + service_tier: None, + personality: config.personality, + base_instructions: config + .base_instructions + .clone() + .unwrap_or_else(|| model_info.get_model_instructions(config.personality)), + compact_prompt: config.compact_prompt.clone(), + approval_policy: config.permissions.approval_policy.clone(), + approvals_reviewer: config.approvals_reviewer, + permission_profile: config.permissions.permission_profile.clone(), + active_permission_profile: config.permissions.active_permission_profile(), + windows_sandbox_level: WindowsSandboxLevel::from_config(&config), + cwd: config.cwd.clone(), + codex_home: config.codex_home.clone(), + thread_name: None, + environments: default_environments, + original_config_do_not_use: Arc::clone(&config), + metrics_service_name: None, + app_server_client_name: None, + app_server_client_version: None, + session_source: session_source.clone(), + dynamic_tools: Vec::new(), + persist_extended_history: false, + inherited_shell_snapshot: None, + user_shell_override: None, + }; + + let (tx_event, rx_event) = async_channel::unbounded(); + let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); + let plugins_manager = Arc::new(PluginsManager::new(config.codex_home.to_path_buf())); + let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager))); + let skills_manager = Arc::new(SkillsManager::new( + config.codex_home.clone(), + /*bundled_skills_enabled*/ true, + )); + + let session = Session::new( + session_configuration, + Arc::clone(&config), + auth_manager, + models_manager, + Arc::new(ExecPolicyManager::default()), + tx_event, + agent_status_tx, + initial_history, + session_source, + skills_manager, + plugins_manager, + mcp_manager, + Arc::new(SkillsWatcher::noop()), + agent_control, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + /*analytics_events_client*/ None, + Arc::new(codex_thread_store::LocalThreadStore::new( + codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), + )), + codex_rollout_trace::ThreadTraceContext::disabled(), + ) + .await?; + + Ok((session, rx_event)) +} + +#[tokio::test] +async fn resumed_root_session_uses_thread_id_as_session_id() { + let thread_id = ThreadId::new(); + let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx( + InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Vec::new(), + rollout_path: None, + }), + SessionSource::Exec, + AgentControl::default(), + ) + .await + .expect("resume should succeed"); + + assert_eq!( + session.services.agent_control.session_id(), + SessionId::from(thread_id) + ); + + let event = rx_event.recv().await.expect("session configured event"); + let EventMsg::SessionConfigured(event) = event.msg else { + panic!("expected session configured event"); + }; + assert_eq!(event.session_id, SessionId::from(thread_id)); + assert_eq!(event.thread_id, thread_id); +} + +#[tokio::test] +async fn resumed_subagent_session_keeps_inherited_session_id() { + let parent_thread_id = ThreadId::new(); + let parent_session_id = SessionId::from(parent_thread_id); + let thread_id = ThreadId::new(); + let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: None, + }); + let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx( + InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Vec::new(), + rollout_path: None, + }), + session_source, + AgentControl::default().with_session_id(parent_session_id), + ) + .await + .expect("resume should succeed"); + + assert_eq!( + session.services.agent_control.session_id(), + parent_session_id + ); + + let event = rx_event.recv().await.expect("session configured event"); + let EventMsg::SessionConfigured(event) = event.msg else { + panic!("expected session configured event"); + }; + assert_eq!(event.session_id, parent_session_id); + assert_eq!(event.thread_id, thread_id); +} + #[tokio::test] async fn notify_request_permissions_response_ignores_unmatched_call_id() { let (session, _turn_context) = make_session_and_context().await;