use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::time::SystemTime; use std::time::UNIX_EPOCH; use crate::agent::AgentControl; use crate::agent::AgentStatus; use crate::agent::Mailbox; use crate::agent::MailboxReceiver; use crate::agent::agent_status_from_event; use crate::agent::status::is_final; use crate::agent_identity::AgentIdentityManager; use crate::agent_identity::RegisteredAgentTask; use crate::apps::render_apps_section; use crate::commit_attribution::commit_message_trailer_instruction; use crate::compact; use crate::config::ManagedFeatures; use crate::connectors; use crate::exec_policy::ExecPolicyManager; use crate::installation_id::resolve_installation_id; use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; use crate::realtime_conversation::RealtimeConversationManager; use crate::render_skills_section; use crate::rollout::find_thread_name_by_id; use crate::session_prefix::format_subagent_notification_message; use crate::skills_load_input_from_config; use crate::turn_metadata::TurnMetadataState; use async_channel::Receiver; use async_channel::Sender; use chrono::Local; use chrono::Utc; use codex_analytics::AnalyticsEventsClient; use codex_analytics::SubAgentThreadStartedInput; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; use codex_config::types::OAuthCredentialsStoreMode; use codex_exec_server::Environment; use codex_exec_server::EnvironmentManager; use codex_exec_server::FileSystemSandboxContext; use codex_features::FEATURES; use codex_features::Feature; use codex_features::unstable_features_warning_event; use codex_hooks::Hooks; use codex_hooks::HooksConfig; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_login::auth_env_telemetry::collect_auth_env_telemetry; use codex_login::default_client::originator; use codex_mcp::McpConnectionManager; use codex_mcp::ToolInfo; use codex_mcp::codex_apps_tools_cache_key; #[cfg(test)] use codex_models_manager::collaboration_mode_presets::CollaborationModesConfig; use codex_models_manager::manager::ModelsManager; use codex_models_manager::manager::RefreshStrategy; use codex_network_proxy::NetworkProxy; use codex_network_proxy::NetworkProxyAuditMetadata; use codex_network_proxy::normalize_host; use codex_otel::current_span_trace_id; use codex_otel::current_span_w3c_trace_context; use codex_otel::set_parent_from_w3c_trace_context; use codex_protocol::ThreadId; use codex_protocol::ToolName; use codex_protocol::approvals::ElicitationRequestEvent; use codex_protocol::approvals::ExecPolicyAmendment; use codex_protocol::approvals::NetworkPolicyAmendment; use codex_protocol::approvals::NetworkPolicyRuleAction; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::config_types::ModeKind; use codex_protocol::config_types::Settings; use codex_protocol::config_types::WebSearchMode; use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; use codex_protocol::mcp::CallToolResult; use codex_protocol::models::BaseInstructions; use codex_protocol::models::PermissionProfile; use codex_protocol::models::format_allow_prefixes; use codex_protocol::openai_models::ModelInfo; use codex_protocol::permissions::FileSystemSandboxPolicy; use codex_protocol::permissions::NetworkSandboxPolicy; use codex_protocol::protocol::FileChange; use codex_protocol::protocol::HasLegacyEvent; use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::RawResponseItemEvent; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnContextNetworkItem; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; use codex_protocol::request_permissions::RequestPermissionsArgs; use codex_protocol::request_permissions::RequestPermissionsEvent; use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputArgs; use codex_protocol::request_user_input::RequestUserInputResponse; use codex_rmcp_client::ElicitationResponse; use codex_rollout::RolloutConfig; use codex_rollout::state_db; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; use codex_thread_store::LocalThreadStore; use codex_utils_output_truncation::TruncationPolicy; use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; use rmcp::model::ListResourceTemplatesResult; use rmcp::model::ListResourcesResult; use rmcp::model::PaginatedRequestParams; use rmcp::model::ReadResourceRequestParams; use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; use serde_json::Value; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::sync::oneshot; use tokio::sync::watch; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use toml::Value as TomlValue; use tracing::Instrument; use tracing::debug; use tracing::error; use tracing::info; use tracing::info_span; use tracing::instrument; use tracing::warn; use uuid::Uuid; use crate::client::ModelClient; use crate::codex_thread::ThreadConfigSnapshot; use crate::compact::collect_user_messages; use crate::config::Config; use crate::config::Constrained; use crate::config::ConstraintResult; use crate::config::GhostSnapshotConfig; use crate::config::StartedNetworkProxy; use crate::config::resolve_web_search_mode_for_turn; use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; use crate::environment_context::EnvironmentContext; use crate::thread_rollout_truncation::initial_history_has_prior_user_turns; use codex_config::CONFIG_TOML_FILE; use codex_config::types::McpServerConfig; use codex_config::types::ShellEnvironmentPolicy; use codex_model_provider_info::ModelProviderInfo; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; #[cfg(test)] use codex_protocol::exec_output::StreamOutput; mod handlers; mod mcp; mod review; mod rollout_reconstruction; mod session; mod turn; mod turn_context; #[cfg(test)] use self::handlers::submission_dispatch_span; use self::handlers::submission_loop; use self::review::spawn_review_thread; pub(crate) use self::session::AppServerClientMetadata; pub(crate) use self::session::Session; pub(crate) use self::session::SessionConfiguration; pub(crate) use self::session::SessionSettingsUpdate; #[cfg(test)] use self::turn::AssistantMessageStreamParsers; pub(crate) use self::turn::build_prompt; pub(crate) use self::turn::built_tools; #[cfg(test)] use self::turn::collect_explicit_app_ids_from_skill_items; #[cfg(test)] use self::turn::filter_connectors_for_input; pub(crate) use self::turn::get_last_assistant_message_from_turn; use self::turn::realtime_text_for_event; pub(crate) use self::turn::run_turn; pub(crate) use self::turn_context::TurnContext; pub(crate) use self::turn_context::TurnSkillsContext; #[cfg(test)] mod rollout_reconstruction_tests; #[derive(Debug, PartialEq)] pub enum SteerInputError { NoActiveTurn(Vec), ExpectedTurnMismatch { expected: String, actual: String }, ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind }, EmptyInput, } impl SteerInputError { fn to_error_event(&self) -> ErrorEvent { match self { Self::NoActiveTurn(_) => ErrorEvent { message: "no active turn to steer".to_string(), codex_error_info: Some(CodexErrorInfo::BadRequest), }, Self::ExpectedTurnMismatch { expected, actual } => ErrorEvent { message: format!("expected active turn id `{expected}` but found `{actual}`"), codex_error_info: Some(CodexErrorInfo::BadRequest), }, Self::ActiveTurnNotSteerable { turn_kind } => { let turn_kind_label = match turn_kind { NonSteerableTurnKind::Review => "review", NonSteerableTurnKind::Compact => "compact", }; ErrorEvent { message: format!("cannot steer a {turn_kind_label} turn"), codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { turn_kind: *turn_kind, }), } } Self::EmptyInput => ErrorEvent { message: "input must not be empty".to_string(), codex_error_info: Some(CodexErrorInfo::BadRequest), }, } } } /// Notes from the previous real user turn. /// /// Conceptually this is the same role that `previous_model` used to fill, but /// it can carry other prior-turn settings that matter when constructing /// sensible state-change diffs or full-context reinjection, such as model /// switches or detecting a prior `realtime_active -> false` transition. #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct PreviousTurnSettings { pub(crate) model: String, pub(crate) realtime_active: Option, } use crate::SkillError; use crate::SkillLoadOutcome; use crate::SkillMetadata; use crate::SkillsManager; use crate::agents_md::AgentsMdManager; use crate::exec_policy::ExecPolicyUpdateError; use crate::guardian::GuardianReviewSessionManager; use crate::instructions::UserInstructions; use crate::mcp::McpManager; use crate::memories; use crate::network_policy_decision::execpolicy_network_rule_amendment; use crate::plugins::PluginsManager; use crate::plugins::render_plugins_section; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; use crate::rollout::map_session_init_error; use crate::rollout::metadata; use crate::rollout::policy::EventPersistenceMode; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; use crate::skills_watcher::SkillsWatcher; use crate::skills_watcher::SkillsWatcherEvent; use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; use crate::state::SessionServices; use crate::state::SessionState; #[cfg(test)] use crate::stream_events_utils::HandleOutputCtx; #[cfg(test)] use crate::stream_events_utils::handle_output_item_done; use crate::tasks::GhostSnapshotTask; use crate::tasks::ReviewTask; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; use crate::tools::js_repl::JsReplHandle; use crate::tools::js_repl::resolve_compatible_node; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::network_approval::build_blocked_request_observer; use crate::tools::network_approval::build_network_policy_decider; #[cfg(test)] use crate::tools::parallel::ToolCallRuntime; use crate::tools::sandboxing::ApprovalStore; use crate::turn_timing::TurnTimingState; use crate::turn_timing::record_turn_ttfm_metric; use crate::unified_exec::UnifiedExecProcessManager; use crate::windows_sandbox::WindowsSandboxLevelExt; use codex_git_utils::get_git_repo_root; use codex_mcp::compute_auth_statuses; use codex_mcp::with_codex_apps_mcp; use codex_otel::SessionTelemetry; use codex_otel::THREAD_STARTED_METRIC; use codex_otel::TelemetryAuthMode; use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::Personality; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::config_types::ServiceTier; use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::models::ContentItem; use codex_protocol::models::DeveloperInstructions; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::BackgroundEventEvent; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::DeprecationNoticeEvent; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecApprovalRequestEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::ModelRerouteEvent; use codex_protocol::protocol::ModelRerouteReason; use codex_protocol::protocol::NetworkApprovalContext; use codex_protocol::protocol::NonSteerableTurnKind; use codex_protocol::protocol::Op; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::RequestUserInputEvent; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionNetworkProxyRuntime; use codex_protocol::protocol::SkillDependencies as ProtocolSkillDependencies; use codex_protocol::protocol::SkillErrorInfo; use codex_protocol::protocol::SkillInterface as ProtocolSkillInterface; use codex_protocol::protocol::SkillMetadata as ProtocolSkillMetadata; use codex_protocol::protocol::SkillToolDependency as ProtocolSkillToolDependency; use codex_protocol::protocol::StreamErrorEvent; use codex_protocol::protocol::Submission; use codex_protocol::protocol::TokenCountEvent; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; use codex_tools::ToolsConfig; use codex_tools::ToolsConfigParams; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; #[cfg(test)] use codex_utils_stream_parser::ProposedPlanSegment; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. pub struct Codex { pub(crate) tx_sub: Sender, pub(crate) rx_event: Receiver, // Last known status of the agent. pub(crate) agent_status: watch::Receiver, pub(crate) session: Arc, // Shared future for the background submission loop completion so multiple // callers can wait for shutdown. pub(crate) session_loop_termination: SessionLoopTermination, } pub(crate) type SessionLoopTermination = Shared>; /// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`], /// the submission id for the initial `ConfigureSession` request and the /// unique session id. pub struct CodexSpawnOk { pub codex: Codex, pub thread_id: ThreadId, } pub(crate) struct CodexSpawnArgs { pub(crate) config: Config, pub(crate) auth_manager: Arc, pub(crate) models_manager: Arc, pub(crate) environment_manager: Arc, pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, pub(crate) skills_watcher: Arc, pub(crate) conversation_history: InitialHistory, pub(crate) session_source: SessionSource, pub(crate) agent_control: AgentControl, pub(crate) dynamic_tools: Vec, pub(crate) persist_extended_history: bool, pub(crate) metrics_service_name: Option, pub(crate) inherited_shell_snapshot: Option>, pub(crate) inherited_exec_policy: Option>, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, pub(crate) analytics_events_client: Option, } pub(crate) const INITIAL_SUBMIT_ID: &str = ""; pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 512; const CYBER_VERIFY_URL: &str = "https://chatgpt.com/cyber"; const CYBER_SAFETY_URL: &str = "https://developers.openai.com/codex/concepts/cyber-safety"; impl Codex { /// Spawn a new [`Codex`] and initialize the session. pub(crate) async fn spawn(args: CodexSpawnArgs) -> CodexResult { let parent_trace = match args.parent_trace { Some(trace) => { if codex_otel::context_from_w3c_trace_context(&trace).is_some() { Some(trace) } else { warn!("ignoring invalid thread spawn trace carrier"); None } } None => None, }; let thread_spawn_span = info_span!("thread_spawn", otel.name = "thread_spawn"); if let Some(trace) = parent_trace.as_ref() { let _ = set_parent_from_w3c_trace_context(&thread_spawn_span, trace); } Self::spawn_internal(CodexSpawnArgs { parent_trace, ..args }) .instrument(thread_spawn_span) .await } async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult { let CodexSpawnArgs { mut config, auth_manager, models_manager, environment_manager, skills_manager, plugins_manager, mcp_manager, skills_watcher, conversation_history, session_source, agent_control, dynamic_tools, persist_extended_history, metrics_service_name, inherited_shell_snapshot, user_shell_override, inherited_exec_policy, parent_trace: _, analytics_events_client, } = args; let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); let environment = environment_manager .current() .await .map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?; let fs = environment .as_ref() .map(|environment| environment.get_filesystem()); let plugin_outcome = plugins_manager.plugins_for_config(&config).await; let effective_skill_roots = plugin_outcome.effective_skill_roots(); let skills_input = skills_load_input_from_config(&config, effective_skill_roots); let loaded_skills = skills_manager.skills_for_config(&skills_input, fs).await; for err in &loaded_skills.errors { error!( "failed to load skill {}: {}", err.path.display(), err.message ); } if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source && depth >= config.agent_max_depth { let _ = config.features.disable(Feature::SpawnCsv); let _ = config.features.disable(Feature::Collab); } if config.features.enabled(Feature::JsRepl) && let Err(err) = resolve_compatible_node(config.js_repl_node_path.as_deref()).await { let _ = config.features.disable(Feature::JsRepl); let _ = config.features.disable(Feature::JsReplToolsOnly); let message = if config.features.enabled(Feature::JsRepl) { format!( "`js_repl` remains enabled because enterprise requirements pin it on, but the configured Node runtime is unavailable or incompatible. {err}" ) } else { format!( "Disabled `js_repl` for this session because the configured Node runtime is unavailable or incompatible. {err}" ) }; warn!("{message}"); config.startup_warnings.push(message); } if config.features.enabled(Feature::CodeMode) && let Err(err) = resolve_compatible_node(config.js_repl_node_path.as_deref()).await { let message = format!( "Disabled `exec` for this session because the configured Node runtime is unavailable or incompatible. {err}" ); warn!("{message}"); let _ = config.features.disable(Feature::CodeMode); config.startup_warnings.push(message); } let user_instructions = AgentsMdManager::new(&config) .user_instructions(environment.as_deref()) .await; let exec_policy = if crate::guardian::is_guardian_reviewer_source(&session_source) { // Guardian review should rely on the built-in shell safety checks, // not on caller-provided exec-policy rules that could shape the // reviewer or silently auto-approve commands. Arc::new(ExecPolicyManager::default()) } else if let Some(exec_policy) = &inherited_exec_policy { Arc::clone(exec_policy) } else { Arc::new( ExecPolicyManager::load(&config.config_layer_stack) .await .map_err(|err| CodexErr::Fatal(format!("failed to load rules: {err}")))?, ) }; let config = Arc::new(config); let refresh_strategy = match session_source { SessionSource::SubAgent(_) => codex_models_manager::manager::RefreshStrategy::Offline, _ => codex_models_manager::manager::RefreshStrategy::OnlineIfUncached, }; if config.model.is_none() || !matches!( refresh_strategy, codex_models_manager::manager::RefreshStrategy::Offline ) { let _ = models_manager.list_models(refresh_strategy).await; } let model = models_manager .get_default_model(&config.model, refresh_strategy) .await; // Resolve base instructions for the session. Priority order: // 1. config.base_instructions override // 2. conversation history => session_meta.base_instructions // 3. base_instructions for current model let model_info = models_manager .get_model_info(model.as_str(), &config.to_models_manager_config()) .await; let base_instructions = config .base_instructions .clone() .or_else(|| conversation_history.get_base_instructions().map(|s| s.text)) .unwrap_or_else(|| model_info.get_model_instructions(config.personality)); // Respect thread-start tools. When missing (resumed/forked threads), read from the db // first, then fall back to rollout-file tools. let persisted_tools = if dynamic_tools.is_empty() { let thread_id = match &conversation_history { InitialHistory::Resumed(resumed) => Some(resumed.conversation_id), InitialHistory::Forked(_) => conversation_history.forked_from_id(), InitialHistory::New | InitialHistory::Cleared => None, }; match thread_id { Some(thread_id) => { let state_db_ctx = state_db::get_state_db(&config).await; state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") .await } None => None, } } else { None }; let dynamic_tools = if dynamic_tools.is_empty() { persisted_tools .or_else(|| conversation_history.get_dynamic_tools()) .unwrap_or_default() } else { dynamic_tools }; // TODO (aibrahim): Consolidate config.model and config.model_reasoning_effort into config.collaboration_mode // to avoid extracting these fields separately and constructing CollaborationMode here. let collaboration_mode = CollaborationMode { mode: ModeKind::Default, settings: Settings { model: model.clone(), reasoning_effort: config.model_reasoning_effort, developer_instructions: None, }, }; let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), collaboration_mode, model_reasoning_summary: config.model_reasoning_summary, service_tier: config.service_tier, developer_instructions: config.developer_instructions.clone(), user_instructions, personality: config.personality, base_instructions, compact_prompt: config.compact_prompt.clone(), approval_policy: config.permissions.approval_policy.clone(), approvals_reviewer: config.approvals_reviewer, sandbox_policy: config.permissions.sandbox_policy.clone(), file_system_sandbox_policy: config.permissions.file_system_sandbox_policy.clone(), network_sandbox_policy: config.permissions.network_sandbox_policy, windows_sandbox_level: WindowsSandboxLevel::from_config(&config), cwd: config.cwd.clone(), codex_home: config.codex_home.clone(), thread_name: None, original_config_do_not_use: Arc::clone(&config), metrics_service_name, app_server_client_name: None, app_server_client_version: None, session_source, dynamic_tools, persist_extended_history, inherited_shell_snapshot, user_shell_override, }; // Generate a unique ID for the lifetime of this Codex session. let session_source_clone = session_configuration.session_source.clone(); let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit); let session = Session::new( session_configuration, config.clone(), auth_manager.clone(), models_manager.clone(), exec_policy, tx_event.clone(), agent_status_tx.clone(), conversation_history, session_source_clone, skills_manager, plugins_manager, mcp_manager.clone(), skills_watcher, agent_control, environment, analytics_events_client, ) .await .map_err(|e| { error!("Failed to create session: {e:#}"); map_session_init_error(&e, &config.codex_home) })?; let thread_id = session.conversation_id; // This task will run until Op::Shutdown is received. let session_for_loop = Arc::clone(&session); let session_loop_handle = tokio::spawn(async move { submission_loop(session_for_loop, config, rx_sub) .instrument(info_span!("session_loop", thread_id = %thread_id)) .await; }); let codex = Codex { tx_sub, rx_event, agent_status: agent_status_rx, session, session_loop_termination: session_loop_termination_from_handle(session_loop_handle), }; Ok(CodexSpawnOk { codex, thread_id }) } /// Submit the `op` wrapped in a `Submission` with a unique ID. pub async fn submit(&self, op: Op) -> CodexResult { self.submit_with_trace(op, /*trace*/ None).await } pub async fn submit_with_trace( &self, op: Op, trace: Option, ) -> CodexResult { let id = Uuid::now_v7().to_string(); let sub = Submission { id: id.clone(), op, trace, }; self.submit_with_id(sub).await?; Ok(id) } /// Use sparingly: prefer `submit()` so Codex is responsible for generating /// unique IDs for each submission. pub async fn submit_with_id(&self, mut sub: Submission) -> CodexResult<()> { if sub.trace.is_none() { sub.trace = current_span_w3c_trace_context(); } self.tx_sub .send(sub) .await .map_err(|_| CodexErr::InternalAgentDied)?; Ok(()) } /// Persist a thread-level memory mode update for the active session. /// /// This is a local-only operation that updates rollout metadata directly /// and does not involve the model. pub async fn set_thread_memory_mode( &self, mode: codex_protocol::protocol::ThreadMemoryMode, ) -> anyhow::Result<()> { handlers::persist_thread_memory_mode_update(&self.session, mode).await } pub async fn shutdown_and_wait(&self) -> CodexResult<()> { let session_loop_termination = self.session_loop_termination.clone(); match self.submit(Op::Shutdown).await { Ok(_) => {} Err(CodexErr::InternalAgentDied) => {} Err(err) => return Err(err), } session_loop_termination.await; Ok(()) } pub async fn next_event(&self) -> CodexResult { let event = self .rx_event .recv() .await .map_err(|_| CodexErr::InternalAgentDied)?; Ok(event) } pub async fn steer_input( &self, input: Vec, expected_turn_id: Option<&str>, responsesapi_client_metadata: Option>, ) -> Result { self.session .steer_input(input, expected_turn_id, responsesapi_client_metadata) .await } pub(crate) async fn set_app_server_client_info( &self, app_server_client_name: Option, app_server_client_version: Option, ) -> ConstraintResult<()> { self.session .update_settings(SessionSettingsUpdate { app_server_client_name, app_server_client_version, ..Default::default() }) .await } pub(crate) async fn agent_status(&self) -> AgentStatus { self.agent_status.borrow().clone() } pub(crate) async fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { let state = self.session.state.lock().await; state.session_configuration.thread_config_snapshot() } pub(crate) fn state_db(&self) -> Option { self.session.state_db() } pub(crate) fn enabled(&self, feature: Feature) -> bool { self.session.enabled(feature) } } #[cfg(test)] pub(crate) fn completed_session_loop_termination() -> SessionLoopTermination { futures::future::ready(()).boxed().shared() } pub(crate) fn session_loop_termination_from_handle( handle: JoinHandle<()>, ) -> SessionLoopTermination { async move { let _ = handle.await; } .boxed() .shared() } async fn thread_title_from_state_db( state_db: Option<&state_db::StateDbHandle>, codex_home: &AbsolutePathBuf, conversation_id: ThreadId, ) -> Option { if let Some(metadata) = state_db && let Some(metadata) = metadata.get_thread(conversation_id).await.ok().flatten() { let title = metadata.title.trim(); if !title.is_empty() && metadata.first_user_message.as_deref().map(str::trim) != Some(title) { return Some(title.to_string()); } } find_thread_name_by_id(codex_home, &conversation_id) .await .ok() .flatten() } impl Session { pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata { let state = self.state.lock().await; AppServerClientMetadata { client_name: state.session_configuration.app_server_client_name.clone(), client_version: state .session_configuration .app_server_client_version .clone(), } } fn managed_network_proxy_active_for_sandbox_policy(sandbox_policy: &SandboxPolicy) -> bool { !matches!(sandbox_policy, SandboxPolicy::DangerFullAccess) } /// Builds the `x-codex-beta-features` header value for this session. /// /// `ModelClient` is session-scoped and intentionally does not depend on the full `Config`, so /// we precompute the comma-separated list of enabled experimental feature keys at session /// creation time and thread it into the client. fn build_model_client_beta_features_header(config: &Config) -> Option { let beta_features_header = FEATURES .iter() .filter_map(|spec| { if spec.stage.experimental_menu_description().is_some() && config.features.enabled(spec.id) { Some(spec.key) } else { None } }) .collect::>() .join(","); if beta_features_header.is_empty() { None } else { Some(beta_features_header) } } async fn start_managed_network_proxy( spec: &crate::config::NetworkProxySpec, exec_policy: &codex_execpolicy::Policy, sandbox_policy: &SandboxPolicy, network_policy_decider: Option>, blocked_request_observer: Option>, managed_network_requirements_enabled: bool, audit_metadata: NetworkProxyAuditMetadata, ) -> anyhow::Result<(StartedNetworkProxy, SessionNetworkProxyRuntime)> { let spec = spec .with_exec_policy_network_rules(exec_policy) .map_err(|err| { tracing::warn!( "failed to apply execpolicy network rules to managed proxy; continuing with configured network policy: {err}" ); err }) .unwrap_or_else(|_| spec.clone()); let network_proxy = spec .start_proxy( sandbox_policy, network_policy_decider, blocked_request_observer, managed_network_requirements_enabled, audit_metadata, ) .await .map_err(|err| anyhow::anyhow!("failed to start managed network proxy: {err}"))?; let session_network_proxy = { let proxy = network_proxy.proxy(); SessionNetworkProxyRuntime { http_addr: proxy.http_addr().to_string(), socks_addr: proxy.socks_addr().to_string(), } }; Ok((network_proxy, session_network_proxy)) } async fn refresh_managed_network_proxy_for_current_sandbox_policy(&self) { let Some(started_proxy) = self.services.network_proxy.as_ref() else { return; }; let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await; let session_configuration = { let state = self.state.lock().await; state.session_configuration.clone() }; let Some(spec) = session_configuration .original_config_do_not_use .permissions .network .as_ref() else { return; }; let spec = match spec .recompute_for_sandbox_policy(session_configuration.sandbox_policy.get()) { Ok(spec) => spec, Err(err) => { warn!("failed to rebuild managed network proxy policy for sandbox change: {err}"); return; } }; let current_exec_policy = self.services.exec_policy.current(); let spec = match spec.with_exec_policy_network_rules(current_exec_policy.as_ref()) { Ok(spec) => spec, Err(err) => { warn!( "failed to apply execpolicy network rules while refreshing managed network proxy: {err}" ); spec } }; if let Err(err) = spec.apply_to_started_proxy(started_proxy).await { warn!("failed to refresh managed network proxy for sandbox change: {err}"); } } pub(crate) async fn codex_home(&self) -> AbsolutePathBuf { let state = self.state.lock().await; state.session_configuration.codex_home().clone() } pub(crate) fn subscribe_out_of_band_elicitation_pause_state(&self) -> watch::Receiver { self.out_of_band_elicitation_paused.subscribe() } pub(crate) fn set_out_of_band_elicitation_pause_state(&self, paused: bool) { self.out_of_band_elicitation_paused.send_replace(paused); } fn start_skills_watcher_listener(self: &Arc) { let mut rx = self.services.skills_watcher.subscribe(); let weak_sess = Arc::downgrade(self); tokio::spawn(async move { loop { match rx.recv().await { Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { let Some(sess) = weak_sess.upgrade() else { break; }; let event = Event { id: sess.next_internal_sub_id(), msg: EventMsg::SkillsUpdateAvailable, }; sess.send_event_raw(event).await; } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, } } }); } fn start_agent_identity_registration(self: &Arc) { if !self.services.agent_identity_manager.is_enabled() { return; } let weak_sess = Arc::downgrade(self); let mut auth_state_rx = self.services.auth_manager.subscribe_auth_state(); tokio::spawn(async move { loop { let Some(sess) = weak_sess.upgrade() else { return; }; match sess .services .agent_identity_manager .ensure_registered_identity() .await { Ok(Some(_)) => return, Ok(None) => { drop(sess); if auth_state_rx.changed().await.is_err() { return; } } Err(error) => { sess.fail_agent_identity_registration(error).await; return; } } } }); } async fn fail_agent_identity_registration(self: &Arc, error: anyhow::Error) { warn!(error = %error, "agent identity registration failed"); let message = format!( "Agent identity registration failed while `features.use_agent_identity` is enabled: {error}" ); self.send_event_raw(Event { id: self.next_internal_sub_id(), msg: EventMsg::Error(ErrorEvent { message, codex_error_info: Some(CodexErrorInfo::Other), }), }) .await; } async fn cached_agent_task_for_current_binding(&self) -> Option { let agent_task = { let state = self.state.lock().await; state.agent_task() }?; if self .services .agent_identity_manager .task_matches_current_binding(&agent_task) .await { debug!( agent_runtime_id = %agent_task.agent_runtime_id, task_id = %agent_task.task_id, "reusing cached agent task" ); return Some(agent_task); } debug!( agent_runtime_id = %agent_task.agent_runtime_id, task_id = %agent_task.task_id, "discarding cached agent task because auth binding changed" ); let mut state = self.state.lock().await; if state.agent_task().as_ref() == Some(&agent_task) { state.clear_agent_task(); } None } async fn ensure_agent_task_registered(&self) -> anyhow::Result> { if let Some(agent_task) = self.cached_agent_task_for_current_binding().await { return Ok(Some(agent_task)); } for _ in 0..2 { let Some(agent_task) = self.services.agent_identity_manager.register_task().await? else { return Ok(None); }; if !self .services .agent_identity_manager .task_matches_current_binding(&agent_task) .await { debug!( agent_runtime_id = %agent_task.agent_runtime_id, task_id = %agent_task.task_id, "discarding newly registered agent task because auth binding changed" ); continue; } { let mut state = self.state.lock().await; if let Some(existing_agent_task) = state.agent_task() { if existing_agent_task.has_same_binding(&agent_task) { return Ok(Some(existing_agent_task)); } debug!( agent_runtime_id = %existing_agent_task.agent_runtime_id, task_id = %existing_agent_task.task_id, "replacing cached agent task because auth binding changed" ); } state.set_agent_task(agent_task.clone()); } info!( thread_id = %self.conversation_id, agent_runtime_id = %agent_task.agent_runtime_id, task_id = %agent_task.task_id, "registered agent task for thread" ); return Ok(Some(agent_task)); } Ok(None) } pub(crate) fn get_tx_event(&self) -> Sender { self.tx_event.clone() } pub(crate) fn state_db(&self) -> Option { self.services.state_db.clone() } /// Flush rollout writes and return the final durability-barrier result. pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> { let recorder = { let guard = self.services.rollout.lock().await; guard.clone() }; if let Some(recorder) = recorder { recorder.flush().await } else { Ok(()) } } pub(crate) async fn try_ensure_rollout_materialized(&self) -> std::io::Result<()> { let recorder = { let guard = self.services.rollout.lock().await; guard.clone() }; if let Some(rec) = recorder { rec.persist().await?; } Ok(()) } pub(crate) async fn ensure_rollout_materialized(&self) { if let Err(e) = self.try_ensure_rollout_materialized().await { warn!("failed to materialize rollout recorder: {e}"); } } fn next_internal_sub_id(&self) -> String { let id = self .next_internal_sub_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst); format!("auto-compact-{id}") } pub(crate) async fn route_realtime_text_input(self: &Arc, text: String) { handlers::user_input_or_turn_inner( self, self.next_internal_sub_id(), Op::UserInput { items: vec![UserInput::Text { text, text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, }, /*mirror_user_text_to_realtime*/ None, ) .await; } pub(crate) async fn get_total_token_usage(&self) -> i64 { let state = self.state.lock().await; state.get_total_token_usage(state.server_reasoning_included()) } pub(crate) async fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown { let state = self.state.lock().await; state.history.get_total_token_usage_breakdown() } pub(crate) async fn total_token_usage(&self) -> Option { let state = self.state.lock().await; state.token_info().map(|info| info.total_token_usage) } /// Returns the complete token usage snapshot currently cached for this session. /// /// Resume and fork reconstruction seed this state from the last persisted rollout /// `TokenCount` event. Callers that need to replay restored usage to a client /// should use this accessor instead of `total_token_usage`, because the app-server /// notification includes both total and last-turn usage. pub(crate) async fn token_usage_info(&self) -> Option { let state = self.state.lock().await; state.token_info() } pub(crate) async fn get_estimated_token_count( &self, turn_context: &TurnContext, ) -> Option { let state = self.state.lock().await; state.history.estimate_token_count(turn_context) } pub(crate) async fn get_base_instructions(&self) -> BaseInstructions { let state = self.state.lock().await; BaseInstructions { text: state.session_configuration.base_instructions.clone(), } } // Merges connector IDs into the session-level explicit connector selection. pub(crate) async fn merge_connector_selection( &self, connector_ids: HashSet, ) -> HashSet { let mut state = self.state.lock().await; state.merge_connector_selection(connector_ids) } // Returns the connector IDs currently selected for this session. pub(crate) async fn get_connector_selection(&self) -> HashSet { let state = self.state.lock().await; state.get_connector_selection() } // Clears connector IDs that were accumulated for explicit selection. pub(crate) async fn clear_connector_selection(&self) { let mut state = self.state.lock().await; state.clear_connector_selection(); } async fn record_initial_history(&self, conversation_history: InitialHistory) { let turn_context = self.new_default_turn().await; let is_subagent = { let state = self.state.lock().await; matches!( state.session_configuration.session_source, SessionSource::SubAgent(_) ) }; let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history); { let mut state = self.state.lock().await; state.set_next_turn_is_first(!has_prior_user_turns); } match conversation_history { InitialHistory::New | InitialHistory::Cleared => { // Defer initial context insertion until the first real turn starts so // turn/start overrides can be merged before we write model-visible context. self.set_previous_turn_settings(/*previous_turn_settings*/ None) .await; } InitialHistory::Resumed(resumed_history) => { let rollout_items = resumed_history.history; let previous_turn_settings = self .apply_rollout_reconstruction(&turn_context, &rollout_items) .await; // If resuming, warn when the last recorded model differs from the current one. let curr: &str = turn_context.model_info.slug.as_str(); if let Some(prev) = previous_turn_settings .as_ref() .map(|settings| settings.model.as_str()) .filter(|model| *model != curr) { warn!("resuming session with different model: previous={prev}, current={curr}"); self.send_event( &turn_context, EventMsg::Warning(WarningEvent { message: format!( "This session was recorded with model `{prev}` but is resuming with `{curr}`. \ Consider switching back to `{prev}` as it may affect Codex performance." ), }), ) .await; } // Seed usage info from the recorded rollout so UIs can show token counts // immediately on resume/fork. if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) { let mut state = self.state.lock().await; state.set_token_info(Some(info)); } // Defer seeding the session's initial context until the first turn starts so // turn/start overrides can be merged before we write to the rollout. if !is_subagent { let _ = self.flush_rollout().await; } } InitialHistory::Forked(rollout_items) => { self.apply_rollout_reconstruction(&turn_context, &rollout_items) .await; // Seed usage info from the recorded rollout so UIs can show token counts // immediately on resume/fork. if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) { let mut state = self.state.lock().await; state.set_token_info(Some(info)); } // If persisting, persist all rollout items as-is (recorder filters) if !rollout_items.is_empty() { self.persist_rollout_items(&rollout_items).await; } // Forked threads should remain file-backed immediately after startup. self.ensure_rollout_materialized().await; // Flush after seeding history and any persisted rollout copy. if !is_subagent { let _ = self.flush_rollout().await; } } } } async fn apply_rollout_reconstruction( &self, turn_context: &TurnContext, rollout_items: &[RolloutItem], ) -> Option { let reconstructed_rollout = self .reconstruct_history_from_rollout(turn_context, rollout_items) .await; let previous_turn_settings = reconstructed_rollout.previous_turn_settings.clone(); self.replace_history( reconstructed_rollout.history, reconstructed_rollout.reference_context_item, ) .await; self.set_previous_turn_settings(previous_turn_settings.clone()) .await; previous_turn_settings } fn last_token_info_from_rollout(rollout_items: &[RolloutItem]) -> Option { rollout_items.iter().rev().find_map(|item| match item { RolloutItem::EventMsg(EventMsg::TokenCount(ev)) => ev.info.clone(), _ => None, }) } async fn previous_turn_settings(&self) -> Option { let state = self.state.lock().await; state.previous_turn_settings() } pub(crate) async fn set_previous_turn_settings( &self, previous_turn_settings: Option, ) { let mut state = self.state.lock().await; state.set_previous_turn_settings(previous_turn_settings); } fn maybe_refresh_shell_snapshot_for_cwd( &self, previous_cwd: &AbsolutePathBuf, next_cwd: &AbsolutePathBuf, codex_home: &AbsolutePathBuf, session_source: &SessionSource, ) { if previous_cwd == next_cwd { return; } if !self.features.enabled(Feature::ShellSnapshot) { return; } if matches!( session_source, SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) ) { return; } ShellSnapshot::refresh_snapshot( codex_home.clone(), self.conversation_id, next_cwd.clone(), self.services.user_shell.as_ref().clone(), self.services.shell_snapshot_tx.clone(), self.services.session_telemetry.clone(), ); } pub(crate) async fn update_settings( &self, updates: SessionSettingsUpdate, ) -> ConstraintResult<()> { let mut state = self.state.lock().await; match state.session_configuration.apply(&updates) { Ok(updated) => { let previous_cwd = state.session_configuration.cwd.clone(); let sandbox_policy_changed = state.session_configuration.sandbox_policy != updated.sandbox_policy; let next_cwd = updated.cwd.clone(); let codex_home = updated.codex_home.clone(); let session_source = updated.session_source.clone(); state.session_configuration = updated; drop(state); self.maybe_refresh_shell_snapshot_for_cwd( &previous_cwd, &next_cwd, &codex_home, &session_source, ); if sandbox_policy_changed { self.refresh_managed_network_proxy_for_current_sandbox_policy() .await; } Ok(()) } Err(err) => { warn!("rejected session settings update: {err}"); Err(err) } } } pub(crate) async fn set_session_startup_prewarm( &self, startup_prewarm: SessionStartupPrewarmHandle, ) { let mut state = self.state.lock().await; state.set_session_startup_prewarm(startup_prewarm); } pub(crate) async fn take_session_startup_prewarm(&self) -> Option { let mut state = self.state.lock().await; state.take_session_startup_prewarm() } pub(crate) async fn get_config(&self) -> std::sync::Arc { let state = self.state.lock().await; state .session_configuration .original_config_do_not_use .clone() } pub(crate) async fn provider(&self) -> ModelProviderInfo { let state = self.state.lock().await; state.session_configuration.provider.clone() } pub(crate) async fn reload_user_config_layer(&self) { let config_toml_path = { let state = self.state.lock().await; state .session_configuration .codex_home .join(CONFIG_TOML_FILE) }; let user_config = match std::fs::read_to_string(&config_toml_path) { Ok(contents) => match toml::from_str::(&contents) { Ok(config) => config, Err(err) => { warn!("failed to parse user config while reloading layer: {err}"); return; } }, Err(err) if err.kind() == std::io::ErrorKind::NotFound => { toml::Value::Table(Default::default()) } Err(err) => { warn!("failed to read user config while reloading layer: {err}"); return; } }; let mut state = self.state.lock().await; let mut config = (*state.session_configuration.original_config_do_not_use).clone(); config.config_layer_stack = config .config_layer_stack .with_user_config(&config_toml_path, user_config); state.session_configuration.original_config_do_not_use = Arc::new(config); self.services.skills_manager.clear_cache(); self.services.plugins_manager.clear_cache(); } async fn build_settings_update_items( &self, reference_context_item: Option<&TurnContextItem>, current_context: &TurnContext, ) -> Vec { // TODO: Make context updates a pure diff of persisted previous/current TurnContextItem // state so replay/backtracking is deterministic. Runtime inputs that affect model-visible // context (shell, exec policy, feature gates, previous-turn bridge) should be persisted // state or explicit non-state replay events. let previous_turn_settings = { let state = self.state.lock().await; state.previous_turn_settings() }; let shell = self.user_shell(); let exec_policy = self.services.exec_policy.current(); crate::context_manager::updates::build_settings_update_items( reference_context_item, previous_turn_settings.as_ref(), current_context, shell.as_ref(), exec_policy.as_ref(), self.features.enabled(Feature::Personality), ) } /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) { let legacy_source = msg.clone(); let event = Event { id: turn_context.sub_id.clone(), msg, }; self.send_event_raw(event).await; self.maybe_notify_parent_of_terminal_turn(turn_context, &legacy_source) .await; self.maybe_mirror_event_text_to_realtime(&legacy_source) .await; self.maybe_clear_realtime_handoff_for_event(&legacy_source) .await; let show_raw_agent_reasoning = self.show_raw_agent_reasoning(); for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) { let legacy_event = Event { id: turn_context.sub_id.clone(), msg: legacy, }; self.send_event_raw(legacy_event).await; } } /// Forwards terminal turn events from spawned MultiAgentV2 children to their direct parent. async fn maybe_notify_parent_of_terminal_turn( &self, turn_context: &TurnContext, msg: &EventMsg, ) { if !self.enabled(Feature::MultiAgentV2) { return; } if !matches!(msg, EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) { return; } let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, agent_path: Some(child_agent_path), .. }) = &turn_context.session_source else { return; }; let Some(status) = agent_status_from_event(msg) else { return; }; if !is_final(&status) { return; } self.forward_child_completion_to_parent(*parent_thread_id, child_agent_path, status) .await; } /// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent. async fn forward_child_completion_to_parent( &self, parent_thread_id: ThreadId, child_agent_path: &codex_protocol::AgentPath, status: AgentStatus, ) { let Some(parent_agent_path) = child_agent_path .as_str() .rsplit_once('/') .and_then(|(parent, _)| codex_protocol::AgentPath::try_from(parent).ok()) else { return; }; let message = format_subagent_notification_message(child_agent_path.as_str(), &status); let communication = InterAgentCommunication::new( child_agent_path.clone(), parent_agent_path, Vec::new(), message, /*trigger_turn*/ false, ); if let Err(err) = self .services .agent_control .send_inter_agent_communication(parent_thread_id, communication) .await { debug!("failed to notify parent thread {parent_thread_id}: {err}"); } } async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) { let Some(text) = realtime_text_for_event(msg) else { return; }; if self.conversation.running_state().await.is_none() || self.conversation.active_handoff_id().await.is_none() { return; } if let Err(err) = self.conversation.handoff_out(text).await { debug!("failed to mirror event text to realtime conversation: {err}"); } } async fn maybe_clear_realtime_handoff_for_event(&self, msg: &EventMsg) { if !matches!(msg, EventMsg::TurnComplete(_)) { return; } if let Err(err) = self.conversation.handoff_complete().await { debug!("failed to finalize realtime handoff output: {err}"); } self.conversation.clear_active_handoff().await; } pub(crate) async fn send_event_raw(&self, event: Event) { // Persist the event into rollout (recorder filters as needed) let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; self.deliver_event_raw(event).await; } async fn deliver_event_raw(&self, event: Event) { // Record the last known agent status. if let Some(status) = agent_status_from_event(&event.msg) { self.agent_status.send_replace(status); } if let Err(e) = self.tx_event.send(event).await { debug!("dropping event because channel is closed: {e}"); } } pub(crate) async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) { self.send_event( turn_context, EventMsg::ItemStarted(ItemStartedEvent { thread_id: self.conversation_id, turn_id: turn_context.sub_id.clone(), item: item.clone(), }), ) .await; } pub(crate) async fn emit_turn_item_completed( &self, turn_context: &TurnContext, item: TurnItem, ) { record_turn_ttfm_metric(turn_context, &item).await; self.send_event( turn_context, EventMsg::ItemCompleted(ItemCompletedEvent { thread_id: self.conversation_id, turn_id: turn_context.sub_id.clone(), item, }), ) .await; } /// Adds an execpolicy amendment to both the in-memory and on-disk policies so future /// commands can use the newly approved prefix. pub(crate) async fn persist_execpolicy_amendment( &self, amendment: &ExecPolicyAmendment, ) -> Result<(), ExecPolicyUpdateError> { let codex_home = self .state .lock() .await .session_configuration .codex_home() .clone(); self.services .exec_policy .append_amendment_and_update(&codex_home, amendment) .await?; Ok(()) } pub(crate) async fn turn_context_for_sub_id(&self, sub_id: &str) -> Option> { let active = self.active_turn.lock().await; active .as_ref() .and_then(|turn| turn.tasks.get(sub_id)) .map(|task| Arc::clone(&task.turn_context)) } async fn active_turn_context_and_cancellation_token( &self, ) -> Option<(Arc, CancellationToken)> { let active = self.active_turn.lock().await; let (_, task) = active.as_ref()?.tasks.first()?; Some(( Arc::clone(&task.turn_context), task.cancellation_token.child_token(), )) } pub(crate) async fn record_execpolicy_amendment_message( &self, sub_id: &str, amendment: &ExecPolicyAmendment, ) { let Some(prefixes) = format_allow_prefixes(vec![amendment.command.clone()]) else { warn!("execpolicy amendment for {sub_id} had no command prefix"); return; }; let text = format!("Approved command prefix saved:\n{prefixes}"); let message: ResponseItem = DeveloperInstructions::new(text.clone()).into(); if let Some(turn_context) = self.turn_context_for_sub_id(sub_id).await { self.record_conversation_items(&turn_context, std::slice::from_ref(&message)) .await; return; } if self .inject_response_items(vec![ResponseInputItem::Message { role: "developer".to_string(), content: vec![ContentItem::InputText { text }], }]) .await .is_err() { warn!("no active turn found to record execpolicy amendment message for {sub_id}"); } } pub(crate) async fn persist_network_policy_amendment( &self, amendment: &NetworkPolicyAmendment, network_approval_context: &NetworkApprovalContext, ) -> anyhow::Result<()> { let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await; let host = Self::validated_network_policy_amendment_host(amendment, network_approval_context)?; let codex_home = self .state .lock() .await .session_configuration .codex_home() .clone(); let execpolicy_amendment = execpolicy_network_rule_amendment(amendment, network_approval_context, &host); if let Some(started_network_proxy) = self.services.network_proxy.as_ref() { let proxy = started_network_proxy.proxy(); match amendment.action { NetworkPolicyRuleAction::Allow => proxy .add_allowed_domain(&host) .await .map_err(|err| anyhow::anyhow!("failed to update runtime allowlist: {err}"))?, NetworkPolicyRuleAction::Deny => proxy .add_denied_domain(&host) .await .map_err(|err| anyhow::anyhow!("failed to update runtime denylist: {err}"))?, } } self.services .exec_policy .append_network_rule_and_update( &codex_home, &host, execpolicy_amendment.protocol, execpolicy_amendment.decision, Some(execpolicy_amendment.justification), ) .await .map_err(|err| { anyhow::anyhow!("failed to persist network policy amendment to execpolicy: {err}") })?; Ok(()) } fn validated_network_policy_amendment_host( amendment: &NetworkPolicyAmendment, network_approval_context: &NetworkApprovalContext, ) -> anyhow::Result { let approved_host = normalize_host(&network_approval_context.host); let amendment_host = normalize_host(&amendment.host); if amendment_host != approved_host { return Err(anyhow::anyhow!( "network policy amendment host '{}' does not match approved host '{}'", amendment.host, network_approval_context.host )); } Ok(approved_host) } pub(crate) async fn record_network_policy_amendment_message( &self, sub_id: &str, amendment: &NetworkPolicyAmendment, ) { let (action, list_name) = match amendment.action { NetworkPolicyRuleAction::Allow => ("Allowed", "allowlist"), NetworkPolicyRuleAction::Deny => ("Denied", "denylist"), }; let text = format!( "{action} network rule saved in execpolicy ({list_name}): {}", amendment.host ); let message: ResponseItem = DeveloperInstructions::new(text.clone()).into(); if let Some(turn_context) = self.turn_context_for_sub_id(sub_id).await { self.record_conversation_items(&turn_context, std::slice::from_ref(&message)) .await; return; } if self .inject_response_items(vec![ResponseInputItem::Message { role: "developer".to_string(), content: vec![ContentItem::InputText { text }], }]) .await .is_err() { warn!("no active turn found to record network policy amendment message for {sub_id}"); } } /// Emit an exec approval request event and await the user's decision. /// /// The request is keyed by `call_id` + `approval_id` so matching responses /// are delivered to the correct in-flight turn. If the pending approval is /// cleared before a response arrives, treat it as an abort so interrupted /// turns do not continue on a synthetic denial. /// /// Note that if `available_decisions` is `None`, then the other fields will /// be used to derive the available decisions via /// [ExecApprovalRequestEvent::default_available_decisions]. #[allow(clippy::too_many_arguments)] pub async fn request_command_approval( &self, turn_context: &TurnContext, call_id: String, approval_id: Option, command: Vec, cwd: AbsolutePathBuf, reason: Option, network_approval_context: Option, proposed_execpolicy_amendment: Option, additional_permissions: Option, available_decisions: Option>, ) -> ReviewDecision { // command-level approvals use `call_id`. // `approval_id` is only present for subcommand callbacks (execve intercept) let effective_approval_id = approval_id.clone().unwrap_or_else(|| call_id.clone()); // Add the tx_approve callback to the map before sending the request. let (tx_approve, rx_approve) = oneshot::channel(); let prev_entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.insert_pending_approval(effective_approval_id.clone(), tx_approve) } None => None, } }; if prev_entry.is_some() { warn!("Overwriting existing pending approval for call_id: {effective_approval_id}"); } let parsed_cmd = parse_command(&command); let proposed_network_policy_amendments = network_approval_context.as_ref().map(|context| { vec![ NetworkPolicyAmendment { host: context.host.clone(), action: NetworkPolicyRuleAction::Allow, }, NetworkPolicyAmendment { host: context.host.clone(), action: NetworkPolicyRuleAction::Deny, }, ] }); let available_decisions = available_decisions.unwrap_or_else(|| { ExecApprovalRequestEvent::default_available_decisions( network_approval_context.as_ref(), proposed_execpolicy_amendment.as_ref(), proposed_network_policy_amendments.as_deref(), additional_permissions.as_ref(), ) }); let event = EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { call_id, approval_id, turn_id: turn_context.sub_id.clone(), command, cwd, reason, network_approval_context, proposed_execpolicy_amendment, proposed_network_policy_amendments, additional_permissions, available_decisions: Some(available_decisions), parsed_cmd, }); self.send_event(turn_context, event).await; rx_approve.await.unwrap_or(ReviewDecision::Abort) } pub async fn request_patch_approval( &self, turn_context: &TurnContext, call_id: String, changes: HashMap, reason: Option, grant_root: Option, ) -> oneshot::Receiver { // Add the tx_approve callback to the map before sending the request. let (tx_approve, rx_approve) = oneshot::channel(); let approval_id = call_id.clone(); let prev_entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.insert_pending_approval(approval_id.clone(), tx_approve) } None => None, } }; if prev_entry.is_some() { warn!("Overwriting existing pending approval for call_id: {approval_id}"); } let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, turn_id: turn_context.sub_id.clone(), changes, reason, grant_root, }); self.send_event(turn_context, event).await; rx_approve } pub async fn request_permissions( &self, turn_context: &TurnContext, call_id: String, args: RequestPermissionsArgs, ) -> Option { match turn_context.approval_policy.value() { AskForApproval::Never => { return Some(RequestPermissionsResponse { permissions: RequestPermissionProfile::default(), scope: PermissionGrantScope::Turn, }); } AskForApproval::Granular(granular_config) if !granular_config.allows_request_permissions() => { return Some(RequestPermissionsResponse { permissions: RequestPermissionProfile::default(), scope: PermissionGrantScope::Turn, }); } AskForApproval::OnFailure | AskForApproval::OnRequest | AskForApproval::UnlessTrusted | AskForApproval::Granular(_) => {} } let (tx_response, rx_response) = oneshot::channel(); let prev_entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.insert_pending_request_permissions(call_id.clone(), tx_response) } None => None, } }; if prev_entry.is_some() { warn!("Overwriting existing pending request_permissions for call_id: {call_id}"); } // TODO(ccunningham): Support auto-review for request_permissions / // with_additional_permissions. V0 still routes this surface through // the existing manual RequestPermissions event flow. let event = EventMsg::RequestPermissions(RequestPermissionsEvent { call_id, turn_id: turn_context.sub_id.clone(), reason: args.reason, permissions: args.permissions, }); self.send_event(turn_context, event).await; rx_response.await.ok() } pub async fn request_user_input( &self, turn_context: &TurnContext, call_id: String, args: RequestUserInputArgs, ) -> Option { let sub_id = turn_context.sub_id.clone(); let (tx_response, rx_response) = oneshot::channel(); let event_id = sub_id.clone(); let prev_entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.insert_pending_user_input(sub_id, tx_response) } None => None, } }; if prev_entry.is_some() { warn!("Overwriting existing pending user input for sub_id: {event_id}"); } let event = EventMsg::RequestUserInput(RequestUserInputEvent { call_id, turn_id: turn_context.sub_id.clone(), questions: args.questions, }); self.send_event(turn_context, event).await; rx_response.await.ok() } pub async fn notify_user_input_response( &self, sub_id: &str, response: RequestUserInputResponse, ) { let entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.remove_pending_user_input(sub_id) } None => None, } }; match entry { Some(tx_response) => { tx_response.send(response).ok(); } None => { warn!("No pending user input found for sub_id: {sub_id}"); } } } pub async fn notify_request_permissions_response( &self, call_id: &str, response: RequestPermissionsResponse, ) { let mut granted_for_session = None; let entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; let entry = ts.remove_pending_request_permissions(call_id); if entry.is_some() && !response.permissions.is_empty() { match response.scope { PermissionGrantScope::Turn => { ts.record_granted_permissions(response.permissions.clone().into()); } PermissionGrantScope::Session => { granted_for_session = Some(response.permissions.clone()); } } } entry } None => None, } }; if let Some(permissions) = granted_for_session { let mut state = self.state.lock().await; state.record_granted_permissions(permissions.into()); } match entry { Some(tx_response) => { tx_response.send(response).ok(); } None => { warn!("No pending request_permissions found for call_id: {call_id}"); } } } pub(crate) async fn granted_turn_permissions(&self) -> Option { let active = self.active_turn.lock().await; let active = active.as_ref()?; let ts = active.turn_state.lock().await; ts.granted_permissions() } pub(crate) async fn granted_session_permissions(&self) -> Option { let state = self.state.lock().await; state.granted_permissions() } pub async fn notify_dynamic_tool_response(&self, call_id: &str, response: DynamicToolResponse) { let entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.remove_pending_dynamic_tool(call_id) } None => None, } }; match entry { Some(tx_response) => { tx_response.send(response).ok(); } None => { warn!("No pending dynamic tool call found for call_id: {call_id}"); } } } pub async fn notify_approval(&self, approval_id: &str, decision: ReviewDecision) { let entry = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.remove_pending_approval(approval_id) } None => None, } }; match entry { Some(tx_approve) => { tx_approve.send(decision).ok(); } None => { warn!("No pending approval found for call_id: {approval_id}"); } } } /// Records input items: always append to conversation history and /// persist these response items to rollout. pub(crate) async fn record_conversation_items( &self, turn_context: &TurnContext, items: &[ResponseItem], ) { self.record_into_history(items, turn_context).await; self.persist_rollout_response_items(items).await; self.send_raw_response_items(turn_context, items).await; } /// Append ResponseItems to the in-memory conversation history only. pub(crate) async fn record_into_history( &self, items: &[ResponseItem], turn_context: &TurnContext, ) { let mut state = self.state.lock().await; state.record_items(items.iter(), turn_context.truncation_policy); } pub(crate) async fn record_model_warning(&self, message: impl Into, ctx: &TurnContext) { self.services .session_telemetry .counter("codex.model_warning", /*inc*/ 1, &[]); let item = ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: format!("Warning: {}", message.into()), }], end_turn: None, phase: None, }; self.record_conversation_items(ctx, &[item]).await; } async fn maybe_warn_on_server_model_mismatch( self: &Arc, turn_context: &Arc, server_model: String, ) -> bool { let requested_model = turn_context.model_info.slug.clone(); let server_model_normalized = server_model.to_ascii_lowercase(); let requested_model_normalized = requested_model.to_ascii_lowercase(); if server_model_normalized == requested_model_normalized { info!("server reported model {server_model} (matches requested model)"); return false; } warn!("server reported model {server_model} while requested model was {requested_model}"); let warning_message = format!( "Your account was flagged for potentially high-risk cyber activity and this request was routed to gpt-5.2 as a fallback. To regain access to gpt-5.3-codex, apply for trusted access: {CYBER_VERIFY_URL} or learn more: {CYBER_SAFETY_URL}" ); self.send_event( turn_context, EventMsg::ModelReroute(ModelRerouteEvent { from_model: requested_model.clone(), to_model: server_model.clone(), reason: ModelRerouteReason::HighRiskCyberActivity, }), ) .await; self.send_event( turn_context, EventMsg::Warning(WarningEvent { message: warning_message.clone(), }), ) .await; self.record_model_warning(warning_message, turn_context) .await; true } pub(crate) async fn replace_history( &self, items: Vec, reference_context_item: Option, ) { let mut state = self.state.lock().await; state.replace_history(items, reference_context_item); } pub(crate) async fn replace_compacted_history( &self, items: Vec, reference_context_item: Option, compacted_item: CompactedItem, ) { self.replace_history(items, reference_context_item.clone()) .await; self.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; if let Some(turn_context_item) = reference_context_item { self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item)]) .await; } self.services.model_client.advance_window_generation(); } async fn persist_rollout_response_items(&self, items: &[ResponseItem]) { let rollout_items: Vec = items .iter() .cloned() .map(RolloutItem::ResponseItem) .collect(); self.persist_rollout_items(&rollout_items).await; } pub fn enabled(&self, feature: Feature) -> bool { self.features.enabled(feature) } pub(crate) fn features(&self) -> ManagedFeatures { self.features.clone() } pub(crate) async fn collaboration_mode(&self) -> CollaborationMode { let state = self.state.lock().await; state.session_configuration.collaboration_mode.clone() } async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) { for item in items { self.send_event( turn_context, EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }), ) .await; } } pub(crate) async fn build_initial_context( &self, turn_context: &TurnContext, ) -> Vec { let mut developer_sections = Vec::::with_capacity(8); let mut contextual_user_sections = Vec::::with_capacity(2); let shell = self.user_shell(); let ( reference_context_item, previous_turn_settings, collaboration_mode, base_instructions, session_source, ) = { let state = self.state.lock().await; ( state.reference_context_item(), state.previous_turn_settings(), state.session_configuration.collaboration_mode.clone(), state.session_configuration.base_instructions.clone(), state.session_configuration.session_source.clone(), ) }; if let Some(model_switch_message) = crate::context_manager::updates::build_model_instructions_update_item( previous_turn_settings.as_ref(), turn_context, ) { developer_sections.push(model_switch_message.into_text()); } if turn_context.config.include_permissions_instructions { developer_sections.push( DeveloperInstructions::from_policy( turn_context.sandbox_policy.get(), turn_context.approval_policy.value(), turn_context.config.approvals_reviewer, self.services.exec_policy.current().as_ref(), &turn_context.cwd, turn_context .features .enabled(Feature::ExecPermissionApprovals), turn_context .features .enabled(Feature::RequestPermissionsTool), ) .into_text(), ); } let separate_guardian_developer_message = crate::guardian::is_guardian_reviewer_source(&session_source); // Keep the guardian policy prompt out of the aggregated developer bundle so it // stays isolated as its own top-level developer message for guardian subagents. if !separate_guardian_developer_message && let Some(developer_instructions) = turn_context.developer_instructions.as_deref() && !developer_instructions.is_empty() { developer_sections.push(developer_instructions.to_string()); } // Add developer instructions for memories. if turn_context.features.enabled(Feature::MemoryTool) && turn_context.config.memories.use_memories && let Some(memory_prompt) = build_memory_tool_developer_instructions(&turn_context.config.codex_home).await { developer_sections.push(memory_prompt); } // Add developer instructions from collaboration_mode if they exist and are non-empty if let Some(collab_instructions) = DeveloperInstructions::from_collaboration_mode(&collaboration_mode) { developer_sections.push(collab_instructions.into_text()); } if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item( reference_context_item.as_ref(), previous_turn_settings.as_ref(), turn_context, ) { developer_sections.push(realtime_update.into_text()); } if self.features.enabled(Feature::Personality) && let Some(personality) = turn_context.personality { let model_info = turn_context.model_info.clone(); let has_baked_personality = model_info.supports_personality() && base_instructions == model_info.get_model_instructions(Some(personality)); if !has_baked_personality && let Some(personality_message) = crate::context_manager::updates::personality_message_for( &model_info, personality, ) { developer_sections.push( DeveloperInstructions::personality_spec_message(personality_message) .into_text(), ); } } if turn_context.config.include_apps_instructions && turn_context.apps_enabled() { let mcp_connection_manager = self.services.mcp_connection_manager.read().await; let accessible_and_enabled_connectors = connectors::list_accessible_and_enabled_connectors_from_manager( &mcp_connection_manager, &turn_context.config, ) .await; if let Some(apps_section) = render_apps_section(&accessible_and_enabled_connectors) { developer_sections.push(apps_section); } } let implicit_skills = turn_context .turn_skills .outcome .allowed_skills_for_implicit_invocation(); if let Some(skills_section) = render_skills_section(&implicit_skills) { developer_sections.push(skills_section); } let loaded_plugins = self .services .plugins_manager .plugins_for_config(&turn_context.config) .await; if let Some(plugin_section) = render_plugins_section(loaded_plugins.capability_summaries()) { developer_sections.push(plugin_section); } if turn_context.features.enabled(Feature::CodexGitCommit) && let Some(commit_message_instruction) = commit_message_trailer_instruction( turn_context.config.commit_attribution.as_deref(), ) { developer_sections.push(commit_message_instruction); } if let Some(user_instructions) = turn_context.user_instructions.as_deref() { contextual_user_sections.push( UserInstructions { text: user_instructions.to_string(), directory: turn_context.cwd.to_string_lossy().into_owned(), } .serialize_to_text(), ); } if turn_context.config.include_environment_context { let subagents = self .services .agent_control .format_environment_context_subagents(self.conversation_id) .await; contextual_user_sections.push( EnvironmentContext::from_turn_context(turn_context, shell.as_ref()) .with_subagents(subagents) .serialize_to_xml(), ); } let mut items = Vec::with_capacity(3); if let Some(developer_message) = crate::context_manager::updates::build_developer_update_item(developer_sections) { items.push(developer_message); } if let Some(contextual_user_message) = crate::context_manager::updates::build_contextual_user_message(contextual_user_sections) { items.push(contextual_user_message); } // Emit the guardian policy prompt as a separate developer item so the guardian // subagent sees a distinct, easy-to-audit instruction block. if separate_guardian_developer_message && let Some(developer_instructions) = turn_context.developer_instructions.as_deref() && !developer_instructions.is_empty() && let Some(guardian_developer_message) = crate::context_manager::updates::build_developer_update_item(vec![ developer_instructions.to_string(), ]) { items.push(guardian_developer_message); } items } pub(crate) async fn persist_rollout_items(&self, items: &[RolloutItem]) { let recorder = { let guard = self.services.rollout.lock().await; guard.clone() }; if let Some(rec) = recorder && let Err(e) = rec.record_items(items).await { error!("failed to record rollout items: {e:#}"); } } pub(crate) async fn clone_history(&self) -> ContextManager { let state = self.state.lock().await; state.clone_history() } pub(crate) async fn reference_context_item(&self) -> Option { let state = self.state.lock().await; state.reference_context_item() } /// Persist the latest turn context snapshot for the first real user turn and for /// steady-state turns that emit model-visible context updates. /// /// When the reference snapshot is missing, this injects full initial context. Otherwise, it /// emits only settings diff items. /// /// If full context is injected and a model switch occurred, this prepends the /// `` developer message so model-specific instructions are not lost. /// /// This is the normal runtime path that establishes a new `reference_context_item`. /// Mid-turn compaction is the other path that can re-establish that baseline when it /// reinjects full initial context into replacement history. Other non-regular tasks /// intentionally do not update the baseline. pub(crate) async fn record_context_updates_and_set_reference_context_item( &self, turn_context: &TurnContext, ) { let reference_context_item = { let state = self.state.lock().await; state.reference_context_item() }; let should_inject_full_context = reference_context_item.is_none(); let context_items = if should_inject_full_context { self.build_initial_context(turn_context).await } else { // Steady-state path: append only context diffs to minimize token overhead. self.build_settings_update_items(reference_context_item.as_ref(), turn_context) .await }; let turn_context_item = turn_context.to_turn_context_item(); if !context_items.is_empty() { self.record_conversation_items(turn_context, &context_items) .await; } // Persist one `TurnContextItem` per real user turn so resume/lazy replay can recover the // latest durable baseline even when this turn emitted no model-visible context diffs. self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item.clone())]) .await; // Advance the in-memory diff baseline even when this turn emitted no model-visible // context items. This keeps later runtime diffing aligned with the current turn state. let mut state = self.state.lock().await; state.set_reference_context_item(Some(turn_context_item)); } pub(crate) async fn update_token_usage_info( &self, turn_context: &TurnContext, token_usage: Option<&TokenUsage>, ) { if let Some(token_usage) = token_usage { let mut state = self.state.lock().await; state.update_token_info_from_usage(token_usage, turn_context.model_context_window()); } self.send_token_count_event(turn_context).await; } pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) { let history = self.clone_history().await; let base_instructions = self.get_base_instructions().await; let Some(estimated_total_tokens) = history.estimate_token_count_with_base_instructions(&base_instructions) else { return; }; { let mut state = self.state.lock().await; let mut info = state.token_info().unwrap_or(TokenUsageInfo { total_token_usage: TokenUsage::default(), last_token_usage: TokenUsage::default(), model_context_window: None, }); info.last_token_usage = TokenUsage { input_tokens: 0, cached_input_tokens: 0, output_tokens: 0, reasoning_output_tokens: 0, total_tokens: estimated_total_tokens.max(0), }; if let Some(model_context_window) = turn_context.model_context_window() { info.model_context_window = Some(model_context_window); } state.set_token_info(Some(info)); } self.send_token_count_event(turn_context).await; } pub(crate) async fn update_rate_limits( &self, turn_context: &TurnContext, new_rate_limits: RateLimitSnapshot, ) { { let mut state = self.state.lock().await; state.set_rate_limits(new_rate_limits); } self.send_token_count_event(turn_context).await; } pub(crate) async fn mcp_dependency_prompted(&self) -> HashSet { let state = self.state.lock().await; state.mcp_dependency_prompted() } pub(crate) async fn record_mcp_dependency_prompted(&self, names: I) where I: IntoIterator, { let mut state = self.state.lock().await; state.record_mcp_dependency_prompted(names); } pub async fn dependency_env(&self) -> HashMap { let state = self.state.lock().await; state.dependency_env() } pub async fn set_dependency_env(&self, values: HashMap) { let mut state = self.state.lock().await; state.set_dependency_env(values); } pub(crate) async fn set_server_reasoning_included(&self, included: bool) { let mut state = self.state.lock().await; state.set_server_reasoning_included(included); } async fn send_token_count_event(&self, turn_context: &TurnContext) { let (info, rate_limits) = { let state = self.state.lock().await; state.token_info_and_rate_limits() }; let event = EventMsg::TokenCount(TokenCountEvent { info, rate_limits }); self.send_event(turn_context, event).await; } pub(crate) async fn set_total_tokens_full(&self, turn_context: &TurnContext) { if let Some(context_window) = turn_context.model_context_window() { let mut state = self.state.lock().await; state.set_token_usage_full(context_window); } self.send_token_count_event(turn_context).await; } pub(crate) async fn record_response_item_and_emit_turn_item( &self, turn_context: &TurnContext, response_item: ResponseItem, ) { // Add to conversation history and persist response item to rollout. self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) .await; // Derive a turn item and emit lifecycle events if applicable. if let Some(item) = parse_turn_item(&response_item) { self.emit_turn_item_started(turn_context, &item).await; self.emit_turn_item_completed(turn_context, item).await; } } pub(crate) async fn record_user_prompt_and_emit_turn_item( &self, turn_context: &TurnContext, input: &[UserInput], response_item: ResponseItem, ) { // Persist the user message to history, but emit the turn item from `UserInput` so // UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry // those spans, and `record_response_item_and_emit_turn_item` would drop them. self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) .await; let turn_item = TurnItem::UserMessage(UserMessageItem::new(input)); self.emit_turn_item_started(turn_context, &turn_item).await; self.emit_turn_item_completed(turn_context, turn_item).await; self.ensure_rollout_materialized().await; } pub(crate) async fn notify_background_event( &self, turn_context: &TurnContext, message: impl Into, ) { let event = EventMsg::BackgroundEvent(BackgroundEventEvent { message: message.into(), }); self.send_event(turn_context, event).await; } pub(crate) async fn notify_stream_error( &self, turn_context: &TurnContext, message: impl Into, codex_error: CodexErr, ) { let additional_details = codex_error.to_string(); let codex_error_info = CodexErrorInfo::ResponseStreamDisconnected { http_status_code: codex_error.http_status_code_value(), }; let event = EventMsg::StreamError(StreamErrorEvent { message: message.into(), codex_error_info: Some(codex_error_info), additional_details: Some(additional_details), }); self.send_event(turn_context, event).await; } async fn maybe_start_ghost_snapshot( self: &Arc, turn_context: Arc, cancellation_token: CancellationToken, ) { if !self.enabled(Feature::GhostCommit) { return; } let token = match turn_context.tool_call_gate.subscribe().await { Ok(token) => token, Err(err) => { warn!("failed to subscribe to ghost snapshot readiness: {err}"); return; } }; info!("spawning ghost snapshot task"); let task = GhostSnapshotTask::new(token); Arc::new(task) .run( Arc::new(SessionTaskContext::new(self.clone())), turn_context.clone(), Vec::new(), cancellation_token, ) .await; } /// Inject additional user input into the currently active turn. /// /// Returns the active turn id when accepted. pub async fn steer_input( &self, input: Vec, expected_turn_id: Option<&str>, responsesapi_client_metadata: Option>, ) -> Result { if input.is_empty() { return Err(SteerInputError::EmptyInput); } let mut active = self.active_turn.lock().await; let Some(active_turn) = active.as_mut() else { return Err(SteerInputError::NoActiveTurn(input)); }; let Some((active_turn_id, _)) = active_turn.tasks.first() else { return Err(SteerInputError::NoActiveTurn(input)); }; if let Some(expected_turn_id) = expected_turn_id && expected_turn_id != active_turn_id { return Err(SteerInputError::ExpectedTurnMismatch { expected: expected_turn_id.to_string(), actual: active_turn_id.clone(), }); } match active_turn.tasks.first().map(|(_, task)| task.kind) { Some(crate::state::TaskKind::Regular) => {} Some(crate::state::TaskKind::Review) => { return Err(SteerInputError::ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind::Review, }); } Some(crate::state::TaskKind::Compact) => { return Err(SteerInputError::ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind::Compact, }); } None => return Err(SteerInputError::NoActiveTurn(input)), } if let Some(responsesapi_client_metadata) = responsesapi_client_metadata && let Some((_, active_task)) = active_turn.tasks.first() { active_task .turn_context .turn_metadata_state .set_responsesapi_client_metadata(responsesapi_client_metadata); } let mut turn_state = active_turn.turn_state.lock().await; turn_state.push_pending_input(input.into()); turn_state.accept_mailbox_delivery_for_current_turn(); Ok(active_turn_id.clone()) } /// Returns the input if there was no task running to inject into. pub async fn inject_response_items( &self, input: Vec, ) -> Result<(), Vec> { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; for item in input { ts.push_pending_input(item); } Ok(()) } None => Err(input), } } pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { let turn_state = self.turn_state_for_sub_id(sub_id).await; let Some(turn_state) = turn_state else { return; }; let mut turn_state = turn_state.lock().await; if turn_state.has_pending_input() { return; } turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); } pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) { let turn_state = self.turn_state_for_sub_id(sub_id).await; let Some(turn_state) = turn_state else { return; }; turn_state .lock() .await .set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); } async fn turn_state_for_sub_id( &self, sub_id: &str, ) -> Option>> { let active = self.active_turn.lock().await; active.as_ref().and_then(|active_turn| { active_turn .tasks .contains_key(sub_id) .then(|| Arc::clone(&active_turn.turn_state)) }) } pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver { self.mailbox.subscribe() } pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) { self.mailbox.send(communication); } pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool { self.mailbox_rx.lock().await.has_pending_trigger_turn() } pub async fn prepend_pending_input(&self, input: Vec) -> Result<(), ()> { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ts.prepend_pending_input(input); Ok(()) } None => Err(()), } } pub async fn get_pending_input(&self) -> Vec { let (pending_input, accepts_mailbox_delivery) = { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; ( ts.take_pending_input(), ts.accepts_mailbox_delivery_for_current_turn(), ) } None => (Vec::new(), true), } }; if !accepts_mailbox_delivery { return pending_input; } let mailbox_items = { let mut mailbox_rx = self.mailbox_rx.lock().await; mailbox_rx .drain() .into_iter() .map(|mail| mail.to_response_input_item()) .collect::>() }; if pending_input.is_empty() { mailbox_items } else if mailbox_items.is_empty() { pending_input } else { let mut pending_input = pending_input; pending_input.extend(mailbox_items); pending_input } } /// Queue response items to be injected into the next active turn created for this session. #[cfg(test)] pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec) { if items.is_empty() { return; } let mut idle_pending_input = self.idle_pending_input.lock().await; idle_pending_input.extend(items); } pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec { std::mem::take(&mut *self.idle_pending_input.lock().await) } pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool { !self.idle_pending_input.lock().await.is_empty() } pub async fn has_pending_input(&self) -> bool { let (has_turn_pending_input, accepts_mailbox_delivery) = { let active = self.active_turn.lock().await; match active.as_ref() { Some(at) => { let ts = at.turn_state.lock().await; ( ts.has_pending_input(), ts.accepts_mailbox_delivery_for_current_turn(), ) } None => (false, true), } }; if has_turn_pending_input { return true; } if !accepts_mailbox_delivery { return false; } self.mailbox_rx.lock().await.has_pending() } pub async fn interrupt_task(self: &Arc) { info!("interrupt received: abort current task, if any"); let has_active_turn = { self.active_turn.lock().await.is_some() }; if has_active_turn { self.abort_all_tasks(TurnAbortReason::Interrupted).await; } else { self.cancel_mcp_startup().await; } } pub(crate) fn hooks(&self) -> &Hooks { &self.services.hooks } pub(crate) fn user_shell(&self) -> Arc { Arc::clone(&self.services.user_shell) } pub(crate) async fn current_rollout_path(&self) -> Option { let recorder = { let guard = self.services.rollout.lock().await; guard.clone() }; recorder.map(|recorder| recorder.rollout_path().to_path_buf()) } pub(crate) async fn hook_transcript_path(&self) -> Option { self.ensure_rollout_materialized().await; self.current_rollout_path().await } pub(crate) async fn take_pending_session_start_source( &self, ) -> Option { let mut state = self.state.lock().await; state.take_pending_session_start_source() } fn show_raw_agent_reasoning(&self) -> bool { self.services.show_raw_agent_reasoning } } pub(crate) fn emit_subagent_session_started( analytics_events_client: &AnalyticsEventsClient, client_metadata: AppServerClientMetadata, thread_id: ThreadId, parent_thread_id: Option, thread_config: ThreadConfigSnapshot, subagent_source: SubAgentSource, ) { let AppServerClientMetadata { client_name, client_version, } = client_metadata; let (Some(client_name), Some(client_version)) = (client_name, client_version) else { tracing::warn!("skipping subagent thread analytics: missing inherited client metadata"); return; }; let created_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); analytics_events_client.track_subagent_thread_started(SubAgentThreadStartedInput { thread_id: thread_id.to_string(), parent_thread_id: parent_thread_id.map(|thread_id| thread_id.to_string()), product_client_id: client_name.clone(), client_name, client_version, model: thread_config.model, ephemeral: thread_config.ephemeral, subagent_source, created_at, }); } fn skills_to_info( skills: &[SkillMetadata], disabled_paths: &HashSet, ) -> Vec { skills .iter() .map(|skill| ProtocolSkillMetadata { name: skill.name.clone(), description: skill.description.clone(), short_description: skill.short_description.clone(), interface: skill .interface .clone() .map(|interface| ProtocolSkillInterface { display_name: interface.display_name, short_description: interface.short_description, icon_small: interface.icon_small, icon_large: interface.icon_large, brand_color: interface.brand_color, default_prompt: interface.default_prompt, }), dependencies: skill.dependencies.clone().map(|dependencies| { ProtocolSkillDependencies { tools: dependencies .tools .into_iter() .map(|tool| ProtocolSkillToolDependency { r#type: tool.r#type, value: tool.value, description: tool.description, transport: tool.transport, command: tool.command, url: tool.url, }) .collect(), } }), path: skill.path_to_skills_md.clone(), scope: skill.scope, enabled: !disabled_paths.contains(&skill.path_to_skills_md), }) .collect() } fn errors_to_info(errors: &[SkillError]) -> Vec { errors .iter() .map(|err| SkillErrorInfo { path: err.path.to_path_buf(), message: err.message.clone(), }) .collect() } use crate::memories::prompts::build_memory_tool_developer_instructions; #[cfg(test)] pub(crate) use tests::make_session_and_context; #[cfg(test)] pub(crate) use tests::make_session_and_context_with_dynamic_tools_and_rx; #[cfg(test)] pub(crate) use tests::make_session_and_context_with_rx; #[cfg(test)] pub(crate) use tests::make_session_configuration_for_tests; #[cfg(test)] #[path = "codex_tests.rs"] mod tests;