Compare commits

...

11 Commits

18 changed files with 519 additions and 103 deletions

View File

@@ -9212,6 +9212,10 @@ mod tests {
model: "gpt-5".to_string(),
model_provider_id: "openai".to_string(),
service_tier: Some(codex_protocol::config_types::ServiceTier::Flex),
plan_mode_reasoning_effort: None,
model_verbosity: None,
model_context_window: None,
model_auto_compact_token_limit: None,
approval_policy: codex_protocol::protocol::AskForApproval::OnRequest,
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer::User,
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
@@ -9219,6 +9223,7 @@ mod tests {
ephemeral: false,
reasoning_effort: None,
personality: None,
active_profile: None,
session_source: SessionSource::Cli,
};

View File

@@ -15,6 +15,7 @@ use crate::shell_snapshot::ShellSnapshot;
use crate::thread_manager::ThreadManagerState;
use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns;
use codex_features::Feature;
use codex_mcp::McpConnectionManager;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
@@ -36,6 +37,7 @@ use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::RwLock;
use tokio::sync::watch;
use tracing::warn;
@@ -192,6 +194,12 @@ impl AgentControl {
let inherited_exec_policy = self
.inherited_exec_policy_for_source(&state, session_source.as_ref(), &config)
.await;
let inherited_prompt_cache_key = self
.inherited_prompt_cache_key_for_source(&state, session_source.as_ref())
.await;
let inherited_mcp_connection_manager = self
.inherited_mcp_connection_manager_for_source(&state, session_source.as_ref())
.await;
let (session_source, mut agent_metadata) = match session_source {
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
@@ -225,6 +233,8 @@ impl AgentControl {
&options,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
)
.await?
}
@@ -238,6 +248,8 @@ impl AgentControl {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
)
.await?
}
@@ -323,6 +335,7 @@ impl AgentControl {
})
}
#[allow(clippy::too_many_arguments)]
async fn spawn_forked_thread(
&self,
state: &Arc<ThreadManagerState>,
@@ -331,6 +344,8 @@ impl AgentControl {
options: &SpawnAgentOptions,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_prompt_cache_key: Option<ThreadId>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
) -> CodexResult<crate::thread_manager::NewThread> {
if options.fork_parent_spawn_call_id.is_none() {
return Err(CodexErr::Fatal(
@@ -396,6 +411,8 @@ impl AgentControl {
/*persist_extended_history*/ false,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
)
.await
}
@@ -524,6 +541,12 @@ impl AgentControl {
let inherited_exec_policy = self
.inherited_exec_policy_for_source(&state, Some(&session_source), &config)
.await;
let inherited_prompt_cache_key = self
.inherited_prompt_cache_key_for_source(&state, Some(&session_source))
.await;
let inherited_mcp_connection_manager = self
.inherited_mcp_connection_manager_for_source(&state, Some(&session_source))
.await;
let rollout_path =
match find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
@@ -545,6 +568,8 @@ impl AgentControl {
session_source,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
)
.await?;
let mut agent_metadata = agent_metadata;
@@ -1055,6 +1080,40 @@ impl AgentControl {
))
}
async fn inherited_prompt_cache_key_for_source(
&self,
state: &Arc<ThreadManagerState>,
session_source: Option<&SessionSource>,
) -> Option<ThreadId> {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
return None;
};
let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
Some(parent_thread.codex.session.prompt_cache_key())
}
async fn inherited_mcp_connection_manager_for_source(
&self,
state: &Arc<ThreadManagerState>,
session_source: Option<&SessionSource>,
) -> Option<Arc<RwLock<McpConnectionManager>>> {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
return None;
};
let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
Some(Arc::clone(
&parent_thread.codex.session.services.mcp_connection_manager,
))
}
async fn open_thread_spawn_children(
&self,
parent_thread_id: ThreadId,

View File

@@ -651,6 +651,14 @@ async fn spawn_agent_can_fork_parent_thread_history_with_sanitized_items() {
.await
.expect("child thread should be registered");
assert_ne!(child_thread_id, parent_thread_id);
assert_eq!(
child_thread.codex.session.prompt_cache_key(),
parent_thread.codex.session.prompt_cache_key(),
);
assert!(Arc::ptr_eq(
&child_thread.codex.session.services.mcp_connection_manager,
&parent_thread.codex.session.services.mcp_connection_manager,
));
let history = child_thread.codex.session.clone_history().await;
let expected_history = [
ResponseItem::Message {

View File

@@ -144,6 +144,7 @@ struct ModelClientState {
conversation_id: ThreadId,
window_generation: AtomicU64,
installation_id: String,
prompt_cache_key: ThreadId,
provider: ModelProviderInfo,
auth_env_telemetry: AuthEnvTelemetry,
session_source: SessionSource,
@@ -266,6 +267,7 @@ impl ModelClient {
auth_manager: Option<Arc<AuthManager>>,
conversation_id: ThreadId,
installation_id: String,
prompt_cache_key: ThreadId,
provider: ModelProviderInfo,
session_source: SessionSource,
model_verbosity: Option<VerbosityConfig>,
@@ -284,6 +286,7 @@ impl ModelClient {
conversation_id,
window_generation: AtomicU64::new(0),
installation_id,
prompt_cache_key,
provider,
auth_env_telemetry,
session_source,
@@ -331,6 +334,10 @@ impl ModelClient {
format!("{conversation_id}:{window_generation}")
}
pub(crate) fn prompt_cache_key(&self) -> ThreadId {
self.state.prompt_cache_key
}
fn take_cached_websocket_session(&self) -> WebsocketSession {
let mut cached_websocket_session = self
.state
@@ -803,7 +810,7 @@ impl ModelClientSession {
None
};
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
let prompt_cache_key = Some(self.client.state.prompt_cache_key.to_string());
let request = ResponsesApiRequest {
model: model_info.slug.clone(),
instructions: instructions.clone(),

View File

@@ -20,11 +20,13 @@ use pretty_assertions::assert_eq;
use serde_json::json;
fn test_model_client(session_source: SessionSource) -> ModelClient {
let conversation_id = ThreadId::new();
let provider = create_oss_provider_with_base_url("https://example.com/v1", WireApi::Responses);
ModelClient::new(
/*auth_manager*/ None,
ThreadId::new(),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
conversation_id,
provider,
session_source,
/*model_verbosity*/ None,

View File

@@ -426,6 +426,8 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
pub(crate) inherited_prompt_cache_key: Option<ThreadId>,
pub(crate) inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
}
@@ -481,6 +483,8 @@ impl Codex {
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
parent_trace: _,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -646,9 +650,11 @@ impl Codex {
app_server_client_name: None,
app_server_client_version: None,
session_source,
prompt_cache_key: inherited_prompt_cache_key,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
inherited_mcp_connection_manager,
user_shell_override,
};
@@ -1131,9 +1137,11 @@ pub(crate) struct SessionConfiguration {
app_server_client_version: Option<String>,
/// Source of the session (cli, vscode, exec, mcp, ...)
session_source: SessionSource,
prompt_cache_key: Option<ThreadId>,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
user_shell_override: Option<shell::Shell>,
}
@@ -1147,6 +1155,12 @@ impl SessionConfiguration {
model: self.collaboration_mode.model().to_string(),
model_provider_id: self.original_config_do_not_use.model_provider_id.clone(),
service_tier: self.service_tier,
plan_mode_reasoning_effort: self.original_config_do_not_use.plan_mode_reasoning_effort,
model_verbosity: self.original_config_do_not_use.model_verbosity,
model_context_window: self.original_config_do_not_use.model_context_window,
model_auto_compact_token_limit: self
.original_config_do_not_use
.model_auto_compact_token_limit,
approval_policy: self.approval_policy.value(),
approvals_reviewer: self.approvals_reviewer,
sandbox_policy: self.sandbox_policy.get().clone(),
@@ -1154,6 +1168,7 @@ impl SessionConfiguration {
ephemeral: self.original_config_do_not_use.ephemeral,
reasoning_effort: self.collaboration_mode.reasoning_effort(),
personality: self.personality,
active_profile: self.original_config_do_not_use.active_profile.clone(),
session_source: self.session_source.clone(),
}
}
@@ -1573,6 +1588,11 @@ impl Session {
.unwrap_or(u64::MAX),
InitialHistory::New | InitialHistory::Forked(_) => 0,
};
session_configuration.prompt_cache_key = Some(
session_configuration
.prompt_cache_key
.unwrap_or(conversation_id),
);
let state_builder = match &initial_history {
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
resumed.history.as_slice(),
@@ -1909,16 +1929,21 @@ impl Session {
let installation_id = resolve_installation_id(&config.codex_home).await?;
let services = SessionServices {
// Initialize the MCP connection manager with an uninitialized
// instance. It will be replaced with one created via
// McpConnectionManager::new() once all its constructor args are
// available. This also ensures `SessionConfigured` is emitted
// before any MCP-related events. It is reasonable to consider
// changing this to use Option or OnceCell, though the current
// setup is straightforward enough and performs well.
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized(
&config.permissions.approval_policy,
))),
mcp_connection_manager: session_configuration
.inherited_mcp_connection_manager
.clone()
.unwrap_or_else(|| {
// Initialize the MCP connection manager with an uninitialized
// instance. It will be replaced with one created via
// McpConnectionManager::new() once all its constructor args are
// available. This also ensures `SessionConfigured` is emitted
// before any MCP-related events. It is reasonable to consider
// changing this to use Option or OnceCell, though the current
// setup is straightforward enough and performs well.
Arc::new(RwLock::new(McpConnectionManager::new_uninitialized(
&config.permissions.approval_policy,
)))
}),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -1952,6 +1977,9 @@ impl Session {
Some(Arc::clone(&auth_manager)),
conversation_id,
installation_id,
session_configuration
.prompt_cache_key
.unwrap_or(conversation_id),
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
@@ -2028,80 +2056,86 @@ impl Session {
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_skills_watcher_listener();
// Construct sandbox_state before MCP startup so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: session_configuration.cwd.to_path_buf(),
use_legacy_landlock: config.features.use_legacy_landlock(),
};
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled && server.required)
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref());
if session_configuration
.inherited_mcp_connection_manager
.is_none()
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
tx_event.clone(),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
// Construct sandbox_state before MCP startup so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: session_configuration.cwd.to_path_buf(),
use_legacy_landlock: config.features.use_legacy_landlock(),
};
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled && server.required)
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count =
mcp_servers.values().filter(|server| server.enabled).count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref());
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = sess
.services
.mcp_connection_manager
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
return Err(anyhow::anyhow!(
"required MCP servers failed to initialize: {details}"
));
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
tx_event.clone(),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = sess
.services
.mcp_connection_manager
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
return Err(anyhow::anyhow!(
"required MCP servers failed to initialize: {details}"
));
}
}
}
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
@@ -2137,6 +2171,10 @@ impl Session {
self.services.state_db.clone()
}
pub(crate) fn prompt_cache_key(&self) -> ThreadId {
self.services.model_client.prompt_cache_key()
}
/// Ensure rollout file writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
let recorder = {

View File

@@ -94,6 +94,10 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
inherited_prompt_cache_key: Some(parent_session.prompt_cache_key()),
inherited_mcp_connection_manager: Some(Arc::clone(
&parent_session.services.mcp_connection_manager,
)),
parent_trace: None,
})
.await?;

View File

@@ -251,11 +251,13 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
}
fn test_model_client_session() -> crate::client::ModelClientSession {
let thread_id = ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid");
crate::client::ModelClient::new(
/*auth_manager*/ None,
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid"),
thread_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
thread_id,
ModelProviderInfo::create_openai_provider(/* base_url */ /*base_url*/ None),
codex_protocol::protocol::SessionSource::Exec,
/*model_verbosity*/ None,
@@ -1872,9 +1874,11 @@ async fn set_rate_limits_retains_previous_credits() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
};
@@ -1974,9 +1978,11 @@ async fn set_rate_limits_updates_plan_type_when_present() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
};
@@ -2323,9 +2329,11 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
}
}
@@ -2586,9 +2594,11 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
};
@@ -2689,9 +2699,11 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
@@ -2761,6 +2773,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
Some(auth_manager.clone()),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
conversation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
@@ -3530,9 +3543,11 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
prompt_cache_key: None,
dynamic_tools,
persist_extended_history: false,
inherited_shell_snapshot: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
@@ -3602,6 +3617,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
Some(Arc::clone(&auth_manager)),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
conversation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,

View File

@@ -455,6 +455,8 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
metrics_service_name: None,
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
inherited_prompt_cache_key: None,
inherited_mcp_connection_manager: None,
user_shell_override: None,
parent_trace: None,
})

View File

@@ -7,6 +7,7 @@ use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::config_types::Verbosity;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ContentItem;
@@ -34,6 +35,10 @@ pub struct ThreadConfigSnapshot {
pub model: String,
pub model_provider_id: String,
pub service_tier: Option<ServiceTier>,
pub plan_mode_reasoning_effort: Option<ReasoningEffort>,
pub model_verbosity: Option<Verbosity>,
pub model_context_window: Option<i64>,
pub model_auto_compact_token_limit: Option<i64>,
pub approval_policy: AskForApproval,
pub approvals_reviewer: ApprovalsReviewer,
pub sandbox_policy: SandboxPolicy,
@@ -41,6 +46,7 @@ pub struct ThreadConfigSnapshot {
pub ephemeral: bool,
pub reasoning_effort: Option<ReasoningEffort>,
pub personality: Option<Personality>,
pub active_profile: Option<String>,
pub session_source: SessionSource,
}

View File

@@ -20,6 +20,7 @@ use codex_app_server_protocol::TurnStatus;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_mcp::McpConnectionManager;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::OPENAI_PROVIDER_ID;
use codex_models_manager::collaboration_mode_presets::CollaborationModesConfig;
@@ -756,6 +757,8 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
/*inherited_prompt_cache_key*/ None,
/*inherited_mcp_connection_manager*/ None,
))
.await
}
@@ -770,6 +773,8 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_prompt_cache_key: Option<ThreadId>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -782,12 +787,15 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
.await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn resume_thread_from_rollout_with_source(
&self,
config: Config,
@@ -796,6 +804,8 @@ impl ThreadManagerState {
session_source: SessionSource,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_prompt_cache_key: Option<ThreadId>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
Box::pin(self.spawn_thread_with_source(
@@ -809,6 +819,8 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -825,6 +837,8 @@ impl ThreadManagerState {
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_prompt_cache_key: Option<ThreadId>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -837,6 +851,8 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -868,6 +884,8 @@ impl ThreadManagerState {
metrics_service_name,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
/*inherited_prompt_cache_key*/ None,
/*inherited_mcp_connection_manager*/ None,
parent_trace,
user_shell_override,
))
@@ -887,6 +905,8 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_prompt_cache_key: Option<ThreadId>,
inherited_mcp_connection_manager: Option<Arc<RwLock<McpConnectionManager>>>,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
@@ -914,6 +934,8 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_prompt_cache_key,
inherited_mcp_connection_manager,
user_shell_override,
parent_trace,
})

View File

@@ -61,17 +61,23 @@ impl ToolHandler for Handler {
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
if !args.fork_context {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
}
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
if args.fork_context {
restore_forked_spawn_agent_model_config(&mut config, turn.as_ref());
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);

View File

@@ -225,7 +225,11 @@ fn build_agent_shared_config(turn: &TurnContext) -> Result<Config, FunctionCallE
let mut config = (*base_config).clone();
config.model = Some(turn.model_info.slug.clone());
config.model_provider = turn.provider.clone();
config.model_reasoning_effort = turn.reasoning_effort;
// Forked children must preserve the spawning turn's effective model settings, including a
// model catalog default effort, so their config snapshot matches the actual request shape.
config.model_reasoning_effort = turn
.reasoning_effort
.or(turn.model_info.default_reasoning_level);
config.model_reasoning_summary = Some(turn.reasoning_summary);
config.developer_instructions = turn.developer_instructions.clone();
config.compact_prompt = turn.compact_prompt.clone();
@@ -234,6 +238,24 @@ fn build_agent_shared_config(turn: &TurnContext) -> Result<Config, FunctionCallE
Ok(config)
}
/// Restores parent-owned model selection after role application on forked spawns.
pub(crate) fn restore_forked_spawn_agent_model_config(config: &mut Config, turn: &TurnContext) {
config.model = Some(turn.model_info.slug.clone());
config.service_tier = turn.config.service_tier;
config.model_provider_id = turn.config.model_provider_id.clone();
config.model_provider = turn.provider.clone();
config.model_reasoning_effort = turn
.reasoning_effort
.or(turn.model_info.default_reasoning_level);
config.plan_mode_reasoning_effort = turn.config.plan_mode_reasoning_effort;
config.model_reasoning_summary = Some(turn.reasoning_summary);
config.model_verbosity = turn.config.model_verbosity;
config.model_context_window = turn.config.model_context_window;
config.model_auto_compact_token_limit = turn.config.model_auto_compact_token_limit;
config.model_supports_reasoning_summaries = turn.config.model_supports_reasoning_summaries;
config.active_profile = turn.config.active_profile.clone();
}
/// Copies runtime-only turn state onto a child config before it is handed to `AgentControl`.
///
/// These values are chosen by the live turn rather than persisted config, so leaving them stale

View File

@@ -2,6 +2,7 @@ use super::*;
use crate::CodexThread;
use crate::ThreadManager;
use crate::codex::make_session_and_context;
use crate::config::AgentRoleConfig;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
use crate::function_tool::FunctionCallError;
use crate::session_prefix::format_subagent_notification_message;
@@ -23,11 +24,14 @@ use codex_login::CodexAuth;
use codex_model_provider_info::built_in_model_providers;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::config_types::Verbosity;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
@@ -90,6 +94,61 @@ fn thread_manager() -> ThreadManager {
)
}
async fn install_role_with_model_provider_and_profile_override(turn: &mut TurnContext) -> String {
let role_name = "fork-context-role".to_string();
tokio::fs::create_dir_all(&turn.config.codex_home)
.await
.expect("codex home should be created");
let role_config_path = turn.config.codex_home.join("fork-context-role.toml");
tokio::fs::write(
&role_config_path,
r#"developer_instructions = "Forked children should keep the parent model config."
model_provider = "role-provider"
model_context_window = 12345
model_auto_compact_token_limit = 1234
model_verbosity = "low"
plan_mode_reasoning_effort = "minimal"
profile = "role-profile"
service_tier = "fast"
[model_providers.role-provider]
name = "Role Provider"
base_url = "https://role.example.com/v1"
env_key = "ROLE_PROVIDER_API_KEY"
wire_api = "responses"
[profiles.role-profile]
model_provider = "role-provider"
"#,
)
.await
.expect("role config should be written");
let mut config = (*turn.config).clone();
let mut role_provider =
built_in_model_providers(/* openai_base_url */ /*openai_base_url*/ None)["openai"].clone();
role_provider.name = "Role Provider".to_string();
config
.model_providers
.insert("role-provider".to_string(), role_provider);
config.service_tier = Some(ServiceTier::Flex);
config.plan_mode_reasoning_effort = Some(ReasoningEffort::High);
config.model_verbosity = Some(Verbosity::High);
config.model_context_window = Some(200_000);
config.model_auto_compact_token_limit = Some(180_000);
config.agent_roles.insert(
role_name.clone(),
AgentRoleConfig {
description: Some("Role with model-provider and profile overrides".to_string()),
config_file: Some(role_config_path),
nickname_candidates: None,
},
);
turn.config = Arc::new(config);
role_name
}
fn history_contains_inter_agent_communication(
history_items: &[ResponseItem],
expected: &InterAgentCommunication,
@@ -366,6 +425,154 @@ async fn spawn_agent_uses_explorer_role_and_preserves_approval_policy() {
assert_eq!(snapshot.model_provider_id, "ollama");
}
#[tokio::test]
async fn spawn_agent_fork_context_ignores_child_model_overrides() {
let (mut session, mut turn) = make_session_and_context().await;
let role_name = install_role_with_model_provider_and_profile_override(&mut turn).await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let expected_model = turn.model_info.slug.clone();
let expected_model_provider_id = turn.config.model_provider_id.clone();
let expected_active_profile = turn.config.active_profile.clone();
let expected_reasoning_effort = turn.reasoning_effort;
let expected_service_tier = turn.config.service_tier;
let expected_plan_mode_reasoning_effort = turn.config.plan_mode_reasoning_effort;
let expected_model_verbosity = turn.config.model_verbosity;
let expected_model_context_window = turn.config.model_context_window;
let expected_model_auto_compact_token_limit = turn.config.model_auto_compact_token_limit;
let output = SpawnAgentHandler
.handle(invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"agent_type": role_name,
"model": "not-a-real-model",
"reasoning_effort": "low",
"fork_context": true
})),
))
.await
.expect("spawn_agent should succeed");
let (content, _) = expect_text_output(output);
let result: serde_json::Value =
serde_json::from_str(&content).expect("spawn_agent result should be json");
let agent_id = parse_agent_id(
result["agent_id"]
.as_str()
.expect("spawn_agent result should include agent_id"),
);
let snapshot = manager
.get_thread(agent_id)
.await
.expect("spawned agent thread should exist")
.config_snapshot()
.await;
assert_eq!(snapshot.model, expected_model);
assert_eq!(snapshot.model_provider_id, expected_model_provider_id);
assert_eq!(snapshot.active_profile, expected_active_profile);
assert_eq!(snapshot.reasoning_effort, expected_reasoning_effort);
assert_eq!(snapshot.service_tier, expected_service_tier);
assert_eq!(
snapshot.plan_mode_reasoning_effort,
expected_plan_mode_reasoning_effort
);
assert_eq!(snapshot.model_verbosity, expected_model_verbosity);
assert_eq!(snapshot.model_context_window, expected_model_context_window);
assert_eq!(
snapshot.model_auto_compact_token_limit,
expected_model_auto_compact_token_limit
);
}
#[tokio::test]
async fn multi_agent_v2_spawn_fork_turns_ignores_child_model_overrides() {
let (mut session, mut turn) = make_session_and_context().await;
let role_name = install_role_with_model_provider_and_profile_override(&mut turn).await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = (*turn.config).clone();
config
.features
.enable(Feature::MultiAgentV2)
.expect("test config should allow feature update");
let turn = TurnContext {
config: Arc::new(config),
..turn
};
let expected_model = turn.model_info.slug.clone();
let expected_model_provider_id = turn.config.model_provider_id.clone();
let expected_active_profile = turn.config.active_profile.clone();
let expected_reasoning_effort = turn.reasoning_effort;
let expected_service_tier = turn.config.service_tier;
let expected_plan_mode_reasoning_effort = turn.config.plan_mode_reasoning_effort;
let expected_model_verbosity = turn.config.model_verbosity;
let expected_model_context_window = turn.config.model_context_window;
let expected_model_auto_compact_token_limit = turn.config.model_auto_compact_token_limit;
let output = SpawnAgentHandlerV2
.handle(invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"agent_type": role_name,
"model": "not-a-real-model",
"reasoning_effort": "low",
"fork_turns": "all",
"task_name": "fork_context_v2"
})),
))
.await
.expect("spawn_agent should succeed");
let (content, _) = expect_text_output(output);
let result: serde_json::Value =
serde_json::from_str(&content).expect("spawn_agent result should be json");
assert_eq!(result["task_name"], "/root/fork_context_v2");
let agent_id = manager
.captured_ops()
.into_iter()
.map(|(thread_id, _)| thread_id)
.find(|thread_id| *thread_id != root.thread_id)
.expect("spawned agent should receive an op");
let snapshot = manager
.get_thread(agent_id)
.await
.expect("spawned agent thread should exist")
.config_snapshot()
.await;
assert_eq!(snapshot.model, expected_model);
assert_eq!(snapshot.model_provider_id, expected_model_provider_id);
assert_eq!(snapshot.active_profile, expected_active_profile);
assert_eq!(snapshot.reasoning_effort, expected_reasoning_effort);
assert_eq!(snapshot.service_tier, expected_service_tier);
assert_eq!(
snapshot.plan_mode_reasoning_effort,
expected_plan_mode_reasoning_effort
);
assert_eq!(snapshot.model_verbosity, expected_model_verbosity);
assert_eq!(snapshot.model_context_window, expected_model_context_window);
assert_eq!(
snapshot.model_auto_compact_token_limit,
expected_model_auto_compact_token_limit
);
}
#[tokio::test]
async fn spawn_agent_returns_agent_id_without_task_name() {
let (mut session, turn) = make_session_and_context().await;

View File

@@ -71,17 +71,23 @@ impl ToolHandler for Handler {
.await;
let mut config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
if fork_mode.is_none() {
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
}
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
if fork_mode.is_some() {
restore_forked_spawn_agent_model_config(&mut config, turn.as_ref());
}
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
config.developer_instructions = Some(

View File

@@ -101,6 +101,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
conversation_id,
provider.clone(),
session_source,
config.model_verbosity,
@@ -226,6 +227,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
conversation_id,
provider.clone(),
session_source,
config.model_verbosity,
@@ -340,6 +342,7 @@ async fn responses_respects_model_info_overrides_from_config() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
conversation_id,
provider.clone(),
session_source,
config.model_verbosity,

View File

@@ -879,6 +879,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth
))),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
conversation_id,
provider,
SessionSource::Exec,
config.model_verbosity,
@@ -2157,6 +2158,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
conversation_id,
provider.clone(),
SessionSource::Exec,
config.model_verbosity,

View File

@@ -1762,6 +1762,7 @@ async fn websocket_harness_with_provider_options(
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
conversation_id,
provider.clone(),
SessionSource::Exec,
config.model_verbosity,