diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 3d50f7735e..1b81af7058 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -163,11 +163,20 @@ use codex_protocol::error::Result as CodexResult; use codex_protocol::exec_output::StreamOutput; mod handlers; +mod mcp; +mod review; mod rollout_reconstruction; +mod session; mod turn; +mod turn_context; #[cfg(test)] use self::handlers::submission_dispatch_span; use self::handlers::submission_loop; +use self::review::spawn_review_thread; +pub(crate) use self::session::AppServerClientMetadata; +pub(crate) use self::session::Session; +pub(crate) use self::session::SessionConfiguration; +pub(crate) use self::session::SessionSettingsUpdate; #[cfg(test)] use self::turn::AssistantMessageStreamParsers; pub(crate) use self::turn::build_prompt; @@ -179,6 +188,8 @@ use self::turn::filter_connectors_for_input; pub(crate) use self::turn::get_last_assistant_message_from_turn; use self::turn::realtime_text_for_event; pub(crate) use self::turn::run_turn; +pub(crate) use self::turn_context::TurnContext; +pub(crate) use self::turn_context::TurnSkillsContext; #[cfg(test)] mod rollout_reconstruction_tests; @@ -339,13 +350,6 @@ use codex_utils_readiness::ReadinessFlag; #[cfg(test)] use codex_utils_stream_parser::ProposedPlanSegment; -fn image_generation_tool_auth_allowed(auth_manager: Option<&AuthManager>) -> bool { - matches!( - auth_manager.and_then(AuthManager::auth_mode), - Some(AuthMode::Chatgpt) - ) -} - /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. pub struct Codex { @@ -786,317 +790,6 @@ pub(crate) fn session_loop_termination_from_handle( .shared() } -/// Context for an initialized model agent -/// -/// A session has at most 1 running task at a time, and can be interrupted by user input. -pub(crate) struct Session { - pub(crate) conversation_id: ThreadId, - tx_event: Sender, - agent_status: watch::Sender, - out_of_band_elicitation_paused: watch::Sender, - state: Mutex, - /// Serializes rebuild/apply cycles for the running proxy; each cycle - /// rebuilds from the current SessionState while holding this lock. - managed_network_proxy_refresh_lock: Mutex<()>, - /// The set of enabled features should be invariant for the lifetime of the - /// session. - features: ManagedFeatures, - pending_mcp_server_refresh_config: Mutex>, - pub(crate) conversation: Arc, - pub(crate) active_turn: Mutex>, - mailbox: Mailbox, - mailbox_rx: Mutex, - idle_pending_input: Mutex>, // TODO (jif) merge with mailbox! - pub(crate) guardian_review_session: GuardianReviewSessionManager, - pub(crate) services: SessionServices, - js_repl: Arc, - next_internal_sub_id: AtomicU64, -} - -#[derive(Clone, Debug)] -pub(crate) struct TurnSkillsContext { - pub(crate) outcome: Arc, - pub(crate) implicit_invocation_seen_skills: Arc>>, -} - -impl TurnSkillsContext { - pub(crate) fn new(outcome: Arc) -> Self { - Self { - outcome, - implicit_invocation_seen_skills: Arc::new(Mutex::new(HashSet::new())), - } - } -} - -/// The context needed for a single turn of the thread. -#[derive(Debug)] -pub(crate) struct TurnContext { - pub(crate) sub_id: String, - pub(crate) trace_id: Option, - pub(crate) realtime_active: bool, - pub(crate) config: Arc, - pub(crate) auth_manager: Option>, - pub(crate) model_info: ModelInfo, - pub(crate) session_telemetry: SessionTelemetry, - pub(crate) provider: ModelProviderInfo, - pub(crate) reasoning_effort: Option, - pub(crate) reasoning_summary: ReasoningSummaryConfig, - pub(crate) session_source: SessionSource, - pub(crate) environment: Option>, - /// The session's absolute working directory. All relative paths provided - /// by the model as well as sandbox policies are resolved against this path - /// instead of `std::env::current_dir()`. - pub(crate) cwd: AbsolutePathBuf, - pub(crate) current_date: Option, - pub(crate) timezone: Option, - pub(crate) app_server_client_name: Option, - pub(crate) developer_instructions: Option, - pub(crate) compact_prompt: Option, - pub(crate) user_instructions: Option, - pub(crate) collaboration_mode: CollaborationMode, - pub(crate) personality: Option, - pub(crate) approval_policy: Constrained, - pub(crate) sandbox_policy: Constrained, - pub(crate) file_system_sandbox_policy: FileSystemSandboxPolicy, - pub(crate) network_sandbox_policy: NetworkSandboxPolicy, - pub(crate) network: Option, - pub(crate) windows_sandbox_level: WindowsSandboxLevel, - pub(crate) shell_environment_policy: ShellEnvironmentPolicy, - pub(crate) tools_config: ToolsConfig, - pub(crate) features: ManagedFeatures, - pub(crate) ghost_snapshot: GhostSnapshotConfig, - pub(crate) final_output_json_schema: Option, - pub(crate) codex_self_exe: Option, - pub(crate) codex_linux_sandbox_exe: Option, - pub(crate) tool_call_gate: Arc, - pub(crate) truncation_policy: TruncationPolicy, - pub(crate) js_repl: Arc, - pub(crate) dynamic_tools: Vec, - pub(crate) turn_metadata_state: Arc, - pub(crate) turn_skills: TurnSkillsContext, - pub(crate) turn_timing_state: Arc, -} -impl TurnContext { - pub(crate) fn model_context_window(&self) -> Option { - let effective_context_window_percent = self.model_info.effective_context_window_percent; - self.model_info.context_window.map(|context_window| { - context_window.saturating_mul(effective_context_window_percent) / 100 - }) - } - - pub(crate) fn apps_enabled(&self) -> bool { - let is_chatgpt_auth = self - .auth_manager - .as_deref() - .and_then(AuthManager::auth_cached) - .as_ref() - .is_some_and(CodexAuth::is_chatgpt_auth); - self.features.apps_enabled_for_auth(is_chatgpt_auth) - } - - pub(crate) async fn with_model(&self, model: String, models_manager: &ModelsManager) -> Self { - let mut config = (*self.config).clone(); - config.model = Some(model.clone()); - let model_info = models_manager - .get_model_info(model.as_str(), &config.to_models_manager_config()) - .await; - let truncation_policy = model_info.truncation_policy.into(); - let supported_reasoning_levels = model_info - .supported_reasoning_levels - .iter() - .map(|preset| preset.effort) - .collect::>(); - let reasoning_effort = if let Some(current_reasoning_effort) = self.reasoning_effort { - if supported_reasoning_levels.contains(¤t_reasoning_effort) { - Some(current_reasoning_effort) - } else { - supported_reasoning_levels - .get(supported_reasoning_levels.len().saturating_sub(1) / 2) - .copied() - .or(model_info.default_reasoning_level) - } - } else { - supported_reasoning_levels - .get(supported_reasoning_levels.len().saturating_sub(1) / 2) - .copied() - .or(model_info.default_reasoning_level) - }; - config.model_reasoning_effort = reasoning_effort; - - let collaboration_mode = self.collaboration_mode.with_updates( - Some(model.clone()), - Some(reasoning_effort), - /*developer_instructions*/ None, - ); - let features = self.features.clone(); - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &model_info, - available_models: &models_manager - .list_models(RefreshStrategy::OnlineIfUncached) - .await, - features: &features, - image_generation_tool_auth_allowed: image_generation_tool_auth_allowed( - self.auth_manager.as_deref(), - ), - web_search_mode: self.tools_config.web_search_mode, - session_source: self.session_source.clone(), - sandbox_policy: self.sandbox_policy.get(), - windows_sandbox_level: self.windows_sandbox_level, - }) - .with_unified_exec_shell_mode(self.tools_config.unified_exec_shell_mode.clone()) - .with_web_search_config(self.tools_config.web_search_config.clone()) - .with_allow_login_shell(self.tools_config.allow_login_shell) - .with_has_environment(self.tools_config.has_environment) - .with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled) - .with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone()) - .with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata) - .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( - &config.agent_roles, - )); - - Self { - sub_id: self.sub_id.clone(), - trace_id: self.trace_id.clone(), - realtime_active: self.realtime_active, - config: Arc::new(config), - auth_manager: self.auth_manager.clone(), - model_info: model_info.clone(), - session_telemetry: self - .session_telemetry - .clone() - .with_model(model.as_str(), model_info.slug.as_str()), - provider: self.provider.clone(), - reasoning_effort, - reasoning_summary: self.reasoning_summary, - session_source: self.session_source.clone(), - environment: self.environment.clone(), - cwd: self.cwd.clone(), - current_date: self.current_date.clone(), - timezone: self.timezone.clone(), - app_server_client_name: self.app_server_client_name.clone(), - developer_instructions: self.developer_instructions.clone(), - compact_prompt: self.compact_prompt.clone(), - user_instructions: self.user_instructions.clone(), - collaboration_mode, - personality: self.personality, - approval_policy: self.approval_policy.clone(), - sandbox_policy: self.sandbox_policy.clone(), - file_system_sandbox_policy: self.file_system_sandbox_policy.clone(), - network_sandbox_policy: self.network_sandbox_policy, - network: self.network.clone(), - windows_sandbox_level: self.windows_sandbox_level, - shell_environment_policy: self.shell_environment_policy.clone(), - tools_config, - features, - ghost_snapshot: self.ghost_snapshot.clone(), - final_output_json_schema: self.final_output_json_schema.clone(), - codex_self_exe: self.codex_self_exe.clone(), - codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), - tool_call_gate: Arc::new(ReadinessFlag::new()), - truncation_policy, - js_repl: Arc::clone(&self.js_repl), - dynamic_tools: self.dynamic_tools.clone(), - turn_metadata_state: self.turn_metadata_state.clone(), - turn_skills: self.turn_skills.clone(), - turn_timing_state: Arc::clone(&self.turn_timing_state), - } - } - - pub(crate) fn resolve_path(&self, path: Option) -> AbsolutePathBuf { - path.as_ref() - .map_or_else(|| self.cwd.clone(), |path| self.cwd.join(path)) - } - - pub(crate) fn file_system_sandbox_context( - &self, - additional_permissions: Option, - ) -> FileSystemSandboxContext { - FileSystemSandboxContext { - sandbox_policy: self.sandbox_policy.get().clone(), - windows_sandbox_level: self.windows_sandbox_level, - windows_sandbox_private_desktop: self - .config - .permissions - .windows_sandbox_private_desktop, - use_legacy_landlock: self.features.use_legacy_landlock(), - additional_permissions, - } - } - - pub(crate) fn compact_prompt(&self) -> &str { - self.compact_prompt - .as_deref() - .unwrap_or(compact::SUMMARIZATION_PROMPT) - } - - pub(crate) fn to_turn_context_item(&self) -> TurnContextItem { - let legacy_file_system_sandbox_policy = FileSystemSandboxPolicy::from_legacy_sandbox_policy( - self.sandbox_policy.get(), - &self.cwd, - ); - // Omit the derived split filesystem policy when it is equivalent to - // the legacy sandbox policy. This keeps turn-context payloads stable - // while both fields exist; once callers consume only the split policy, - // this comparison and the legacy projection should go away. - let file_system_sandbox_policy = (self.file_system_sandbox_policy - != legacy_file_system_sandbox_policy) - .then(|| self.file_system_sandbox_policy.clone()); - - TurnContextItem { - turn_id: Some(self.sub_id.clone()), - trace_id: self.trace_id.clone(), - cwd: self.cwd.to_path_buf(), - current_date: self.current_date.clone(), - timezone: self.timezone.clone(), - approval_policy: self.approval_policy.value(), - sandbox_policy: self.sandbox_policy.get().clone(), - network: self.turn_context_network_item(), - file_system_sandbox_policy, - model: self.model_info.slug.clone(), - personality: self.personality, - collaboration_mode: Some(self.collaboration_mode.clone()), - realtime_active: Some(self.realtime_active), - effort: self.reasoning_effort, - summary: self.reasoning_summary, - user_instructions: self.user_instructions.clone(), - developer_instructions: self.developer_instructions.clone(), - final_output_json_schema: self.final_output_json_schema.clone(), - truncation_policy: Some(self.truncation_policy), - } - } - - fn turn_context_network_item(&self) -> Option { - let network = self - .config - .config_layer_stack - .requirements() - .network - .as_ref()?; - Some(TurnContextNetworkItem { - allowed_domains: network - .domains - .as_ref() - .and_then(codex_config::NetworkDomainPermissionsToml::allowed_domains) - .unwrap_or_default(), - denied_domains: network - .domains - .as_ref() - .and_then(codex_config::NetworkDomainPermissionsToml::denied_domains) - .unwrap_or_default(), - }) - } -} - -fn local_time_context() -> (String, String) { - match iana_time_zone::get_timezone() { - Ok(timezone) => (Local::now().format("%Y-%m-%d").to_string(), timezone), - Err(_) => ( - Utc::now().format("%Y-%m-%d").to_string(), - "Etc/UTC".to_string(), - ), - } -} - async fn thread_title_from_state_db( state_db: Option<&state_db::StateDbHandle>, codex_home: &AbsolutePathBuf, @@ -1117,183 +810,6 @@ async fn thread_title_from_state_db( .flatten() } -#[derive(Clone)] -pub(crate) struct SessionConfiguration { - /// Provider identifier ("openai", "openrouter", ...). - provider: ModelProviderInfo, - - collaboration_mode: CollaborationMode, - model_reasoning_summary: Option, - service_tier: Option, - - /// Developer instructions that supplement the base instructions. - developer_instructions: Option, - - /// Model instructions that are appended to the base instructions. - user_instructions: Option, - - /// Personality preference for the model. - personality: Option, - - /// Base instructions for the session. - base_instructions: String, - - /// Compact prompt override. - compact_prompt: Option, - - /// When to escalate for approval for execution - approval_policy: Constrained, - approvals_reviewer: ApprovalsReviewer, - /// How to sandbox commands executed in the system - sandbox_policy: Constrained, - file_system_sandbox_policy: FileSystemSandboxPolicy, - network_sandbox_policy: NetworkSandboxPolicy, - windows_sandbox_level: WindowsSandboxLevel, - - /// Absolute working directory that should be treated as the *root* of the - /// session. All relative paths supplied by the model as well as the - /// execution sandbox are resolved against this directory **instead** of - /// the process-wide current working directory. - cwd: AbsolutePathBuf, - /// Directory containing all Codex state for this session. - codex_home: AbsolutePathBuf, - /// Optional user-facing name for the thread, updated during the session. - thread_name: Option, - - // TODO(pakrym): Remove config from here - original_config_do_not_use: Arc, - /// Optional service name tag for session metrics. - metrics_service_name: Option, - app_server_client_name: Option, - app_server_client_version: Option, - /// Source of the session (cli, vscode, exec, mcp, ...) - session_source: SessionSource, - dynamic_tools: Vec, - persist_extended_history: bool, - inherited_shell_snapshot: Option>, - user_shell_override: Option, -} - -impl SessionConfiguration { - pub(crate) fn codex_home(&self) -> &AbsolutePathBuf { - &self.codex_home - } - - fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { - ThreadConfigSnapshot { - model: self.collaboration_mode.model().to_string(), - model_provider_id: self.original_config_do_not_use.model_provider_id.clone(), - service_tier: self.service_tier, - approval_policy: self.approval_policy.value(), - approvals_reviewer: self.approvals_reviewer, - sandbox_policy: self.sandbox_policy.get().clone(), - cwd: self.cwd.clone(), - ephemeral: self.original_config_do_not_use.ephemeral, - reasoning_effort: self.collaboration_mode.reasoning_effort(), - personality: self.personality, - session_source: self.session_source.clone(), - } - } - - pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> ConstraintResult { - let mut next_configuration = self.clone(); - let file_system_policy_matches_legacy = self.file_system_sandbox_policy - == FileSystemSandboxPolicy::from_legacy_sandbox_policy( - self.sandbox_policy.get(), - &self.cwd, - ); - if let Some(collaboration_mode) = updates.collaboration_mode.clone() { - next_configuration.collaboration_mode = collaboration_mode; - } - if let Some(summary) = updates.reasoning_summary { - next_configuration.model_reasoning_summary = Some(summary); - } - if let Some(service_tier) = updates.service_tier { - next_configuration.service_tier = service_tier; - } - if let Some(personality) = updates.personality { - next_configuration.personality = Some(personality); - } - if let Some(approval_policy) = updates.approval_policy { - next_configuration.approval_policy.set(approval_policy)?; - } - if let Some(approvals_reviewer) = updates.approvals_reviewer { - next_configuration.approvals_reviewer = approvals_reviewer; - } - let mut sandbox_policy_changed = false; - if let Some(sandbox_policy) = updates.sandbox_policy.clone() { - next_configuration.sandbox_policy.set(sandbox_policy)?; - next_configuration.network_sandbox_policy = - NetworkSandboxPolicy::from(next_configuration.sandbox_policy.get()); - sandbox_policy_changed = true; - } - if let Some(windows_sandbox_level) = updates.windows_sandbox_level { - next_configuration.windows_sandbox_level = windows_sandbox_level; - } - - let absolute_cwd = updates - .cwd - .as_ref() - .map(|cwd| { - AbsolutePathBuf::relative_to_current_dir(normalize_for_native_workdir( - cwd.as_path(), - )) - .unwrap_or_else(|e| { - warn!("failed to normalize update cwd: {cwd:?}: {e}"); - self.cwd.clone() - }) - }) - .unwrap_or_else(|| self.cwd.clone()); - - let cwd_changed = absolute_cwd.as_path() != self.cwd.as_path(); - next_configuration.cwd = absolute_cwd; - if sandbox_policy_changed { - next_configuration.file_system_sandbox_policy = - FileSystemSandboxPolicy::from_legacy_sandbox_policy_preserving_deny_entries( - next_configuration.sandbox_policy.get(), - &next_configuration.cwd, - &self.file_system_sandbox_policy, - ); - } else if cwd_changed && file_system_policy_matches_legacy { - // Preserve richer split policies across cwd-only updates; only - // rederive when the session is already using the legacy bridge. - next_configuration.file_system_sandbox_policy = - FileSystemSandboxPolicy::from_legacy_sandbox_policy( - next_configuration.sandbox_policy.get(), - &next_configuration.cwd, - ); - } - if let Some(app_server_client_name) = updates.app_server_client_name.clone() { - next_configuration.app_server_client_name = Some(app_server_client_name); - } - if let Some(app_server_client_version) = updates.app_server_client_version.clone() { - next_configuration.app_server_client_version = Some(app_server_client_version); - } - Ok(next_configuration) - } -} - -#[derive(Default, Clone)] -pub(crate) struct SessionSettingsUpdate { - pub(crate) cwd: Option, - pub(crate) approval_policy: Option, - pub(crate) approvals_reviewer: Option, - pub(crate) sandbox_policy: Option, - pub(crate) windows_sandbox_level: Option, - pub(crate) collaboration_mode: Option, - pub(crate) reasoning_summary: Option, - pub(crate) service_tier: Option>, - pub(crate) final_output_json_schema: Option>, - pub(crate) personality: Option, - pub(crate) app_server_client_name: Option, - pub(crate) app_server_client_version: Option, -} - -pub(crate) struct AppServerClientMetadata { - pub(crate) client_name: Option, - pub(crate) client_version: Option, -} - impl Session { pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata { let state = self.state.lock().await; @@ -1417,38 +933,6 @@ impl Session { } } - /// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it. - pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config { - // todo(aibrahim): store this state somewhere else so we don't need to mut config - let config = session_configuration.original_config_do_not_use.clone(); - let mut per_turn_config = (*config).clone(); - per_turn_config.cwd = session_configuration.cwd.clone(); - per_turn_config.model_reasoning_effort = - session_configuration.collaboration_mode.reasoning_effort(); - per_turn_config.model_reasoning_summary = session_configuration.model_reasoning_summary; - per_turn_config.service_tier = session_configuration.service_tier; - per_turn_config.personality = session_configuration.personality; - per_turn_config.approvals_reviewer = session_configuration.approvals_reviewer; - let resolved_web_search_mode = resolve_web_search_mode_for_turn( - &per_turn_config.web_search_mode, - session_configuration.sandbox_policy.get(), - ); - if let Err(err) = per_turn_config - .web_search_mode - .set(resolved_web_search_mode) - { - let fallback_value = per_turn_config.web_search_mode.value(); - tracing::warn!( - error = %err, - ?resolved_web_search_mode, - ?fallback_value, - "resolved web_search_mode is disallowed by requirements; keeping constrained value" - ); - } - per_turn_config.features = config.features.clone(); - per_turn_config - } - pub(crate) async fn codex_home(&self) -> AbsolutePathBuf { let state = self.state.lock().await; state.session_configuration.codex_home().clone() @@ -1618,758 +1102,6 @@ impl Session { Ok(None) } - #[allow(clippy::too_many_arguments)] - fn make_turn_context( - conversation_id: ThreadId, - auth_manager: Option>, - session_telemetry: &SessionTelemetry, - provider: ModelProviderInfo, - session_configuration: &SessionConfiguration, - user_shell: &shell::Shell, - shell_zsh_path: Option<&PathBuf>, - main_execve_wrapper_exe: Option<&PathBuf>, - per_turn_config: Config, - model_info: ModelInfo, - models_manager: &ModelsManager, - network: Option, - environment: Option>, - sub_id: String, - js_repl: Arc, - skills_outcome: Arc, - ) -> TurnContext { - let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort(); - let reasoning_summary = session_configuration - .model_reasoning_summary - .unwrap_or(model_info.default_reasoning_summary); - let session_telemetry = session_telemetry.clone().with_model( - session_configuration.collaboration_mode.model(), - model_info.slug.as_str(), - ); - let session_source = session_configuration.session_source.clone(); - let image_generation_tool_auth_allowed = - image_generation_tool_auth_allowed(auth_manager.as_deref()); - let auth_manager_for_context = auth_manager; - let provider_for_context = provider; - let session_telemetry_for_context = session_telemetry; - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &model_info, - available_models: &models_manager.try_list_models().unwrap_or_default(), - features: &per_turn_config.features, - image_generation_tool_auth_allowed, - web_search_mode: Some(per_turn_config.web_search_mode.value()), - session_source: session_source.clone(), - sandbox_policy: session_configuration.sandbox_policy.get(), - windows_sandbox_level: session_configuration.windows_sandbox_level, - }) - .with_unified_exec_shell_mode_for_session( - crate::tools::spec::tool_user_shell_type(user_shell), - shell_zsh_path, - main_execve_wrapper_exe, - ) - .with_web_search_config(per_turn_config.web_search_config.clone()) - .with_allow_login_shell(per_turn_config.permissions.allow_login_shell) - .with_has_environment(environment.is_some()) - .with_spawn_agent_usage_hint(per_turn_config.multi_agent_v2.usage_hint_enabled) - .with_spawn_agent_usage_hint_text(per_turn_config.multi_agent_v2.usage_hint_text.clone()) - .with_hide_spawn_agent_metadata(per_turn_config.multi_agent_v2.hide_spawn_agent_metadata) - .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( - &per_turn_config.agent_roles, - )); - - let cwd = session_configuration.cwd.clone(); - - let per_turn_config = Arc::new(per_turn_config); - let turn_metadata_state = Arc::new(TurnMetadataState::new( - conversation_id.to_string(), - &session_source, - sub_id.clone(), - cwd.clone(), - session_configuration.sandbox_policy.get(), - session_configuration.windows_sandbox_level, - )); - let (current_date, timezone) = local_time_context(); - TurnContext { - sub_id, - trace_id: current_span_trace_id(), - realtime_active: false, - config: per_turn_config.clone(), - auth_manager: auth_manager_for_context, - model_info: model_info.clone(), - session_telemetry: session_telemetry_for_context, - provider: provider_for_context, - reasoning_effort, - reasoning_summary, - session_source, - environment, - cwd, - current_date: Some(current_date), - timezone: Some(timezone), - app_server_client_name: session_configuration.app_server_client_name.clone(), - developer_instructions: session_configuration.developer_instructions.clone(), - compact_prompt: session_configuration.compact_prompt.clone(), - user_instructions: session_configuration.user_instructions.clone(), - collaboration_mode: session_configuration.collaboration_mode.clone(), - personality: session_configuration.personality, - approval_policy: session_configuration.approval_policy.clone(), - sandbox_policy: session_configuration.sandbox_policy.clone(), - file_system_sandbox_policy: session_configuration.file_system_sandbox_policy.clone(), - network_sandbox_policy: session_configuration.network_sandbox_policy, - network, - windows_sandbox_level: session_configuration.windows_sandbox_level, - shell_environment_policy: per_turn_config.permissions.shell_environment_policy.clone(), - tools_config, - features: per_turn_config.features.clone(), - ghost_snapshot: per_turn_config.ghost_snapshot.clone(), - final_output_json_schema: None, - codex_self_exe: per_turn_config.codex_self_exe.clone(), - codex_linux_sandbox_exe: per_turn_config.codex_linux_sandbox_exe.clone(), - tool_call_gate: Arc::new(ReadinessFlag::new()), - truncation_policy: model_info.truncation_policy.into(), - js_repl, - dynamic_tools: session_configuration.dynamic_tools.clone(), - turn_metadata_state, - turn_skills: TurnSkillsContext::new(skills_outcome), - turn_timing_state: Arc::new(TurnTimingState::default()), - } - } - - #[instrument(name = "session_init", level = "info", skip_all)] - #[allow(clippy::too_many_arguments)] - async fn new( - mut session_configuration: SessionConfiguration, - config: Arc, - auth_manager: Arc, - models_manager: Arc, - exec_policy: Arc, - tx_event: Sender, - agent_status: watch::Sender, - initial_history: InitialHistory, - session_source: SessionSource, - skills_manager: Arc, - plugins_manager: Arc, - mcp_manager: Arc, - skills_watcher: Arc, - agent_control: AgentControl, - environment: Option>, - analytics_events_client: Option, - ) -> anyhow::Result> { - debug!( - "Configuring session: model={}; provider={:?}", - session_configuration.collaboration_mode.model(), - session_configuration.provider - ); - let forked_from_id = initial_history.forked_from_id(); - - let (conversation_id, rollout_params) = match &initial_history { - InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => { - let conversation_id = ThreadId::default(); - ( - conversation_id, - RolloutRecorderParams::new( - conversation_id, - forked_from_id, - session_source, - BaseInstructions { - text: session_configuration.base_instructions.clone(), - }, - session_configuration.dynamic_tools.clone(), - if session_configuration.persist_extended_history { - EventPersistenceMode::Extended - } else { - EventPersistenceMode::Limited - }, - ), - ) - } - InitialHistory::Resumed(resumed_history) => ( - resumed_history.conversation_id, - RolloutRecorderParams::resume( - resumed_history.rollout_path.clone(), - if session_configuration.persist_extended_history { - EventPersistenceMode::Extended - } else { - EventPersistenceMode::Limited - }, - ), - ), - }; - let window_generation = match &initial_history { - InitialHistory::Resumed(resumed_history) => u64::try_from( - resumed_history - .history - .iter() - .filter(|item| matches!(item, RolloutItem::Compacted(_))) - .count(), - ) - .unwrap_or(u64::MAX), - InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => 0, - }; - let state_builder = match &initial_history { - InitialHistory::Resumed(resumed) => metadata::builder_from_items( - resumed.history.as_slice(), - resumed.rollout_path.as_path(), - ), - InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, - }; - - // Kick off independent async setup tasks in parallel to reduce startup latency. - // - // - initialize RolloutRecorder with new or resumed session info - // - perform default shell discovery - // - load history metadata (skipped for subagents) - let rollout_fut = async { - if config.ephemeral { - Ok::<_, anyhow::Error>((None, None)) - } else { - let state_db_ctx = state_db::init(&config).await; - let rollout_recorder = RolloutRecorder::new( - &config, - rollout_params, - state_db_ctx.clone(), - state_builder.clone(), - ) - .await?; - Ok((Some(rollout_recorder), state_db_ctx)) - } - } - .instrument(info_span!( - "session_init.rollout", - otel.name = "session_init.rollout", - session_init.ephemeral = config.ephemeral, - )); - - let is_subagent = matches!( - session_configuration.session_source, - SessionSource::SubAgent(_) - ); - let history_meta_fut = async { - if is_subagent { - (0, 0) - } else { - crate::message_history::history_metadata(&config).await - } - } - .instrument(info_span!( - "session_init.history_metadata", - otel.name = "session_init.history_metadata", - session_init.is_subagent = is_subagent, - )); - let auth_manager_clone = Arc::clone(&auth_manager); - let config_for_mcp = Arc::clone(&config); - let mcp_manager_for_mcp = Arc::clone(&mcp_manager); - let auth_and_mcp_fut = async move { - let auth = auth_manager_clone.auth().await; - let mcp_servers = mcp_manager_for_mcp - .effective_servers(&config_for_mcp, auth.as_ref()) - .await; - let auth_statuses = compute_auth_statuses( - mcp_servers.iter(), - config_for_mcp.mcp_oauth_credentials_store_mode, - ) - .await; - (auth, mcp_servers, auth_statuses) - } - .instrument(info_span!( - "session_init.auth_mcp", - otel.name = "session_init.auth_mcp", - )); - - // Join all independent futures. - let ( - rollout_recorder_and_state_db, - (history_log_id, history_entry_count), - (auth, mcp_servers, auth_statuses), - ) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut); - - let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| { - error!("failed to initialize rollout recorder: {e:#}"); - e - })?; - let rollout_path = rollout_recorder - .as_ref() - .map(|rec| rec.rollout_path().to_path_buf()); - - let mut post_session_configured_events = Vec::::new(); - - for usage in config.features.legacy_feature_usages() { - post_session_configured_events.push(Event { - id: INITIAL_SUBMIT_ID.to_owned(), - msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { - summary: usage.summary.clone(), - details: usage.details.clone(), - }), - }); - } - if crate::config::uses_deprecated_instructions_file(&config.config_layer_stack) { - post_session_configured_events.push(Event { - id: INITIAL_SUBMIT_ID.to_owned(), - msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { - summary: "`experimental_instructions_file` is deprecated and ignored. Use `model_instructions_file` instead." - .to_string(), - details: Some( - "Move the setting to `model_instructions_file` in config.toml (or under a profile) to load instructions from a file." - .to_string(), - ), - }), - }); - } - for message in &config.startup_warnings { - post_session_configured_events.push(Event { - id: "".to_owned(), - msg: EventMsg::Warning(WarningEvent { - message: message.clone(), - }), - }); - } - let config_path = config.codex_home.join(CONFIG_TOML_FILE); - if let Some(event) = unstable_features_warning_event( - config - .config_layer_stack - .effective_config() - .get("features") - .and_then(TomlValue::as_table), - config.suppress_unstable_features_warning, - &config.features, - &config_path.display().to_string(), - ) { - post_session_configured_events.push(event); - } - if config.permissions.approval_policy.value() == AskForApproval::OnFailure { - post_session_configured_events.push(Event { - id: "".to_owned(), - msg: EventMsg::Warning(WarningEvent { - message: "`on-failure` approval policy is deprecated and will be removed in a future release. Use `on-request` for interactive approvals or `never` for non-interactive runs.".to_string(), - }), - }); - } - - let auth = auth.as_ref(); - let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); - let account_id = auth.and_then(CodexAuth::get_account_id); - let account_email = auth.and_then(CodexAuth::get_account_email); - let originator = originator().value; - let terminal_type = user_agent(); - let session_model = session_configuration.collaboration_mode.model().to_string(); - let auth_env_telemetry = collect_auth_env_telemetry( - &session_configuration.provider, - auth_manager.codex_api_key_env_enabled(), - ); - let mut session_telemetry = SessionTelemetry::new( - conversation_id, - session_model.as_str(), - session_model.as_str(), - account_id.clone(), - account_email.clone(), - auth_mode, - originator.clone(), - config.otel.log_user_prompt, - terminal_type.clone(), - session_configuration.session_source.clone(), - ) - .with_auth_env(auth_env_telemetry.to_otel_metadata()); - if let Some(service_name) = session_configuration.metrics_service_name.as_deref() { - session_telemetry = session_telemetry.with_metrics_service_name(service_name); - } - let network_proxy_audit_metadata = NetworkProxyAuditMetadata { - conversation_id: Some(conversation_id.to_string()), - app_version: Some(env!("CARGO_PKG_VERSION").to_string()), - user_account_id: account_id, - auth_mode: auth_mode.map(|mode| mode.to_string()), - originator: Some(originator), - user_email: account_email, - terminal_type: Some(terminal_type), - model: Some(session_model.clone()), - slug: Some(session_model), - }; - config.features.emit_metrics(&session_telemetry); - session_telemetry.counter( - THREAD_STARTED_METRIC, - /*inc*/ 1, - &[( - "is_git", - if get_git_repo_root(&session_configuration.cwd).is_some() { - "true" - } else { - "false" - }, - )], - ); - - session_telemetry.conversation_starts( - config.model_provider.name.as_str(), - session_configuration.collaboration_mode.reasoning_effort(), - config - .model_reasoning_summary - .unwrap_or(ReasoningSummaryConfig::Auto), - config.model_context_window, - config.model_auto_compact_token_limit, - config.permissions.approval_policy.value(), - config.permissions.sandbox_policy.get().clone(), - mcp_servers.keys().map(String::as_str).collect(), - config.active_profile.clone(), - ); - - let use_zsh_fork_shell = config.features.enabled(Feature::ShellZshFork); - let mut default_shell = if let Some(user_shell_override) = - session_configuration.user_shell_override.clone() - { - user_shell_override - } else if use_zsh_fork_shell { - let zsh_path = config.zsh_path.as_ref().ok_or_else(|| { - anyhow::anyhow!( - "zsh fork feature enabled, but `zsh_path` is not configured; set `zsh_path` in config.toml" - ) - })?; - let zsh_path = zsh_path.to_path_buf(); - shell::get_shell(shell::ShellType::Zsh, Some(&zsh_path)).ok_or_else(|| { - anyhow::anyhow!( - "zsh fork feature enabled, but zsh_path `{}` is not usable; set `zsh_path` to a valid zsh executable", - zsh_path.display() - ) - })? - } else { - shell::default_user_shell() - }; - // Create the mutable state for the Session. - let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) { - if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() { - let (tx, rx) = watch::channel(Some(snapshot)); - default_shell.shell_snapshot = rx; - tx - } else { - ShellSnapshot::start_snapshotting( - config.codex_home.clone(), - conversation_id, - session_configuration.cwd.clone(), - &mut default_shell, - session_telemetry.clone(), - ) - } - } else { - let (tx, rx) = watch::channel(None); - default_shell.shell_snapshot = rx; - tx - }; - let thread_name = - thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id) - .instrument(info_span!( - "session_init.thread_name_lookup", - otel.name = "session_init.thread_name_lookup", - )) - .await; - session_configuration.thread_name = thread_name.clone(); - let state = SessionState::new(session_configuration.clone()); - let managed_network_requirements_configured = config - .config_layer_stack - .requirements_toml() - .network - .is_some(); - let managed_network_requirements_enabled = config.managed_network_requirements_enabled(); - let network_approval = Arc::new(NetworkApprovalService::default()); - // The managed proxy can call back into core for allowlist-miss decisions. - let network_policy_decider_session = if managed_network_requirements_configured { - config - .permissions - .network - .as_ref() - .map(|_| Arc::new(RwLock::new(std::sync::Weak::::new()))) - } else { - None - }; - let blocked_request_observer = if managed_network_requirements_configured { - config - .permissions - .network - .as_ref() - .map(|_| build_blocked_request_observer(Arc::clone(&network_approval))) - } else { - None - }; - let network_policy_decider = - network_policy_decider_session - .as_ref() - .map(|network_policy_decider_session| { - build_network_policy_decider( - Arc::clone(&network_approval), - Arc::clone(network_policy_decider_session), - ) - }); - let (network_proxy, session_network_proxy) = - if let Some(spec) = config.permissions.network.as_ref() { - let current_exec_policy = exec_policy.current(); - let (network_proxy, session_network_proxy) = Self::start_managed_network_proxy( - spec, - current_exec_policy.as_ref(), - config.permissions.sandbox_policy.get(), - network_policy_decider.as_ref().map(Arc::clone), - blocked_request_observer.as_ref().map(Arc::clone), - managed_network_requirements_configured, - network_proxy_audit_metadata, - ) - .instrument(info_span!( - "session_init.network_proxy", - otel.name = "session_init.network_proxy", - session_init.managed_network_requirements_enabled = - managed_network_requirements_enabled, - )) - .await?; - (Some(network_proxy), Some(session_network_proxy)) - } else { - (None, None) - }; - - let mut hook_shell_argv = - default_shell.derive_exec_args("", /*use_login_shell*/ false); - let hook_shell_program = hook_shell_argv.remove(0); - let _ = hook_shell_argv.pop(); - let hooks = Hooks::new(HooksConfig { - legacy_notify_argv: config.notify.clone(), - feature_enabled: config.features.enabled(Feature::CodexHooks), - config_layer_stack: Some(config.config_layer_stack.clone()), - shell_program: Some(hook_shell_program), - shell_args: hook_shell_argv, - }); - for warning in hooks.startup_warnings() { - post_session_configured_events.push(Event { - id: INITIAL_SUBMIT_ID.to_owned(), - msg: EventMsg::Warning(WarningEvent { - message: warning.clone(), - }), - }); - } - - let installation_id = resolve_installation_id(&config.codex_home).await?; - let analytics_events_client = analytics_events_client.unwrap_or_else(|| { - AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ) - }); - let services = SessionServices { - // Initialize the MCP connection manager with an uninitialized - // instance. It will be replaced with one created via - // McpConnectionManager::new() once all its constructor args are - // available. This also ensures `SessionConfigured` is emitted - // before any MCP-related events. It is reasonable to consider - // changing this to use Option or OnceCell, though the current - // setup is straightforward enough and performs well. - mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( - &config.permissions.approval_policy, - &config.permissions.sandbox_policy, - ))), - mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), - unified_exec_manager: UnifiedExecProcessManager::new( - config.background_terminal_max_timeout, - ), - shell_zsh_path: config.zsh_path.clone(), - main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(), - analytics_events_client, - hooks, - rollout: Mutex::new(rollout_recorder), - user_shell: Arc::new(default_shell), - agent_identity_manager: Arc::new(AgentIdentityManager::new( - config.as_ref(), - Arc::clone(&auth_manager), - session_configuration.session_source.clone(), - )), - shell_snapshot_tx, - show_raw_agent_reasoning: config.show_raw_agent_reasoning, - exec_policy, - auth_manager: Arc::clone(&auth_manager), - session_telemetry, - models_manager: Arc::clone(&models_manager), - tool_approvals: Mutex::new(ApprovalStore::default()), - guardian_rejections: Mutex::new(HashMap::new()), - skills_manager, - plugins_manager: Arc::clone(&plugins_manager), - mcp_manager: Arc::clone(&mcp_manager), - skills_watcher, - agent_control, - network_proxy, - network_approval: Arc::clone(&network_approval), - state_db: state_db_ctx.clone(), - thread_store: LocalThreadStore::new(RolloutConfig::from_view(config.as_ref())), - model_client: ModelClient::new( - Some(Arc::clone(&auth_manager)), - conversation_id, - installation_id, - session_configuration.provider.clone(), - session_configuration.session_source.clone(), - config.model_verbosity, - config.features.enabled(Feature::EnableRequestCompression), - config.features.enabled(Feature::RuntimeMetrics), - Self::build_model_client_beta_features_header(config.as_ref()), - ), - code_mode_service: crate::tools::code_mode::CodeModeService::new( - config.js_repl_node_path.clone(), - ), - environment, - }; - services - .model_client - .set_window_generation(window_generation); - let js_repl = Arc::new(JsReplHandle::with_node_path( - config.js_repl_node_path.clone(), - config.js_repl_node_module_dirs.clone(), - )); - let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) = - watch::channel(false); - - let (mailbox, mailbox_rx) = Mailbox::new(); - let sess = Arc::new(Session { - conversation_id, - tx_event: tx_event.clone(), - agent_status, - out_of_band_elicitation_paused, - state: Mutex::new(state), - managed_network_proxy_refresh_lock: Mutex::new(()), - features: config.features.clone(), - pending_mcp_server_refresh_config: Mutex::new(None), - conversation: Arc::new(RealtimeConversationManager::new()), - active_turn: Mutex::new(None), - mailbox, - mailbox_rx: Mutex::new(mailbox_rx), - idle_pending_input: Mutex::new(Vec::new()), - guardian_review_session: GuardianReviewSessionManager::default(), - services, - js_repl, - next_internal_sub_id: AtomicU64::new(0), - }); - if let Some(network_policy_decider_session) = network_policy_decider_session { - let mut guard = network_policy_decider_session.write().await; - *guard = Arc::downgrade(&sess); - } - // Dispatch the SessionConfiguredEvent first and then report any errors. - // If resuming, include converted initial messages in the payload so UIs can render them immediately. - let initial_messages = initial_history.get_event_msgs(); - let events = std::iter::once(Event { - id: INITIAL_SUBMIT_ID.to_owned(), - msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id: conversation_id, - forked_from_id, - thread_name: session_configuration.thread_name.clone(), - model: session_configuration.collaboration_mode.model().to_string(), - model_provider_id: config.model_provider_id.clone(), - service_tier: session_configuration.service_tier, - approval_policy: session_configuration.approval_policy.value(), - approvals_reviewer: session_configuration.approvals_reviewer, - sandbox_policy: session_configuration.sandbox_policy.get().clone(), - cwd: session_configuration.cwd.clone(), - reasoning_effort: session_configuration.collaboration_mode.reasoning_effort(), - history_log_id, - history_entry_count, - initial_messages, - network_proxy: session_network_proxy.filter(|_| { - Self::managed_network_proxy_active_for_sandbox_policy( - session_configuration.sandbox_policy.get(), - ) - }), - rollout_path, - }), - }) - .chain(post_session_configured_events.into_iter()); - for event in events { - sess.send_event_raw(event).await; - } - - // Start the watcher after SessionConfigured so it cannot emit earlier events. - sess.start_skills_watcher_listener(); - sess.start_agent_identity_registration(); - let mut required_mcp_servers: Vec = mcp_servers - .iter() - .filter(|(_, server)| server.enabled && server.required) - .map(|(name, _)| name.clone()) - .collect(); - required_mcp_servers.sort(); - let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count(); - let required_mcp_server_count = required_mcp_servers.len(); - let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await; - { - let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; - cancel_guard.cancel(); - *cancel_guard = CancellationToken::new(); - } - let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( - &mcp_servers, - config.mcp_oauth_credentials_store_mode, - auth_statuses.clone(), - &session_configuration.approval_policy, - INITIAL_SUBMIT_ID.to_owned(), - tx_event.clone(), - session_configuration.sandbox_policy.get().clone(), - config.codex_home.to_path_buf(), - codex_apps_tools_cache_key(auth), - tool_plugin_provenance, - ) - .instrument(info_span!( - "session_init.mcp_manager_init", - otel.name = "session_init.mcp_manager_init", - session_init.enabled_mcp_server_count = enabled_mcp_server_count, - session_init.required_mcp_server_count = required_mcp_server_count, - )) - .await; - { - let mut manager_guard = sess.services.mcp_connection_manager.write().await; - *manager_guard = mcp_connection_manager; - } - { - let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; - if cancel_guard.is_cancelled() { - cancel_token.cancel(); - } - *cancel_guard = cancel_token; - } - if !required_mcp_servers.is_empty() { - let failures = sess - .services - .mcp_connection_manager - .read() - .await - .required_startup_failures(&required_mcp_servers) - .instrument(info_span!( - "session_init.required_mcp_wait", - otel.name = "session_init.required_mcp_wait", - session_init.required_mcp_server_count = required_mcp_server_count, - )) - .await; - if !failures.is_empty() { - let details = failures - .iter() - .map(|failure| format!("{}: {}", failure.server, failure.error)) - .collect::>() - .join("; "); - return Err(anyhow::anyhow!( - "required MCP servers failed to initialize: {details}" - )); - } - } - sess.schedule_startup_prewarm(session_configuration.base_instructions.clone()) - .await; - let session_start_source = match &initial_history { - InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume, - InitialHistory::New | InitialHistory::Forked(_) => { - codex_hooks::SessionStartSource::Startup - } - InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear, - }; - - // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. - sess.record_initial_history(initial_history).await; - { - let mut state = sess.state.lock().await; - state.set_pending_session_start_source(Some(session_start_source)); - } - - memories::start_memories_startup_task( - &sess, - Arc::clone(&config), - &session_configuration.session_source, - ); - - Ok(sess) - } - pub(crate) fn get_tx_event(&self) -> Sender { self.tx_event.clone() } @@ -2689,167 +1421,6 @@ impl Session { } } - pub(crate) async fn new_turn_with_sub_id( - &self, - sub_id: String, - updates: SessionSettingsUpdate, - ) -> ConstraintResult> { - let ( - session_configuration, - sandbox_policy_changed, - previous_cwd, - codex_home, - session_source, - ) = { - let mut state = self.state.lock().await; - match state.session_configuration.clone().apply(&updates) { - Ok(next) => { - let previous_cwd = state.session_configuration.cwd.clone(); - let sandbox_policy_changed = - state.session_configuration.sandbox_policy != next.sandbox_policy; - let codex_home = next.codex_home.clone(); - let session_source = next.session_source.clone(); - state.session_configuration = next.clone(); - ( - next, - sandbox_policy_changed, - previous_cwd, - codex_home, - session_source, - ) - } - Err(err) => { - drop(state); - self.send_event_raw(Event { - id: sub_id.clone(), - msg: EventMsg::Error(ErrorEvent { - message: err.to_string(), - codex_error_info: Some(CodexErrorInfo::BadRequest), - }), - }) - .await; - return Err(err); - } - } - }; - - self.maybe_refresh_shell_snapshot_for_cwd( - &previous_cwd, - &session_configuration.cwd, - &codex_home, - &session_source, - ); - - if sandbox_policy_changed { - self.refresh_managed_network_proxy_for_current_sandbox_policy() - .await; - } - - Ok(self - .new_turn_from_configuration( - sub_id, - session_configuration, - updates.final_output_json_schema, - ) - .await) - } - - async fn new_turn_from_configuration( - &self, - sub_id: String, - session_configuration: SessionConfiguration, - final_output_json_schema: Option>, - ) -> Arc { - let per_turn_config = Self::build_per_turn_config(&session_configuration); - { - let mcp_connection_manager = self.services.mcp_connection_manager.read().await; - mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy); - mcp_connection_manager - .set_sandbox_policy(per_turn_config.permissions.sandbox_policy.get()); - } - - let model_info = self - .services - .models_manager - .get_model_info( - session_configuration.collaboration_mode.model(), - &per_turn_config.to_models_manager_config(), - ) - .await; - let plugin_outcome = self - .services - .plugins_manager - .plugins_for_config(&per_turn_config) - .await; - let effective_skill_roots = plugin_outcome.effective_skill_roots(); - let skills_input = skills_load_input_from_config(&per_turn_config, effective_skill_roots); - let fs = self - .services - .environment - .as_ref() - .map(|environment| environment.get_filesystem()); - let skills_outcome = Arc::new( - self.services - .skills_manager - .skills_for_config(&skills_input, fs) - .await, - ); - let mut turn_context: TurnContext = Self::make_turn_context( - self.conversation_id, - Some(Arc::clone(&self.services.auth_manager)), - &self.services.session_telemetry, - session_configuration.provider.clone(), - &session_configuration, - self.services.user_shell.as_ref(), - self.services.shell_zsh_path.as_ref(), - self.services.main_execve_wrapper_exe.as_ref(), - per_turn_config, - model_info, - &self.services.models_manager, - self.services - .network_proxy - .as_ref() - .and_then(|started_proxy| { - Self::managed_network_proxy_active_for_sandbox_policy( - session_configuration.sandbox_policy.get(), - ) - .then(|| started_proxy.proxy()) - }), - self.services.environment.clone(), - sub_id, - Arc::clone(&self.js_repl), - skills_outcome, - ); - turn_context.realtime_active = self.conversation.running_state().await.is_some(); - - if let Some(final_schema) = final_output_json_schema { - turn_context.final_output_json_schema = final_schema; - } - let turn_context = Arc::new(turn_context); - turn_context.turn_metadata_state.spawn_git_enrichment_task(); - turn_context - } - - pub(crate) async fn maybe_emit_unknown_model_warning_for_turn(&self, tc: &TurnContext) { - if tc.model_info.used_fallback_model_metadata { - self.send_event( - tc, - EventMsg::Warning(WarningEvent { - message: format!( - "Model metadata for `{}` not found. Defaulting to fallback metadata; this can degrade performance and cause issues.", - tc.model_info.slug - ), - }), - ) - .await; - } - } - - pub(crate) async fn new_default_turn(&self) -> Arc { - self.new_default_turn_with_sub_id(self.next_internal_sub_id()) - .await - } - pub(crate) async fn set_session_startup_prewarm( &self, startup_prewarm: SessionStartupPrewarmHandle, @@ -2912,19 +1483,6 @@ impl Session { self.services.plugins_manager.clear_cache(); } - pub(crate) async fn new_default_turn_with_sub_id(&self, sub_id: String) -> Arc { - let session_configuration = { - let state = self.state.lock().await; - state.session_configuration.clone() - }; - self.new_turn_from_configuration( - sub_id, - session_configuration, - /*final_output_json_schema*/ None, - ) - .await - } - async fn build_settings_update_items( &self, reference_context_item: Option<&TurnContextItem>, @@ -3486,85 +2044,6 @@ impl Session { rx_response.await.ok() } - pub async fn request_mcp_server_elicitation( - &self, - turn_context: &TurnContext, - request_id: RequestId, - params: McpServerElicitationRequestParams, - ) -> Option { - let server_name = params.server_name.clone(); - let request = match params.request { - McpServerElicitationRequest::Form { - meta, - message, - requested_schema, - } => { - let requested_schema = match serde_json::to_value(requested_schema) { - Ok(requested_schema) => requested_schema, - Err(err) => { - warn!( - "failed to serialize MCP elicitation schema for server_name: {server_name}, request_id: {request_id}: {err:#}" - ); - return None; - } - }; - codex_protocol::approvals::ElicitationRequest::Form { - meta, - message, - requested_schema, - } - } - McpServerElicitationRequest::Url { - meta, - message, - url, - elicitation_id, - } => codex_protocol::approvals::ElicitationRequest::Url { - meta, - message, - url, - elicitation_id, - }, - }; - - let (tx_response, rx_response) = oneshot::channel(); - let prev_entry = { - let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - ts.insert_pending_elicitation( - server_name.clone(), - request_id.clone(), - tx_response, - ) - } - None => None, - } - }; - if prev_entry.is_some() { - warn!( - "Overwriting existing pending elicitation for server_name: {server_name}, request_id: {request_id}" - ); - } - let id = match request_id { - rmcp::model::NumberOrString::String(value) => { - codex_protocol::mcp::RequestId::String(value.to_string()) - } - rmcp::model::NumberOrString::Number(value) => { - codex_protocol::mcp::RequestId::Integer(value) - } - }; - let event = EventMsg::ElicitationRequest(ElicitationRequestEvent { - turn_id: params.turn_id, - server_name, - id, - request, - }); - self.send_event(turn_context, event).await; - rx_response.await.ok() - } - pub async fn notify_user_input_response( &self, sub_id: &str, @@ -3685,37 +2164,6 @@ impl Session { } } - pub async fn resolve_elicitation( - &self, - server_name: String, - id: RequestId, - response: ElicitationResponse, - ) -> anyhow::Result<()> { - let entry = { - let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - ts.remove_pending_elicitation(&server_name, &id) - } - None => None, - } - }; - if let Some(tx_response) = entry { - tx_response - .send(response) - .map_err(|e| anyhow::anyhow!("failed to send elicitation response: {e:?}"))?; - return Ok(()); - } - - self.services - .mcp_connection_manager - .read() - .await - .resolve_elicitation(server_name, id, response) - .await - } - /// Records input items: always append to conversation history and /// persist these response items to rollout. pub(crate) async fn record_conversation_items( @@ -4513,69 +2961,6 @@ impl Session { self.mailbox_rx.lock().await.has_pending() } - pub async fn list_resources( - &self, - server: &str, - params: Option, - ) -> anyhow::Result { - self.services - .mcp_connection_manager - .read() - .await - .list_resources(server, params) - .await - } - - pub async fn list_resource_templates( - &self, - server: &str, - params: Option, - ) -> anyhow::Result { - self.services - .mcp_connection_manager - .read() - .await - .list_resource_templates(server, params) - .await - } - - pub async fn read_resource( - &self, - server: &str, - params: ReadResourceRequestParams, - ) -> anyhow::Result { - self.services - .mcp_connection_manager - .read() - .await - .read_resource(server, params) - .await - } - - pub async fn call_tool( - &self, - server: &str, - tool: &str, - arguments: Option, - meta: Option, - ) -> anyhow::Result { - self.services - .mcp_connection_manager - .read() - .await - .call_tool(server, tool, arguments, meta) - .await - } - - pub(crate) async fn resolve_mcp_tool_info(&self, tool_name: &ToolName) -> Option { - self.services - .mcp_connection_manager - .read() - .await - .resolve_tool_info(tool_name) - .await - } - pub async fn interrupt_task(self: &Arc) { info!("interrupt received: abort current task, if any"); let has_active_turn = { self.active_turn.lock().await.is_some() }; @@ -4614,117 +2999,9 @@ impl Session { state.take_pending_session_start_source() } - async fn refresh_mcp_servers_inner( - &self, - turn_context: &TurnContext, - mcp_servers: HashMap, - store_mode: OAuthCredentialsStoreMode, - ) { - let auth = self.services.auth_manager.auth().await; - let config = self.get_config().await; - let mcp_config = config - .to_mcp_config(self.services.plugins_manager.as_ref()) - .await; - let tool_plugin_provenance = self - .services - .mcp_manager - .tool_plugin_provenance(config.as_ref()) - .await; - let mcp_servers = with_codex_apps_mcp(mcp_servers, auth.as_ref(), &mcp_config); - let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await; - { - let mut guard = self.services.mcp_startup_cancellation_token.lock().await; - guard.cancel(); - *guard = CancellationToken::new(); - } - let (refreshed_manager, cancel_token) = McpConnectionManager::new( - &mcp_servers, - store_mode, - auth_statuses, - &turn_context.config.permissions.approval_policy, - turn_context.sub_id.clone(), - self.get_tx_event(), - turn_context.sandbox_policy.get().clone(), - config.codex_home.to_path_buf(), - codex_apps_tools_cache_key(auth.as_ref()), - tool_plugin_provenance, - ) - .await; - { - let mut guard = self.services.mcp_startup_cancellation_token.lock().await; - if guard.is_cancelled() { - cancel_token.cancel(); - } - *guard = cancel_token; - } - - let mut manager = self.services.mcp_connection_manager.write().await; - *manager = refreshed_manager; - } - - async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) { - let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() }; - let Some(refresh_config) = refresh_config else { - return; - }; - - let McpServerRefreshConfig { - mcp_servers, - mcp_oauth_credentials_store_mode, - } = refresh_config; - - let mcp_servers = - match serde_json::from_value::>(mcp_servers) { - Ok(servers) => servers, - Err(err) => { - warn!("failed to parse MCP server refresh config: {err}"); - return; - } - }; - let store_mode = match serde_json::from_value::( - mcp_oauth_credentials_store_mode, - ) { - Ok(mode) => mode, - Err(err) => { - warn!("failed to parse MCP OAuth refresh config: {err}"); - return; - } - }; - - self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode) - .await; - } - - pub(crate) async fn refresh_mcp_servers_now( - &self, - turn_context: &TurnContext, - mcp_servers: HashMap, - store_mode: OAuthCredentialsStoreMode, - ) { - self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode) - .await; - } - - #[cfg(test)] - async fn mcp_startup_cancellation_token(&self) -> CancellationToken { - self.services - .mcp_startup_cancellation_token - .lock() - .await - .clone() - } - fn show_raw_agent_reasoning(&self) -> bool { self.services.show_raw_agent_reasoning } - - async fn cancel_mcp_startup(&self) { - self.services - .mcp_startup_cancellation_token - .lock() - .await - .cancel(); - } } pub(crate) fn emit_subagent_session_started( @@ -4760,168 +3037,6 @@ pub(crate) fn emit_subagent_session_started( }); } -/// Spawn a review thread using the given prompt. -async fn spawn_review_thread( - sess: Arc, - config: Arc, - parent_turn_context: Arc, - sub_id: String, - resolved: crate::review_prompts::ResolvedReviewRequest, -) { - let model = config - .review_model - .clone() - .unwrap_or_else(|| parent_turn_context.model_info.slug.clone()); - let review_model_info = sess - .services - .models_manager - .get_model_info(&model, &config.to_models_manager_config()) - .await; - // For reviews, disable web_search and view_image regardless of global settings. - let mut review_features = sess.features.clone(); - let _ = review_features.disable(Feature::WebSearchRequest); - let _ = review_features.disable(Feature::WebSearchCached); - let review_web_search_mode = WebSearchMode::Disabled; - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &review_model_info, - available_models: &sess - .services - .models_manager - .list_models(RefreshStrategy::OnlineIfUncached) - .await, - features: &review_features, - image_generation_tool_auth_allowed: image_generation_tool_auth_allowed(Some( - sess.services.auth_manager.as_ref(), - )), - web_search_mode: Some(review_web_search_mode), - session_source: parent_turn_context.session_source.clone(), - sandbox_policy: parent_turn_context.sandbox_policy.get(), - windows_sandbox_level: parent_turn_context.windows_sandbox_level, - }) - .with_unified_exec_shell_mode_for_session( - crate::tools::spec::tool_user_shell_type(sess.services.user_shell.as_ref()), - sess.services.shell_zsh_path.as_ref(), - sess.services.main_execve_wrapper_exe.as_ref(), - ) - .with_web_search_config(/*web_search_config*/ None) - .with_allow_login_shell(config.permissions.allow_login_shell) - .with_has_environment(parent_turn_context.environment.is_some()) - .with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled) - .with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone()) - .with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata) - .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( - &config.agent_roles, - )); - - let review_prompt = resolved.prompt.clone(); - let provider = parent_turn_context.provider.clone(); - let auth_manager = parent_turn_context.auth_manager.clone(); - let model_info = review_model_info.clone(); - - // Build per‑turn client with the requested model/family. - let mut per_turn_config = (*config).clone(); - per_turn_config.model = Some(model.clone()); - per_turn_config.features = review_features.clone(); - if let Err(err) = per_turn_config.web_search_mode.set(review_web_search_mode) { - let fallback_value = per_turn_config.web_search_mode.value(); - tracing::warn!( - error = %err, - ?review_web_search_mode, - ?fallback_value, - "review web_search_mode is disallowed by requirements; keeping constrained value" - ); - } - - let session_telemetry = parent_turn_context - .session_telemetry - .clone() - .with_model(model.as_str(), review_model_info.slug.as_str()); - let auth_manager_for_context = auth_manager.clone(); - let provider_for_context = provider.clone(); - let session_telemetry_for_context = session_telemetry.clone(); - let reasoning_effort = per_turn_config.model_reasoning_effort; - let reasoning_summary = per_turn_config - .model_reasoning_summary - .unwrap_or(model_info.default_reasoning_summary); - let session_source = parent_turn_context.session_source.clone(); - - let per_turn_config = Arc::new(per_turn_config); - let review_turn_id = sub_id.to_string(); - let turn_metadata_state = Arc::new(TurnMetadataState::new( - sess.conversation_id.to_string(), - &session_source, - review_turn_id.clone(), - parent_turn_context.cwd.clone(), - parent_turn_context.sandbox_policy.get(), - parent_turn_context.windows_sandbox_level, - )); - - let review_turn_context = TurnContext { - sub_id: review_turn_id, - trace_id: current_span_trace_id(), - realtime_active: parent_turn_context.realtime_active, - config: per_turn_config, - auth_manager: auth_manager_for_context, - model_info: model_info.clone(), - session_telemetry: session_telemetry_for_context, - provider: provider_for_context, - reasoning_effort, - reasoning_summary, - session_source, - environment: parent_turn_context.environment.clone(), - tools_config, - features: parent_turn_context.features.clone(), - ghost_snapshot: parent_turn_context.ghost_snapshot.clone(), - current_date: parent_turn_context.current_date.clone(), - timezone: parent_turn_context.timezone.clone(), - app_server_client_name: parent_turn_context.app_server_client_name.clone(), - developer_instructions: None, - user_instructions: None, - compact_prompt: parent_turn_context.compact_prompt.clone(), - collaboration_mode: parent_turn_context.collaboration_mode.clone(), - personality: parent_turn_context.personality, - approval_policy: parent_turn_context.approval_policy.clone(), - sandbox_policy: parent_turn_context.sandbox_policy.clone(), - file_system_sandbox_policy: parent_turn_context.file_system_sandbox_policy.clone(), - network_sandbox_policy: parent_turn_context.network_sandbox_policy, - network: parent_turn_context.network.clone(), - windows_sandbox_level: parent_turn_context.windows_sandbox_level, - shell_environment_policy: parent_turn_context.shell_environment_policy.clone(), - cwd: parent_turn_context.cwd.clone(), - final_output_json_schema: None, - codex_self_exe: parent_turn_context.codex_self_exe.clone(), - codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), - tool_call_gate: Arc::new(ReadinessFlag::new()), - js_repl: Arc::clone(&sess.js_repl), - dynamic_tools: parent_turn_context.dynamic_tools.clone(), - truncation_policy: model_info.truncation_policy.into(), - turn_metadata_state, - turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()), - turn_timing_state: Arc::new(TurnTimingState::default()), - }; - - // Seed the child task with the review prompt as the initial user message. - let input: Vec = vec![UserInput::Text { - text: review_prompt, - // Review prompt is synthesized; no UI element ranges to preserve. - text_elements: Vec::new(), - }]; - let tc = Arc::new(review_turn_context); - tc.turn_metadata_state.spawn_git_enrichment_task(); - // TODO(ccunningham): Review turns currently rely on `spawn_task` for TurnComplete but do not - // emit a parent TurnStarted. Consider giving review a full parent turn lifecycle - // (TurnStarted + TurnComplete) for consistency with other standalone tasks. - sess.spawn_task(tc.clone(), input, ReviewTask::new()).await; - - // Announce entering review mode so UIs can switch modes. - let review_request = ReviewRequest { - target: resolved.target, - user_facing_hint: Some(resolved.user_facing_hint), - }; - sess.send_event(&tc, EventMsg::EnteredReviewMode(review_request)) - .await; -} - fn skills_to_info( skills: &[SkillMetadata], disabled_paths: &HashSet, diff --git a/codex-rs/core/src/codex/mcp.rs b/codex-rs/core/src/codex/mcp.rs new file mode 100644 index 0000000000..56628f0543 --- /dev/null +++ b/codex-rs/core/src/codex/mcp.rs @@ -0,0 +1,284 @@ +use super::*; + +impl Session { + pub async fn request_mcp_server_elicitation( + &self, + turn_context: &TurnContext, + request_id: RequestId, + params: McpServerElicitationRequestParams, + ) -> Option { + let server_name = params.server_name.clone(); + let request = match params.request { + McpServerElicitationRequest::Form { + meta, + message, + requested_schema, + } => { + let requested_schema = match serde_json::to_value(requested_schema) { + Ok(requested_schema) => requested_schema, + Err(err) => { + warn!( + "failed to serialize MCP elicitation schema for server_name: {server_name}, request_id: {request_id}: {err:#}" + ); + return None; + } + }; + codex_protocol::approvals::ElicitationRequest::Form { + meta, + message, + requested_schema, + } + } + McpServerElicitationRequest::Url { + meta, + message, + url, + elicitation_id, + } => codex_protocol::approvals::ElicitationRequest::Url { + meta, + message, + url, + elicitation_id, + }, + }; + + let (tx_response, rx_response) = oneshot::channel(); + let prev_entry = { + let mut active = self.active_turn.lock().await; + match active.as_mut() { + Some(at) => { + let mut ts = at.turn_state.lock().await; + ts.insert_pending_elicitation( + server_name.clone(), + request_id.clone(), + tx_response, + ) + } + None => None, + } + }; + if prev_entry.is_some() { + warn!( + "Overwriting existing pending elicitation for server_name: {server_name}, request_id: {request_id}" + ); + } + let id = match request_id { + rmcp::model::NumberOrString::String(value) => { + codex_protocol::mcp::RequestId::String(value.to_string()) + } + rmcp::model::NumberOrString::Number(value) => { + codex_protocol::mcp::RequestId::Integer(value) + } + }; + let event = EventMsg::ElicitationRequest(ElicitationRequestEvent { + turn_id: params.turn_id, + server_name, + id, + request, + }); + self.send_event(turn_context, event).await; + rx_response.await.ok() + } + + pub async fn resolve_elicitation( + &self, + server_name: String, + id: RequestId, + response: ElicitationResponse, + ) -> anyhow::Result<()> { + let entry = { + let mut active = self.active_turn.lock().await; + match active.as_mut() { + Some(at) => { + let mut ts = at.turn_state.lock().await; + ts.remove_pending_elicitation(&server_name, &id) + } + None => None, + } + }; + if let Some(tx_response) = entry { + tx_response + .send(response) + .map_err(|e| anyhow::anyhow!("failed to send elicitation response: {e:?}"))?; + return Ok(()); + } + + self.services + .mcp_connection_manager + .read() + .await + .resolve_elicitation(server_name, id, response) + .await + } + + pub async fn list_resources( + &self, + server: &str, + params: Option, + ) -> anyhow::Result { + self.services + .mcp_connection_manager + .read() + .await + .list_resources(server, params) + .await + } + + pub async fn list_resource_templates( + &self, + server: &str, + params: Option, + ) -> anyhow::Result { + self.services + .mcp_connection_manager + .read() + .await + .list_resource_templates(server, params) + .await + } + + pub async fn read_resource( + &self, + server: &str, + params: ReadResourceRequestParams, + ) -> anyhow::Result { + self.services + .mcp_connection_manager + .read() + .await + .read_resource(server, params) + .await + } + + pub async fn call_tool( + &self, + server: &str, + tool: &str, + arguments: Option, + meta: Option, + ) -> anyhow::Result { + self.services + .mcp_connection_manager + .read() + .await + .call_tool(server, tool, arguments, meta) + .await + } + + pub(crate) async fn resolve_mcp_tool_info(&self, tool_name: &ToolName) -> Option { + self.services + .mcp_connection_manager + .read() + .await + .resolve_tool_info(tool_name) + .await + } + + async fn refresh_mcp_servers_inner( + &self, + turn_context: &TurnContext, + mcp_servers: HashMap, + store_mode: OAuthCredentialsStoreMode, + ) { + let auth = self.services.auth_manager.auth().await; + let config = self.get_config().await; + let mcp_config = config + .to_mcp_config(self.services.plugins_manager.as_ref()) + .await; + let tool_plugin_provenance = self + .services + .mcp_manager + .tool_plugin_provenance(config.as_ref()) + .await; + let mcp_servers = with_codex_apps_mcp(mcp_servers, auth.as_ref(), &mcp_config); + let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await; + { + let mut guard = self.services.mcp_startup_cancellation_token.lock().await; + guard.cancel(); + *guard = CancellationToken::new(); + } + let (refreshed_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + store_mode, + auth_statuses, + &turn_context.config.permissions.approval_policy, + turn_context.sub_id.clone(), + self.get_tx_event(), + turn_context.sandbox_policy.get().clone(), + config.codex_home.to_path_buf(), + codex_apps_tools_cache_key(auth.as_ref()), + tool_plugin_provenance, + ) + .await; + { + let mut guard = self.services.mcp_startup_cancellation_token.lock().await; + if guard.is_cancelled() { + cancel_token.cancel(); + } + *guard = cancel_token; + } + + let mut manager = self.services.mcp_connection_manager.write().await; + *manager = refreshed_manager; + } + + pub(crate) async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) { + let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() }; + let Some(refresh_config) = refresh_config else { + return; + }; + + let McpServerRefreshConfig { + mcp_servers, + mcp_oauth_credentials_store_mode, + } = refresh_config; + + let mcp_servers = + match serde_json::from_value::>(mcp_servers) { + Ok(servers) => servers, + Err(err) => { + warn!("failed to parse MCP server refresh config: {err}"); + return; + } + }; + let store_mode = match serde_json::from_value::( + mcp_oauth_credentials_store_mode, + ) { + Ok(mode) => mode, + Err(err) => { + warn!("failed to parse MCP OAuth refresh config: {err}"); + return; + } + }; + + self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode) + .await; + } + + pub(crate) async fn refresh_mcp_servers_now( + &self, + turn_context: &TurnContext, + mcp_servers: HashMap, + store_mode: OAuthCredentialsStoreMode, + ) { + self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode) + .await; + } + + #[cfg(test)] + pub(crate) async fn mcp_startup_cancellation_token(&self) -> CancellationToken { + self.services + .mcp_startup_cancellation_token + .lock() + .await + .clone() + } + + pub(crate) async fn cancel_mcp_startup(&self) { + self.services + .mcp_startup_cancellation_token + .lock() + .await + .cancel(); + } +} diff --git a/codex-rs/core/src/codex/review.rs b/codex-rs/core/src/codex/review.rs new file mode 100644 index 0000000000..94de4617d5 --- /dev/null +++ b/codex-rs/core/src/codex/review.rs @@ -0,0 +1,164 @@ +use super::turn_context::image_generation_tool_auth_allowed; +use super::*; + +/// Spawn a review thread using the given prompt. +pub(super) async fn spawn_review_thread( + sess: Arc, + config: Arc, + parent_turn_context: Arc, + sub_id: String, + resolved: crate::review_prompts::ResolvedReviewRequest, +) { + let model = config + .review_model + .clone() + .unwrap_or_else(|| parent_turn_context.model_info.slug.clone()); + let review_model_info = sess + .services + .models_manager + .get_model_info(&model, &config.to_models_manager_config()) + .await; + // For reviews, disable web_search and view_image regardless of global settings. + let mut review_features = sess.features.clone(); + let _ = review_features.disable(Feature::WebSearchRequest); + let _ = review_features.disable(Feature::WebSearchCached); + let review_web_search_mode = WebSearchMode::Disabled; + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &review_model_info, + available_models: &sess + .services + .models_manager + .list_models(RefreshStrategy::OnlineIfUncached) + .await, + features: &review_features, + image_generation_tool_auth_allowed: image_generation_tool_auth_allowed(Some( + sess.services.auth_manager.as_ref(), + )), + web_search_mode: Some(review_web_search_mode), + session_source: parent_turn_context.session_source.clone(), + sandbox_policy: parent_turn_context.sandbox_policy.get(), + windows_sandbox_level: parent_turn_context.windows_sandbox_level, + }) + .with_unified_exec_shell_mode_for_session( + crate::tools::spec::tool_user_shell_type(sess.services.user_shell.as_ref()), + sess.services.shell_zsh_path.as_ref(), + sess.services.main_execve_wrapper_exe.as_ref(), + ) + .with_web_search_config(/*web_search_config*/ None) + .with_allow_login_shell(config.permissions.allow_login_shell) + .with_has_environment(parent_turn_context.environment.is_some()) + .with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled) + .with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone()) + .with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata) + .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( + &config.agent_roles, + )); + + let review_prompt = resolved.prompt.clone(); + let provider = parent_turn_context.provider.clone(); + let auth_manager = parent_turn_context.auth_manager.clone(); + let model_info = review_model_info.clone(); + + // Build per‑turn client with the requested model/family. + let mut per_turn_config = (*config).clone(); + per_turn_config.model = Some(model.clone()); + per_turn_config.features = review_features.clone(); + if let Err(err) = per_turn_config.web_search_mode.set(review_web_search_mode) { + let fallback_value = per_turn_config.web_search_mode.value(); + tracing::warn!( + error = %err, + ?review_web_search_mode, + ?fallback_value, + "review web_search_mode is disallowed by requirements; keeping constrained value" + ); + } + + let session_telemetry = parent_turn_context + .session_telemetry + .clone() + .with_model(model.as_str(), review_model_info.slug.as_str()); + let auth_manager_for_context = auth_manager.clone(); + let provider_for_context = provider.clone(); + let session_telemetry_for_context = session_telemetry.clone(); + let reasoning_effort = per_turn_config.model_reasoning_effort; + let reasoning_summary = per_turn_config + .model_reasoning_summary + .unwrap_or(model_info.default_reasoning_summary); + let session_source = parent_turn_context.session_source.clone(); + + let per_turn_config = Arc::new(per_turn_config); + let review_turn_id = sub_id.to_string(); + let turn_metadata_state = Arc::new(TurnMetadataState::new( + sess.conversation_id.to_string(), + &session_source, + review_turn_id.clone(), + parent_turn_context.cwd.clone(), + parent_turn_context.sandbox_policy.get(), + parent_turn_context.windows_sandbox_level, + )); + + let review_turn_context = TurnContext { + sub_id: review_turn_id, + trace_id: current_span_trace_id(), + realtime_active: parent_turn_context.realtime_active, + config: per_turn_config, + auth_manager: auth_manager_for_context, + model_info: model_info.clone(), + session_telemetry: session_telemetry_for_context, + provider: provider_for_context, + reasoning_effort, + reasoning_summary, + session_source, + environment: parent_turn_context.environment.clone(), + tools_config, + features: parent_turn_context.features.clone(), + ghost_snapshot: parent_turn_context.ghost_snapshot.clone(), + current_date: parent_turn_context.current_date.clone(), + timezone: parent_turn_context.timezone.clone(), + app_server_client_name: parent_turn_context.app_server_client_name.clone(), + developer_instructions: None, + user_instructions: None, + compact_prompt: parent_turn_context.compact_prompt.clone(), + collaboration_mode: parent_turn_context.collaboration_mode.clone(), + personality: parent_turn_context.personality, + approval_policy: parent_turn_context.approval_policy.clone(), + sandbox_policy: parent_turn_context.sandbox_policy.clone(), + file_system_sandbox_policy: parent_turn_context.file_system_sandbox_policy.clone(), + network_sandbox_policy: parent_turn_context.network_sandbox_policy, + network: parent_turn_context.network.clone(), + windows_sandbox_level: parent_turn_context.windows_sandbox_level, + shell_environment_policy: parent_turn_context.shell_environment_policy.clone(), + cwd: parent_turn_context.cwd.clone(), + final_output_json_schema: None, + codex_self_exe: parent_turn_context.codex_self_exe.clone(), + codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), + tool_call_gate: Arc::new(ReadinessFlag::new()), + js_repl: Arc::clone(&sess.js_repl), + dynamic_tools: parent_turn_context.dynamic_tools.clone(), + truncation_policy: model_info.truncation_policy.into(), + turn_metadata_state, + turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()), + turn_timing_state: Arc::new(TurnTimingState::default()), + }; + + // Seed the child task with the review prompt as the initial user message. + let input: Vec = vec![UserInput::Text { + text: review_prompt, + // Review prompt is synthesized; no UI element ranges to preserve. + text_elements: Vec::new(), + }]; + let tc = Arc::new(review_turn_context); + tc.turn_metadata_state.spawn_git_enrichment_task(); + // TODO(ccunningham): Review turns currently rely on `spawn_task` for TurnComplete but do not + // emit a parent TurnStarted. Consider giving review a full parent turn lifecycle + // (TurnStarted + TurnComplete) for consistency with other standalone tasks. + sess.spawn_task(tc.clone(), input, ReviewTask::new()).await; + + // Announce entering review mode so UIs can switch modes. + let review_request = ReviewRequest { + target: resolved.target, + user_facing_hint: Some(resolved.user_facing_hint), + }; + sess.send_event(&tc, EventMsg::EnteredReviewMode(review_request)) + .await; +} diff --git a/codex-rs/core/src/codex/session.rs b/codex-rs/core/src/codex/session.rs new file mode 100644 index 0000000000..766ac79ec1 --- /dev/null +++ b/codex-rs/core/src/codex/session.rs @@ -0,0 +1,844 @@ +use super::*; + +/// Context for an initialized model agent +/// +/// A session has at most 1 running task at a time, and can be interrupted by user input. +pub(crate) struct Session { + pub(crate) conversation_id: ThreadId, + pub(super) tx_event: Sender, + pub(super) agent_status: watch::Sender, + pub(super) out_of_band_elicitation_paused: watch::Sender, + pub(super) state: Mutex, + /// Serializes rebuild/apply cycles for the running proxy; each cycle + /// rebuilds from the current SessionState while holding this lock. + pub(super) managed_network_proxy_refresh_lock: Mutex<()>, + /// The set of enabled features should be invariant for the lifetime of the + /// session. + pub(super) features: ManagedFeatures, + pub(super) pending_mcp_server_refresh_config: Mutex>, + pub(crate) conversation: Arc, + pub(crate) active_turn: Mutex>, + pub(super) mailbox: Mailbox, + pub(super) mailbox_rx: Mutex, + pub(super) idle_pending_input: Mutex>, // TODO (jif) merge with mailbox! + pub(crate) guardian_review_session: GuardianReviewSessionManager, + pub(crate) services: SessionServices, + pub(super) js_repl: Arc, + pub(super) next_internal_sub_id: AtomicU64, +} + +#[derive(Clone)] +pub(crate) struct SessionConfiguration { + /// Provider identifier ("openai", "openrouter", ...). + pub(super) provider: ModelProviderInfo, + + pub(super) collaboration_mode: CollaborationMode, + pub(super) model_reasoning_summary: Option, + pub(super) service_tier: Option, + + /// Developer instructions that supplement the base instructions. + pub(super) developer_instructions: Option, + + /// Model instructions that are appended to the base instructions. + pub(super) user_instructions: Option, + + /// Personality preference for the model. + pub(super) personality: Option, + + /// Base instructions for the session. + pub(super) base_instructions: String, + + /// Compact prompt override. + pub(super) compact_prompt: Option, + + /// When to escalate for approval for execution + pub(super) approval_policy: Constrained, + pub(super) approvals_reviewer: ApprovalsReviewer, + /// How to sandbox commands executed in the system + pub(super) sandbox_policy: Constrained, + pub(super) file_system_sandbox_policy: FileSystemSandboxPolicy, + pub(super) network_sandbox_policy: NetworkSandboxPolicy, + pub(super) windows_sandbox_level: WindowsSandboxLevel, + + /// Absolute working directory that should be treated as the *root* of the + /// session. All relative paths supplied by the model as well as the + /// execution sandbox are resolved against this directory **instead** of + /// the process-wide current working directory. + pub(super) cwd: AbsolutePathBuf, + /// Directory containing all Codex state for this session. + pub(super) codex_home: AbsolutePathBuf, + /// Optional user-facing name for the thread, updated during the session. + pub(super) thread_name: Option, + + // TODO(pakrym): Remove config from here + pub(super) original_config_do_not_use: Arc, + /// Optional service name tag for session metrics. + pub(super) metrics_service_name: Option, + pub(super) app_server_client_name: Option, + pub(super) app_server_client_version: Option, + /// Source of the session (cli, vscode, exec, mcp, ...) + pub(super) session_source: SessionSource, + pub(super) dynamic_tools: Vec, + pub(super) persist_extended_history: bool, + pub(super) inherited_shell_snapshot: Option>, + pub(super) user_shell_override: Option, +} + +impl SessionConfiguration { + pub(crate) fn codex_home(&self) -> &AbsolutePathBuf { + &self.codex_home + } + + pub(super) fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { + ThreadConfigSnapshot { + model: self.collaboration_mode.model().to_string(), + model_provider_id: self.original_config_do_not_use.model_provider_id.clone(), + service_tier: self.service_tier, + approval_policy: self.approval_policy.value(), + approvals_reviewer: self.approvals_reviewer, + sandbox_policy: self.sandbox_policy.get().clone(), + cwd: self.cwd.clone(), + ephemeral: self.original_config_do_not_use.ephemeral, + reasoning_effort: self.collaboration_mode.reasoning_effort(), + personality: self.personality, + session_source: self.session_source.clone(), + } + } + + pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> ConstraintResult { + let mut next_configuration = self.clone(); + let file_system_policy_matches_legacy = self.file_system_sandbox_policy + == FileSystemSandboxPolicy::from_legacy_sandbox_policy( + self.sandbox_policy.get(), + &self.cwd, + ); + if let Some(collaboration_mode) = updates.collaboration_mode.clone() { + next_configuration.collaboration_mode = collaboration_mode; + } + if let Some(summary) = updates.reasoning_summary { + next_configuration.model_reasoning_summary = Some(summary); + } + if let Some(service_tier) = updates.service_tier { + next_configuration.service_tier = service_tier; + } + if let Some(personality) = updates.personality { + next_configuration.personality = Some(personality); + } + if let Some(approval_policy) = updates.approval_policy { + next_configuration.approval_policy.set(approval_policy)?; + } + if let Some(approvals_reviewer) = updates.approvals_reviewer { + next_configuration.approvals_reviewer = approvals_reviewer; + } + let mut sandbox_policy_changed = false; + if let Some(sandbox_policy) = updates.sandbox_policy.clone() { + next_configuration.sandbox_policy.set(sandbox_policy)?; + next_configuration.network_sandbox_policy = + NetworkSandboxPolicy::from(next_configuration.sandbox_policy.get()); + sandbox_policy_changed = true; + } + if let Some(windows_sandbox_level) = updates.windows_sandbox_level { + next_configuration.windows_sandbox_level = windows_sandbox_level; + } + + let absolute_cwd = updates + .cwd + .as_ref() + .map(|cwd| { + AbsolutePathBuf::relative_to_current_dir(normalize_for_native_workdir( + cwd.as_path(), + )) + .unwrap_or_else(|e| { + warn!("failed to normalize update cwd: {cwd:?}: {e}"); + self.cwd.clone() + }) + }) + .unwrap_or_else(|| self.cwd.clone()); + + let cwd_changed = absolute_cwd.as_path() != self.cwd.as_path(); + next_configuration.cwd = absolute_cwd; + if sandbox_policy_changed { + next_configuration.file_system_sandbox_policy = + FileSystemSandboxPolicy::from_legacy_sandbox_policy_preserving_deny_entries( + next_configuration.sandbox_policy.get(), + &next_configuration.cwd, + &self.file_system_sandbox_policy, + ); + } else if cwd_changed && file_system_policy_matches_legacy { + // Preserve richer split policies across cwd-only updates; only + // rederive when the session is already using the legacy bridge. + next_configuration.file_system_sandbox_policy = + FileSystemSandboxPolicy::from_legacy_sandbox_policy( + next_configuration.sandbox_policy.get(), + &next_configuration.cwd, + ); + } + if let Some(app_server_client_name) = updates.app_server_client_name.clone() { + next_configuration.app_server_client_name = Some(app_server_client_name); + } + if let Some(app_server_client_version) = updates.app_server_client_version.clone() { + next_configuration.app_server_client_version = Some(app_server_client_version); + } + Ok(next_configuration) + } +} + +#[derive(Default, Clone)] +pub(crate) struct SessionSettingsUpdate { + pub(crate) cwd: Option, + pub(crate) approval_policy: Option, + pub(crate) approvals_reviewer: Option, + pub(crate) sandbox_policy: Option, + pub(crate) windows_sandbox_level: Option, + pub(crate) collaboration_mode: Option, + pub(crate) reasoning_summary: Option, + pub(crate) service_tier: Option>, + pub(crate) final_output_json_schema: Option>, + pub(crate) personality: Option, + pub(crate) app_server_client_name: Option, + pub(crate) app_server_client_version: Option, +} + +pub(crate) struct AppServerClientMetadata { + pub(crate) client_name: Option, + pub(crate) client_version: Option, +} + +impl Session { + #[instrument(name = "session_init", level = "info", skip_all)] + #[allow(clippy::too_many_arguments)] + pub(crate) async fn new( + mut session_configuration: SessionConfiguration, + config: Arc, + auth_manager: Arc, + models_manager: Arc, + exec_policy: Arc, + tx_event: Sender, + agent_status: watch::Sender, + initial_history: InitialHistory, + session_source: SessionSource, + skills_manager: Arc, + plugins_manager: Arc, + mcp_manager: Arc, + skills_watcher: Arc, + agent_control: AgentControl, + environment: Option>, + analytics_events_client: Option, + ) -> anyhow::Result> { + debug!( + "Configuring session: model={}; provider={:?}", + session_configuration.collaboration_mode.model(), + session_configuration.provider + ); + let forked_from_id = initial_history.forked_from_id(); + + let (conversation_id, rollout_params) = match &initial_history { + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => { + let conversation_id = ThreadId::default(); + ( + conversation_id, + RolloutRecorderParams::new( + conversation_id, + forked_from_id, + session_source, + BaseInstructions { + text: session_configuration.base_instructions.clone(), + }, + session_configuration.dynamic_tools.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, + ), + ) + } + InitialHistory::Resumed(resumed_history) => ( + resumed_history.conversation_id, + RolloutRecorderParams::resume( + resumed_history.rollout_path.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, + ), + ), + }; + let window_generation = match &initial_history { + InitialHistory::Resumed(resumed_history) => u64::try_from( + resumed_history + .history + .iter() + .filter(|item| matches!(item, RolloutItem::Compacted(_))) + .count(), + ) + .unwrap_or(u64::MAX), + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => 0, + }; + let state_builder = match &initial_history { + InitialHistory::Resumed(resumed) => metadata::builder_from_items( + resumed.history.as_slice(), + resumed.rollout_path.as_path(), + ), + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, + }; + + // Kick off independent async setup tasks in parallel to reduce startup latency. + // + // - initialize RolloutRecorder with new or resumed session info + // - perform default shell discovery + // - load history metadata (skipped for subagents) + let rollout_fut = async { + if config.ephemeral { + Ok::<_, anyhow::Error>((None, None)) + } else { + let state_db_ctx = state_db::init(&config).await; + let rollout_recorder = RolloutRecorder::new( + &config, + rollout_params, + state_db_ctx.clone(), + state_builder.clone(), + ) + .await?; + Ok((Some(rollout_recorder), state_db_ctx)) + } + } + .instrument(info_span!( + "session_init.rollout", + otel.name = "session_init.rollout", + session_init.ephemeral = config.ephemeral, + )); + + let is_subagent = matches!( + session_configuration.session_source, + SessionSource::SubAgent(_) + ); + let history_meta_fut = async { + if is_subagent { + (0, 0) + } else { + crate::message_history::history_metadata(&config).await + } + } + .instrument(info_span!( + "session_init.history_metadata", + otel.name = "session_init.history_metadata", + session_init.is_subagent = is_subagent, + )); + let auth_manager_clone = Arc::clone(&auth_manager); + let config_for_mcp = Arc::clone(&config); + let mcp_manager_for_mcp = Arc::clone(&mcp_manager); + let auth_and_mcp_fut = async move { + let auth = auth_manager_clone.auth().await; + let mcp_servers = mcp_manager_for_mcp + .effective_servers(&config_for_mcp, auth.as_ref()) + .await; + let auth_statuses = compute_auth_statuses( + mcp_servers.iter(), + config_for_mcp.mcp_oauth_credentials_store_mode, + ) + .await; + (auth, mcp_servers, auth_statuses) + } + .instrument(info_span!( + "session_init.auth_mcp", + otel.name = "session_init.auth_mcp", + )); + + // Join all independent futures. + let ( + rollout_recorder_and_state_db, + (history_log_id, history_entry_count), + (auth, mcp_servers, auth_statuses), + ) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut); + + let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| { + error!("failed to initialize rollout recorder: {e:#}"); + e + })?; + let rollout_path = rollout_recorder + .as_ref() + .map(|rec| rec.rollout_path().to_path_buf()); + + let mut post_session_configured_events = Vec::::new(); + + for usage in config.features.legacy_feature_usages() { + post_session_configured_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { + summary: usage.summary.clone(), + details: usage.details.clone(), + }), + }); + } + if crate::config::uses_deprecated_instructions_file(&config.config_layer_stack) { + post_session_configured_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { + summary: "`experimental_instructions_file` is deprecated and ignored. Use `model_instructions_file` instead." + .to_string(), + details: Some( + "Move the setting to `model_instructions_file` in config.toml (or under a profile) to load instructions from a file." + .to_string(), + ), + }), + }); + } + for message in &config.startup_warnings { + post_session_configured_events.push(Event { + id: "".to_owned(), + msg: EventMsg::Warning(WarningEvent { + message: message.clone(), + }), + }); + } + let config_path = config.codex_home.join(CONFIG_TOML_FILE); + if let Some(event) = unstable_features_warning_event( + config + .config_layer_stack + .effective_config() + .get("features") + .and_then(TomlValue::as_table), + config.suppress_unstable_features_warning, + &config.features, + &config_path.display().to_string(), + ) { + post_session_configured_events.push(event); + } + if config.permissions.approval_policy.value() == AskForApproval::OnFailure { + post_session_configured_events.push(Event { + id: "".to_owned(), + msg: EventMsg::Warning(WarningEvent { + message: "`on-failure` approval policy is deprecated and will be removed in a future release. Use `on-request` for interactive approvals or `never` for non-interactive runs.".to_string(), + }), + }); + } + + let auth = auth.as_ref(); + let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); + let account_id = auth.and_then(CodexAuth::get_account_id); + let account_email = auth.and_then(CodexAuth::get_account_email); + let originator = originator().value; + let terminal_type = user_agent(); + let session_model = session_configuration.collaboration_mode.model().to_string(); + let auth_env_telemetry = collect_auth_env_telemetry( + &session_configuration.provider, + auth_manager.codex_api_key_env_enabled(), + ); + let mut session_telemetry = SessionTelemetry::new( + conversation_id, + session_model.as_str(), + session_model.as_str(), + account_id.clone(), + account_email.clone(), + auth_mode, + originator.clone(), + config.otel.log_user_prompt, + terminal_type.clone(), + session_configuration.session_source.clone(), + ) + .with_auth_env(auth_env_telemetry.to_otel_metadata()); + if let Some(service_name) = session_configuration.metrics_service_name.as_deref() { + session_telemetry = session_telemetry.with_metrics_service_name(service_name); + } + let network_proxy_audit_metadata = NetworkProxyAuditMetadata { + conversation_id: Some(conversation_id.to_string()), + app_version: Some(env!("CARGO_PKG_VERSION").to_string()), + user_account_id: account_id, + auth_mode: auth_mode.map(|mode| mode.to_string()), + originator: Some(originator), + user_email: account_email, + terminal_type: Some(terminal_type), + model: Some(session_model.clone()), + slug: Some(session_model), + }; + config.features.emit_metrics(&session_telemetry); + session_telemetry.counter( + THREAD_STARTED_METRIC, + /*inc*/ 1, + &[( + "is_git", + if get_git_repo_root(&session_configuration.cwd).is_some() { + "true" + } else { + "false" + }, + )], + ); + + session_telemetry.conversation_starts( + config.model_provider.name.as_str(), + session_configuration.collaboration_mode.reasoning_effort(), + config + .model_reasoning_summary + .unwrap_or(ReasoningSummaryConfig::Auto), + config.model_context_window, + config.model_auto_compact_token_limit, + config.permissions.approval_policy.value(), + config.permissions.sandbox_policy.get().clone(), + mcp_servers.keys().map(String::as_str).collect(), + config.active_profile.clone(), + ); + + let use_zsh_fork_shell = config.features.enabled(Feature::ShellZshFork); + let mut default_shell = if let Some(user_shell_override) = + session_configuration.user_shell_override.clone() + { + user_shell_override + } else if use_zsh_fork_shell { + let zsh_path = config.zsh_path.as_ref().ok_or_else(|| { + anyhow::anyhow!( + "zsh fork feature enabled, but `zsh_path` is not configured; set `zsh_path` in config.toml" + ) + })?; + let zsh_path = zsh_path.to_path_buf(); + shell::get_shell(shell::ShellType::Zsh, Some(&zsh_path)).ok_or_else(|| { + anyhow::anyhow!( + "zsh fork feature enabled, but zsh_path `{}` is not usable; set `zsh_path` to a valid zsh executable", + zsh_path.display() + ) + })? + } else { + shell::default_user_shell() + }; + // Create the mutable state for the Session. + let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) { + if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() { + let (tx, rx) = watch::channel(Some(snapshot)); + default_shell.shell_snapshot = rx; + tx + } else { + ShellSnapshot::start_snapshotting( + config.codex_home.clone(), + conversation_id, + session_configuration.cwd.clone(), + &mut default_shell, + session_telemetry.clone(), + ) + } + } else { + let (tx, rx) = watch::channel(None); + default_shell.shell_snapshot = rx; + tx + }; + let thread_name = + thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id) + .instrument(info_span!( + "session_init.thread_name_lookup", + otel.name = "session_init.thread_name_lookup", + )) + .await; + session_configuration.thread_name = thread_name.clone(); + let state = SessionState::new(session_configuration.clone()); + let managed_network_requirements_configured = config + .config_layer_stack + .requirements_toml() + .network + .is_some(); + let managed_network_requirements_enabled = config.managed_network_requirements_enabled(); + let network_approval = Arc::new(NetworkApprovalService::default()); + // The managed proxy can call back into core for allowlist-miss decisions. + let network_policy_decider_session = if managed_network_requirements_configured { + config + .permissions + .network + .as_ref() + .map(|_| Arc::new(RwLock::new(std::sync::Weak::::new()))) + } else { + None + }; + let blocked_request_observer = if managed_network_requirements_configured { + config + .permissions + .network + .as_ref() + .map(|_| build_blocked_request_observer(Arc::clone(&network_approval))) + } else { + None + }; + let network_policy_decider = + network_policy_decider_session + .as_ref() + .map(|network_policy_decider_session| { + build_network_policy_decider( + Arc::clone(&network_approval), + Arc::clone(network_policy_decider_session), + ) + }); + let (network_proxy, session_network_proxy) = + if let Some(spec) = config.permissions.network.as_ref() { + let current_exec_policy = exec_policy.current(); + let (network_proxy, session_network_proxy) = Self::start_managed_network_proxy( + spec, + current_exec_policy.as_ref(), + config.permissions.sandbox_policy.get(), + network_policy_decider.as_ref().map(Arc::clone), + blocked_request_observer.as_ref().map(Arc::clone), + managed_network_requirements_configured, + network_proxy_audit_metadata, + ) + .instrument(info_span!( + "session_init.network_proxy", + otel.name = "session_init.network_proxy", + session_init.managed_network_requirements_enabled = + managed_network_requirements_enabled, + )) + .await?; + (Some(network_proxy), Some(session_network_proxy)) + } else { + (None, None) + }; + + let mut hook_shell_argv = + default_shell.derive_exec_args("", /*use_login_shell*/ false); + let hook_shell_program = hook_shell_argv.remove(0); + let _ = hook_shell_argv.pop(); + let hooks = Hooks::new(HooksConfig { + legacy_notify_argv: config.notify.clone(), + feature_enabled: config.features.enabled(Feature::CodexHooks), + config_layer_stack: Some(config.config_layer_stack.clone()), + shell_program: Some(hook_shell_program), + shell_args: hook_shell_argv, + }); + for warning in hooks.startup_warnings() { + post_session_configured_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Warning(WarningEvent { + message: warning.clone(), + }), + }); + } + + let installation_id = resolve_installation_id(&config.codex_home).await?; + let analytics_events_client = analytics_events_client.unwrap_or_else(|| { + AnalyticsEventsClient::new( + Arc::clone(&auth_manager), + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ) + }); + let services = SessionServices { + // Initialize the MCP connection manager with an uninitialized + // instance. It will be replaced with one created via + // McpConnectionManager::new() once all its constructor args are + // available. This also ensures `SessionConfigured` is emitted + // before any MCP-related events. It is reasonable to consider + // changing this to use Option or OnceCell, though the current + // setup is straightforward enough and performs well. + mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( + &config.permissions.approval_policy, + &config.permissions.sandbox_policy, + ))), + mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), + unified_exec_manager: UnifiedExecProcessManager::new( + config.background_terminal_max_timeout, + ), + shell_zsh_path: config.zsh_path.clone(), + main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(), + analytics_events_client, + hooks, + rollout: Mutex::new(rollout_recorder), + user_shell: Arc::new(default_shell), + agent_identity_manager: Arc::new(AgentIdentityManager::new( + config.as_ref(), + Arc::clone(&auth_manager), + session_configuration.session_source.clone(), + )), + shell_snapshot_tx, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, + exec_policy, + auth_manager: Arc::clone(&auth_manager), + session_telemetry, + models_manager: Arc::clone(&models_manager), + tool_approvals: Mutex::new(ApprovalStore::default()), + guardian_rejections: Mutex::new(HashMap::new()), + skills_manager, + plugins_manager: Arc::clone(&plugins_manager), + mcp_manager: Arc::clone(&mcp_manager), + skills_watcher, + agent_control, + network_proxy, + network_approval: Arc::clone(&network_approval), + state_db: state_db_ctx.clone(), + thread_store: LocalThreadStore::new(RolloutConfig::from_view(config.as_ref())), + model_client: ModelClient::new( + Some(Arc::clone(&auth_manager)), + conversation_id, + installation_id, + session_configuration.provider.clone(), + session_configuration.session_source.clone(), + config.model_verbosity, + config.features.enabled(Feature::EnableRequestCompression), + config.features.enabled(Feature::RuntimeMetrics), + Self::build_model_client_beta_features_header(config.as_ref()), + ), + code_mode_service: crate::tools::code_mode::CodeModeService::new( + config.js_repl_node_path.clone(), + ), + environment, + }; + services + .model_client + .set_window_generation(window_generation); + let js_repl = Arc::new(JsReplHandle::with_node_path( + config.js_repl_node_path.clone(), + config.js_repl_node_module_dirs.clone(), + )); + let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) = + watch::channel(false); + + let (mailbox, mailbox_rx) = Mailbox::new(); + let sess = Arc::new(Session { + conversation_id, + tx_event: tx_event.clone(), + agent_status, + out_of_band_elicitation_paused, + state: Mutex::new(state), + managed_network_proxy_refresh_lock: Mutex::new(()), + features: config.features.clone(), + pending_mcp_server_refresh_config: Mutex::new(None), + conversation: Arc::new(RealtimeConversationManager::new()), + active_turn: Mutex::new(None), + mailbox, + mailbox_rx: Mutex::new(mailbox_rx), + idle_pending_input: Mutex::new(Vec::new()), + guardian_review_session: GuardianReviewSessionManager::default(), + services, + js_repl, + next_internal_sub_id: AtomicU64::new(0), + }); + if let Some(network_policy_decider_session) = network_policy_decider_session { + let mut guard = network_policy_decider_session.write().await; + *guard = Arc::downgrade(&sess); + } + // Dispatch the SessionConfiguredEvent first and then report any errors. + // If resuming, include converted initial messages in the payload so UIs can render them immediately. + let initial_messages = initial_history.get_event_msgs(); + let events = std::iter::once(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: conversation_id, + forked_from_id, + thread_name: session_configuration.thread_name.clone(), + model: session_configuration.collaboration_mode.model().to_string(), + model_provider_id: config.model_provider_id.clone(), + service_tier: session_configuration.service_tier, + approval_policy: session_configuration.approval_policy.value(), + approvals_reviewer: session_configuration.approvals_reviewer, + sandbox_policy: session_configuration.sandbox_policy.get().clone(), + cwd: session_configuration.cwd.clone(), + reasoning_effort: session_configuration.collaboration_mode.reasoning_effort(), + history_log_id, + history_entry_count, + initial_messages, + network_proxy: session_network_proxy.filter(|_| { + Self::managed_network_proxy_active_for_sandbox_policy( + session_configuration.sandbox_policy.get(), + ) + }), + rollout_path, + }), + }) + .chain(post_session_configured_events.into_iter()); + for event in events { + sess.send_event_raw(event).await; + } + + // Start the watcher after SessionConfigured so it cannot emit earlier events. + sess.start_skills_watcher_listener(); + sess.start_agent_identity_registration(); + let mut required_mcp_servers: Vec = mcp_servers + .iter() + .filter(|(_, server)| server.enabled && server.required) + .map(|(name, _)| name.clone()) + .collect(); + required_mcp_servers.sort(); + let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count(); + let required_mcp_server_count = required_mcp_servers.len(); + let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await; + { + let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; + cancel_guard.cancel(); + *cancel_guard = CancellationToken::new(); + } + let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + config.mcp_oauth_credentials_store_mode, + auth_statuses.clone(), + &session_configuration.approval_policy, + INITIAL_SUBMIT_ID.to_owned(), + tx_event.clone(), + session_configuration.sandbox_policy.get().clone(), + config.codex_home.to_path_buf(), + codex_apps_tools_cache_key(auth), + tool_plugin_provenance, + ) + .instrument(info_span!( + "session_init.mcp_manager_init", + otel.name = "session_init.mcp_manager_init", + session_init.enabled_mcp_server_count = enabled_mcp_server_count, + session_init.required_mcp_server_count = required_mcp_server_count, + )) + .await; + { + let mut manager_guard = sess.services.mcp_connection_manager.write().await; + *manager_guard = mcp_connection_manager; + } + { + let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; + if cancel_guard.is_cancelled() { + cancel_token.cancel(); + } + *cancel_guard = cancel_token; + } + if !required_mcp_servers.is_empty() { + let failures = sess + .services + .mcp_connection_manager + .read() + .await + .required_startup_failures(&required_mcp_servers) + .instrument(info_span!( + "session_init.required_mcp_wait", + otel.name = "session_init.required_mcp_wait", + session_init.required_mcp_server_count = required_mcp_server_count, + )) + .await; + if !failures.is_empty() { + let details = failures + .iter() + .map(|failure| format!("{}: {}", failure.server, failure.error)) + .collect::>() + .join("; "); + return Err(anyhow::anyhow!( + "required MCP servers failed to initialize: {details}" + )); + } + } + sess.schedule_startup_prewarm(session_configuration.base_instructions.clone()) + .await; + let session_start_source = match &initial_history { + InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume, + InitialHistory::New | InitialHistory::Forked(_) => { + codex_hooks::SessionStartSource::Startup + } + InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear, + }; + + // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. + sess.record_initial_history(initial_history).await; + { + let mut state = sess.state.lock().await; + state.set_pending_session_start_source(Some(session_start_source)); + } + + memories::start_memories_startup_task( + &sess, + Arc::clone(&config), + &session_configuration.session_source, + ); + + Ok(sess) + } +} diff --git a/codex-rs/core/src/codex/turn_context.rs b/codex-rs/core/src/codex/turn_context.rs new file mode 100644 index 0000000000..6e5bf22c4b --- /dev/null +++ b/codex-rs/core/src/codex/turn_context.rs @@ -0,0 +1,615 @@ +use super::*; + +pub(super) fn image_generation_tool_auth_allowed(auth_manager: Option<&AuthManager>) -> bool { + matches!( + auth_manager.and_then(AuthManager::auth_mode), + Some(AuthMode::Chatgpt) + ) +} + +#[derive(Clone, Debug)] +pub(crate) struct TurnSkillsContext { + pub(crate) outcome: Arc, + pub(crate) implicit_invocation_seen_skills: Arc>>, +} + +impl TurnSkillsContext { + pub(crate) fn new(outcome: Arc) -> Self { + Self { + outcome, + implicit_invocation_seen_skills: Arc::new(Mutex::new(HashSet::new())), + } + } +} + +/// The context needed for a single turn of the thread. +#[derive(Debug)] +pub(crate) struct TurnContext { + pub(crate) sub_id: String, + pub(crate) trace_id: Option, + pub(crate) realtime_active: bool, + pub(crate) config: Arc, + pub(crate) auth_manager: Option>, + pub(crate) model_info: ModelInfo, + pub(crate) session_telemetry: SessionTelemetry, + pub(crate) provider: ModelProviderInfo, + pub(crate) reasoning_effort: Option, + pub(crate) reasoning_summary: ReasoningSummaryConfig, + pub(crate) session_source: SessionSource, + pub(crate) environment: Option>, + /// The session's absolute working directory. All relative paths provided + /// by the model as well as sandbox policies are resolved against this path + /// instead of `std::env::current_dir()`. + pub(crate) cwd: AbsolutePathBuf, + pub(crate) current_date: Option, + pub(crate) timezone: Option, + pub(crate) app_server_client_name: Option, + pub(crate) developer_instructions: Option, + pub(crate) compact_prompt: Option, + pub(crate) user_instructions: Option, + pub(crate) collaboration_mode: CollaborationMode, + pub(crate) personality: Option, + pub(crate) approval_policy: Constrained, + pub(crate) sandbox_policy: Constrained, + pub(crate) file_system_sandbox_policy: FileSystemSandboxPolicy, + pub(crate) network_sandbox_policy: NetworkSandboxPolicy, + pub(crate) network: Option, + pub(crate) windows_sandbox_level: WindowsSandboxLevel, + pub(crate) shell_environment_policy: ShellEnvironmentPolicy, + pub(crate) tools_config: ToolsConfig, + pub(crate) features: ManagedFeatures, + pub(crate) ghost_snapshot: GhostSnapshotConfig, + pub(crate) final_output_json_schema: Option, + pub(crate) codex_self_exe: Option, + pub(crate) codex_linux_sandbox_exe: Option, + pub(crate) tool_call_gate: Arc, + pub(crate) truncation_policy: TruncationPolicy, + pub(crate) js_repl: Arc, + pub(crate) dynamic_tools: Vec, + pub(crate) turn_metadata_state: Arc, + pub(crate) turn_skills: TurnSkillsContext, + pub(crate) turn_timing_state: Arc, +} +impl TurnContext { + pub(crate) fn model_context_window(&self) -> Option { + let effective_context_window_percent = self.model_info.effective_context_window_percent; + self.model_info.context_window.map(|context_window| { + context_window.saturating_mul(effective_context_window_percent) / 100 + }) + } + + pub(crate) fn apps_enabled(&self) -> bool { + let is_chatgpt_auth = self + .auth_manager + .as_deref() + .and_then(AuthManager::auth_cached) + .as_ref() + .is_some_and(CodexAuth::is_chatgpt_auth); + self.features.apps_enabled_for_auth(is_chatgpt_auth) + } + + pub(crate) async fn with_model(&self, model: String, models_manager: &ModelsManager) -> Self { + let mut config = (*self.config).clone(); + config.model = Some(model.clone()); + let model_info = models_manager + .get_model_info(model.as_str(), &config.to_models_manager_config()) + .await; + let truncation_policy = model_info.truncation_policy.into(); + let supported_reasoning_levels = model_info + .supported_reasoning_levels + .iter() + .map(|preset| preset.effort) + .collect::>(); + let reasoning_effort = if let Some(current_reasoning_effort) = self.reasoning_effort { + if supported_reasoning_levels.contains(¤t_reasoning_effort) { + Some(current_reasoning_effort) + } else { + supported_reasoning_levels + .get(supported_reasoning_levels.len().saturating_sub(1) / 2) + .copied() + .or(model_info.default_reasoning_level) + } + } else { + supported_reasoning_levels + .get(supported_reasoning_levels.len().saturating_sub(1) / 2) + .copied() + .or(model_info.default_reasoning_level) + }; + config.model_reasoning_effort = reasoning_effort; + + let collaboration_mode = self.collaboration_mode.with_updates( + Some(model.clone()), + Some(reasoning_effort), + /*developer_instructions*/ None, + ); + let features = self.features.clone(); + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &model_info, + available_models: &models_manager + .list_models(RefreshStrategy::OnlineIfUncached) + .await, + features: &features, + image_generation_tool_auth_allowed: image_generation_tool_auth_allowed( + self.auth_manager.as_deref(), + ), + web_search_mode: self.tools_config.web_search_mode, + session_source: self.session_source.clone(), + sandbox_policy: self.sandbox_policy.get(), + windows_sandbox_level: self.windows_sandbox_level, + }) + .with_unified_exec_shell_mode(self.tools_config.unified_exec_shell_mode.clone()) + .with_web_search_config(self.tools_config.web_search_config.clone()) + .with_allow_login_shell(self.tools_config.allow_login_shell) + .with_has_environment(self.tools_config.has_environment) + .with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled) + .with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone()) + .with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata) + .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( + &config.agent_roles, + )); + + Self { + sub_id: self.sub_id.clone(), + trace_id: self.trace_id.clone(), + realtime_active: self.realtime_active, + config: Arc::new(config), + auth_manager: self.auth_manager.clone(), + model_info: model_info.clone(), + session_telemetry: self + .session_telemetry + .clone() + .with_model(model.as_str(), model_info.slug.as_str()), + provider: self.provider.clone(), + reasoning_effort, + reasoning_summary: self.reasoning_summary, + session_source: self.session_source.clone(), + environment: self.environment.clone(), + cwd: self.cwd.clone(), + current_date: self.current_date.clone(), + timezone: self.timezone.clone(), + app_server_client_name: self.app_server_client_name.clone(), + developer_instructions: self.developer_instructions.clone(), + compact_prompt: self.compact_prompt.clone(), + user_instructions: self.user_instructions.clone(), + collaboration_mode, + personality: self.personality, + approval_policy: self.approval_policy.clone(), + sandbox_policy: self.sandbox_policy.clone(), + file_system_sandbox_policy: self.file_system_sandbox_policy.clone(), + network_sandbox_policy: self.network_sandbox_policy, + network: self.network.clone(), + windows_sandbox_level: self.windows_sandbox_level, + shell_environment_policy: self.shell_environment_policy.clone(), + tools_config, + features, + ghost_snapshot: self.ghost_snapshot.clone(), + final_output_json_schema: self.final_output_json_schema.clone(), + codex_self_exe: self.codex_self_exe.clone(), + codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), + tool_call_gate: Arc::new(ReadinessFlag::new()), + truncation_policy, + js_repl: Arc::clone(&self.js_repl), + dynamic_tools: self.dynamic_tools.clone(), + turn_metadata_state: self.turn_metadata_state.clone(), + turn_skills: self.turn_skills.clone(), + turn_timing_state: Arc::clone(&self.turn_timing_state), + } + } + + pub(crate) fn resolve_path(&self, path: Option) -> AbsolutePathBuf { + path.as_ref() + .map_or_else(|| self.cwd.clone(), |path| self.cwd.join(path)) + } + + pub(crate) fn file_system_sandbox_context( + &self, + additional_permissions: Option, + ) -> FileSystemSandboxContext { + FileSystemSandboxContext { + sandbox_policy: self.sandbox_policy.get().clone(), + windows_sandbox_level: self.windows_sandbox_level, + windows_sandbox_private_desktop: self + .config + .permissions + .windows_sandbox_private_desktop, + use_legacy_landlock: self.features.use_legacy_landlock(), + additional_permissions, + } + } + + pub(crate) fn compact_prompt(&self) -> &str { + self.compact_prompt + .as_deref() + .unwrap_or(compact::SUMMARIZATION_PROMPT) + } + + pub(crate) fn to_turn_context_item(&self) -> TurnContextItem { + let legacy_file_system_sandbox_policy = FileSystemSandboxPolicy::from_legacy_sandbox_policy( + self.sandbox_policy.get(), + &self.cwd, + ); + // Omit the derived split filesystem policy when it is equivalent to + // the legacy sandbox policy. This keeps turn-context payloads stable + // while both fields exist; once callers consume only the split policy, + // this comparison and the legacy projection should go away. + let file_system_sandbox_policy = (self.file_system_sandbox_policy + != legacy_file_system_sandbox_policy) + .then(|| self.file_system_sandbox_policy.clone()); + + TurnContextItem { + turn_id: Some(self.sub_id.clone()), + trace_id: self.trace_id.clone(), + cwd: self.cwd.to_path_buf(), + current_date: self.current_date.clone(), + timezone: self.timezone.clone(), + approval_policy: self.approval_policy.value(), + sandbox_policy: self.sandbox_policy.get().clone(), + network: self.turn_context_network_item(), + file_system_sandbox_policy, + model: self.model_info.slug.clone(), + personality: self.personality, + collaboration_mode: Some(self.collaboration_mode.clone()), + realtime_active: Some(self.realtime_active), + effort: self.reasoning_effort, + summary: self.reasoning_summary, + user_instructions: self.user_instructions.clone(), + developer_instructions: self.developer_instructions.clone(), + final_output_json_schema: self.final_output_json_schema.clone(), + truncation_policy: Some(self.truncation_policy), + } + } + + fn turn_context_network_item(&self) -> Option { + let network = self + .config + .config_layer_stack + .requirements() + .network + .as_ref()?; + Some(TurnContextNetworkItem { + allowed_domains: network + .domains + .as_ref() + .and_then(codex_config::NetworkDomainPermissionsToml::allowed_domains) + .unwrap_or_default(), + denied_domains: network + .domains + .as_ref() + .and_then(codex_config::NetworkDomainPermissionsToml::denied_domains) + .unwrap_or_default(), + }) + } +} + +fn local_time_context() -> (String, String) { + match iana_time_zone::get_timezone() { + Ok(timezone) => (Local::now().format("%Y-%m-%d").to_string(), timezone), + Err(_) => ( + Utc::now().format("%Y-%m-%d").to_string(), + "Etc/UTC".to_string(), + ), + } +} + +impl Session { + /// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it. + pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config { + // todo(aibrahim): store this state somewhere else so we don't need to mut config + let config = session_configuration.original_config_do_not_use.clone(); + let mut per_turn_config = (*config).clone(); + per_turn_config.cwd = session_configuration.cwd.clone(); + per_turn_config.model_reasoning_effort = + session_configuration.collaboration_mode.reasoning_effort(); + per_turn_config.model_reasoning_summary = session_configuration.model_reasoning_summary; + per_turn_config.service_tier = session_configuration.service_tier; + per_turn_config.personality = session_configuration.personality; + per_turn_config.approvals_reviewer = session_configuration.approvals_reviewer; + let resolved_web_search_mode = resolve_web_search_mode_for_turn( + &per_turn_config.web_search_mode, + session_configuration.sandbox_policy.get(), + ); + if let Err(err) = per_turn_config + .web_search_mode + .set(resolved_web_search_mode) + { + let fallback_value = per_turn_config.web_search_mode.value(); + tracing::warn!( + error = %err, + ?resolved_web_search_mode, + ?fallback_value, + "resolved web_search_mode is disallowed by requirements; keeping constrained value" + ); + } + per_turn_config.features = config.features.clone(); + per_turn_config + } + + #[allow(clippy::too_many_arguments)] + pub(crate) fn make_turn_context( + conversation_id: ThreadId, + auth_manager: Option>, + session_telemetry: &SessionTelemetry, + provider: ModelProviderInfo, + session_configuration: &SessionConfiguration, + user_shell: &shell::Shell, + shell_zsh_path: Option<&PathBuf>, + main_execve_wrapper_exe: Option<&PathBuf>, + per_turn_config: Config, + model_info: ModelInfo, + models_manager: &ModelsManager, + network: Option, + environment: Option>, + sub_id: String, + js_repl: Arc, + skills_outcome: Arc, + ) -> TurnContext { + let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort(); + let reasoning_summary = session_configuration + .model_reasoning_summary + .unwrap_or(model_info.default_reasoning_summary); + let session_telemetry = session_telemetry.clone().with_model( + session_configuration.collaboration_mode.model(), + model_info.slug.as_str(), + ); + let session_source = session_configuration.session_source.clone(); + let image_generation_tool_auth_allowed = + image_generation_tool_auth_allowed(auth_manager.as_deref()); + let auth_manager_for_context = auth_manager; + let provider_for_context = provider; + let session_telemetry_for_context = session_telemetry; + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &model_info, + available_models: &models_manager.try_list_models().unwrap_or_default(), + features: &per_turn_config.features, + image_generation_tool_auth_allowed, + web_search_mode: Some(per_turn_config.web_search_mode.value()), + session_source: session_source.clone(), + sandbox_policy: session_configuration.sandbox_policy.get(), + windows_sandbox_level: session_configuration.windows_sandbox_level, + }) + .with_unified_exec_shell_mode_for_session( + crate::tools::spec::tool_user_shell_type(user_shell), + shell_zsh_path, + main_execve_wrapper_exe, + ) + .with_web_search_config(per_turn_config.web_search_config.clone()) + .with_allow_login_shell(per_turn_config.permissions.allow_login_shell) + .with_has_environment(environment.is_some()) + .with_spawn_agent_usage_hint(per_turn_config.multi_agent_v2.usage_hint_enabled) + .with_spawn_agent_usage_hint_text(per_turn_config.multi_agent_v2.usage_hint_text.clone()) + .with_hide_spawn_agent_metadata(per_turn_config.multi_agent_v2.hide_spawn_agent_metadata) + .with_agent_type_description(crate::agent::role::spawn_tool_spec::build( + &per_turn_config.agent_roles, + )); + + let cwd = session_configuration.cwd.clone(); + + let per_turn_config = Arc::new(per_turn_config); + let turn_metadata_state = Arc::new(TurnMetadataState::new( + conversation_id.to_string(), + &session_source, + sub_id.clone(), + cwd.clone(), + session_configuration.sandbox_policy.get(), + session_configuration.windows_sandbox_level, + )); + let (current_date, timezone) = local_time_context(); + TurnContext { + sub_id, + trace_id: current_span_trace_id(), + realtime_active: false, + config: per_turn_config.clone(), + auth_manager: auth_manager_for_context, + model_info: model_info.clone(), + session_telemetry: session_telemetry_for_context, + provider: provider_for_context, + reasoning_effort, + reasoning_summary, + session_source, + environment, + cwd, + current_date: Some(current_date), + timezone: Some(timezone), + app_server_client_name: session_configuration.app_server_client_name.clone(), + developer_instructions: session_configuration.developer_instructions.clone(), + compact_prompt: session_configuration.compact_prompt.clone(), + user_instructions: session_configuration.user_instructions.clone(), + collaboration_mode: session_configuration.collaboration_mode.clone(), + personality: session_configuration.personality, + approval_policy: session_configuration.approval_policy.clone(), + sandbox_policy: session_configuration.sandbox_policy.clone(), + file_system_sandbox_policy: session_configuration.file_system_sandbox_policy.clone(), + network_sandbox_policy: session_configuration.network_sandbox_policy, + network, + windows_sandbox_level: session_configuration.windows_sandbox_level, + shell_environment_policy: per_turn_config.permissions.shell_environment_policy.clone(), + tools_config, + features: per_turn_config.features.clone(), + ghost_snapshot: per_turn_config.ghost_snapshot.clone(), + final_output_json_schema: None, + codex_self_exe: per_turn_config.codex_self_exe.clone(), + codex_linux_sandbox_exe: per_turn_config.codex_linux_sandbox_exe.clone(), + tool_call_gate: Arc::new(ReadinessFlag::new()), + truncation_policy: model_info.truncation_policy.into(), + js_repl, + dynamic_tools: session_configuration.dynamic_tools.clone(), + turn_metadata_state, + turn_skills: TurnSkillsContext::new(skills_outcome), + turn_timing_state: Arc::new(TurnTimingState::default()), + } + } + + pub(crate) async fn new_turn_with_sub_id( + &self, + sub_id: String, + updates: SessionSettingsUpdate, + ) -> ConstraintResult> { + let ( + session_configuration, + sandbox_policy_changed, + previous_cwd, + codex_home, + session_source, + ) = { + let mut state = self.state.lock().await; + match state.session_configuration.clone().apply(&updates) { + Ok(next) => { + let previous_cwd = state.session_configuration.cwd.clone(); + let sandbox_policy_changed = + state.session_configuration.sandbox_policy != next.sandbox_policy; + let codex_home = next.codex_home.clone(); + let session_source = next.session_source.clone(); + state.session_configuration = next.clone(); + ( + next, + sandbox_policy_changed, + previous_cwd, + codex_home, + session_source, + ) + } + Err(err) => { + drop(state); + self.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: err.to_string(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + }), + }) + .await; + return Err(err); + } + } + }; + + self.maybe_refresh_shell_snapshot_for_cwd( + &previous_cwd, + &session_configuration.cwd, + &codex_home, + &session_source, + ); + + if sandbox_policy_changed { + self.refresh_managed_network_proxy_for_current_sandbox_policy() + .await; + } + + Ok(self + .new_turn_from_configuration( + sub_id, + session_configuration, + updates.final_output_json_schema, + ) + .await) + } + + async fn new_turn_from_configuration( + &self, + sub_id: String, + session_configuration: SessionConfiguration, + final_output_json_schema: Option>, + ) -> Arc { + let per_turn_config = Self::build_per_turn_config(&session_configuration); + { + let mcp_connection_manager = self.services.mcp_connection_manager.read().await; + mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy); + mcp_connection_manager + .set_sandbox_policy(per_turn_config.permissions.sandbox_policy.get()); + } + + let model_info = self + .services + .models_manager + .get_model_info( + session_configuration.collaboration_mode.model(), + &per_turn_config.to_models_manager_config(), + ) + .await; + let plugin_outcome = self + .services + .plugins_manager + .plugins_for_config(&per_turn_config) + .await; + let effective_skill_roots = plugin_outcome.effective_skill_roots(); + let skills_input = skills_load_input_from_config(&per_turn_config, effective_skill_roots); + let fs = self + .services + .environment + .as_ref() + .map(|environment| environment.get_filesystem()); + let skills_outcome = Arc::new( + self.services + .skills_manager + .skills_for_config(&skills_input, fs) + .await, + ); + let mut turn_context: TurnContext = Self::make_turn_context( + self.conversation_id, + Some(Arc::clone(&self.services.auth_manager)), + &self.services.session_telemetry, + session_configuration.provider.clone(), + &session_configuration, + self.services.user_shell.as_ref(), + self.services.shell_zsh_path.as_ref(), + self.services.main_execve_wrapper_exe.as_ref(), + per_turn_config, + model_info, + &self.services.models_manager, + self.services + .network_proxy + .as_ref() + .and_then(|started_proxy| { + Self::managed_network_proxy_active_for_sandbox_policy( + session_configuration.sandbox_policy.get(), + ) + .then(|| started_proxy.proxy()) + }), + self.services.environment.clone(), + sub_id, + Arc::clone(&self.js_repl), + skills_outcome, + ); + turn_context.realtime_active = self.conversation.running_state().await.is_some(); + + if let Some(final_schema) = final_output_json_schema { + turn_context.final_output_json_schema = final_schema; + } + let turn_context = Arc::new(turn_context); + turn_context.turn_metadata_state.spawn_git_enrichment_task(); + turn_context + } + + pub(crate) async fn maybe_emit_unknown_model_warning_for_turn(&self, tc: &TurnContext) { + if tc.model_info.used_fallback_model_metadata { + self.send_event( + tc, + EventMsg::Warning(WarningEvent { + message: format!( + "Model metadata for `{}` not found. Defaulting to fallback metadata; this can degrade performance and cause issues.", + tc.model_info.slug + ), + }), + ) + .await; + } + } + + pub(crate) async fn new_default_turn(&self) -> Arc { + self.new_default_turn_with_sub_id(self.next_internal_sub_id()) + .await + } + + pub(crate) async fn new_default_turn_with_sub_id(&self, sub_id: String) -> Arc { + let session_configuration = { + let state = self.state.lock().await; + state.session_configuration.clone() + }; + self.new_turn_from_configuration( + sub_id, + session_configuration, + /*final_output_json_schema*/ None, + ) + .await + } +}