session_id = thread_id

This commit is contained in:
jif-oai
2026-05-04 12:06:46 +01:00
parent 85f522a73e
commit b08376a727
3 changed files with 177 additions and 1 deletions

View File

@@ -152,6 +152,11 @@ impl AgentControl {
}
}
pub(crate) fn with_session_id(mut self, session_id: SessionId) -> Self {
self.session_id = session_id;
self
}
pub(crate) fn session_id(&self) -> SessionId {
self.session_id
}

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::goals::GoalRuntimeState;
use codex_protocol::SessionId;
use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSpecialPath;
use tokio::sync::Semaphore;
@@ -802,7 +803,12 @@ impl Session {
config.analytics_enabled,
)
});
let session_id = agent_control.session_id();
let session_id = if session_configuration.session_source.is_non_root_agent() {
agent_control.session_id()
} else {
SessionId::from(thread_id)
};
let agent_control = agent_control.with_session_id(session_id);
let services = SessionServices {
// Initialize the MCP connection manager with an uninitialized
// instance. It will be replaced with one created via

View File

@@ -30,6 +30,7 @@ use codex_models_manager::model_info;
use codex_models_manager::test_support::construct_model_info_offline_for_tests;
use codex_models_manager::test_support::get_model_offline_for_tests;
use codex_protocol::AgentPath;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::account::PlanType as AccountPlanType;
use codex_protocol::config_types::ServiceTier;
@@ -3730,6 +3731,170 @@ async fn make_session_with_config_and_rx(
Ok((session, rx_event))
}
async fn make_session_with_history_source_and_agent_control_and_rx(
initial_history: InitialHistory,
session_source: SessionSource,
agent_control: AgentControl,
) -> anyhow::Result<(Arc<Session>, async_channel::Receiver<Event>)> {
let codex_home = tempfile::tempdir().expect("create temp dir");
let mut config = build_test_config(codex_home.path()).await;
config.ephemeral = true;
let config = Arc::new(config);
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let models_manager = models_manager_with_provider(
config.codex_home.to_path_buf(),
auth_manager.clone(),
config.model_provider.clone(),
);
let model = get_model_offline_for_tests(config.model.as_deref());
let model_info =
construct_model_info_offline_for_tests(model.as_str(), &config.to_models_manager_config());
let collaboration_mode = CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model,
reasoning_effort: config.model_reasoning_effort,
developer_instructions: None,
},
};
let default_environments = vec![TurnEnvironmentSelection {
environment_id: codex_exec_server::LOCAL_ENVIRONMENT_ID.to_string(),
cwd: config.cwd.clone(),
}];
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
collaboration_mode,
model_reasoning_summary: config.model_reasoning_summary,
developer_instructions: config.developer_instructions.clone(),
user_instructions: config.user_instructions.clone(),
service_tier: None,
personality: config.personality,
base_instructions: config
.base_instructions
.clone()
.unwrap_or_else(|| model_info.get_model_instructions(config.personality)),
compact_prompt: config.compact_prompt.clone(),
approval_policy: config.permissions.approval_policy.clone(),
approvals_reviewer: config.approvals_reviewer,
permission_profile: config.permissions.permission_profile.clone(),
active_permission_profile: config.permissions.active_permission_profile(),
windows_sandbox_level: WindowsSandboxLevel::from_config(&config),
cwd: config.cwd.clone(),
codex_home: config.codex_home.clone(),
thread_name: None,
environments: default_environments,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
app_server_client_version: None,
session_source: session_source.clone(),
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
let (tx_event, rx_event) = async_channel::unbounded();
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let plugins_manager = Arc::new(PluginsManager::new(config.codex_home.to_path_buf()));
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let skills_manager = Arc::new(SkillsManager::new(
config.codex_home.clone(),
/*bundled_skills_enabled*/ true,
));
let session = Session::new(
session_configuration,
Arc::clone(&config),
auth_manager,
models_manager,
Arc::new(ExecPolicyManager::default()),
tx_event,
agent_status_tx,
initial_history,
session_source,
skills_manager,
plugins_manager,
mcp_manager,
Arc::new(SkillsWatcher::noop()),
agent_control,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
)),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
.await?;
Ok((session, rx_event))
}
#[tokio::test]
async fn resumed_root_session_uses_thread_id_as_session_id() {
let thread_id = ThreadId::new();
let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx(
InitialHistory::Resumed(ResumedHistory {
conversation_id: thread_id,
history: Vec::new(),
rollout_path: None,
}),
SessionSource::Exec,
AgentControl::default(),
)
.await
.expect("resume should succeed");
assert_eq!(
session.services.agent_control.session_id(),
SessionId::from(thread_id)
);
let event = rx_event.recv().await.expect("session configured event");
let EventMsg::SessionConfigured(event) = event.msg else {
panic!("expected session configured event");
};
assert_eq!(event.session_id, SessionId::from(thread_id));
assert_eq!(event.thread_id, thread_id);
}
#[tokio::test]
async fn resumed_subagent_session_keeps_inherited_session_id() {
let parent_thread_id = ThreadId::new();
let parent_session_id = SessionId::from(parent_thread_id);
let thread_id = ThreadId::new();
let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
});
let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx(
InitialHistory::Resumed(ResumedHistory {
conversation_id: thread_id,
history: Vec::new(),
rollout_path: None,
}),
session_source,
AgentControl::default().with_session_id(parent_session_id),
)
.await
.expect("resume should succeed");
assert_eq!(
session.services.agent_control.session_id(),
parent_session_id
);
let event = rx_event.recv().await.expect("session configured event");
let EventMsg::SessionConfigured(event) = event.msg else {
panic!("expected session configured event");
};
assert_eq!(event.session_id, parent_session_id);
assert_eq!(event.thread_id, thread_id);
}
#[tokio::test]
async fn notify_request_permissions_response_ignores_unmatched_call_id() {
let (session, _turn_context) = make_session_and_context().await;