From a1736fcd20396976daa631bd4ed53ca7d066bbb6 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Thu, 16 Apr 2026 15:28:59 -0700 Subject: [PATCH] [codex] Split codex turn logic (#18206) ## Summary - Move Codex turn execution logic from `codex.rs` into `codex/turn.rs`. - Keep the existing crate-visible `run_turn`, `build_prompt`, `built_tools`, and `get_last_assistant_message_from_turn` surface re-exported from `codex.rs`. - Preserve test access for moved turn helpers while reducing the main `codex.rs` orchestration footprint. ## Stack - Base: #18200 (`pakrym/split-codex-handlers`) ## Testing - `CARGO_INCREMENTAL=0 cargo test -p codex-core --lib` - `just fix -p codex-core` - `just fmt` - `git diff --check` --- codex-rs/core/src/codex.rs | 2152 +----------------------------- codex-rs/core/src/codex/turn.rs | 2169 +++++++++++++++++++++++++++++++ 2 files changed, 2188 insertions(+), 2133 deletions(-) create mode 100644 codex-rs/core/src/codex/turn.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f356b3fff7..3d50f7735e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -18,16 +18,10 @@ use crate::agent_identity::RegisteredAgentTask; use crate::apps::render_apps_section; use crate::commit_attribution::commit_message_trailer_instruction; use crate::compact; -use crate::compact::InitialContextInjection; -use crate::compact::run_inline_auto_compact_task; -use crate::compact::should_use_remote_compact_task; -use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::config::ManagedFeatures; use crate::connectors; use crate::exec_policy::ExecPolicyManager; -use crate::hook_runtime::emit_hook_completed_events; use crate::installation_id::resolve_installation_id; -use crate::mcp_tool_exposure::build_mcp_tool_exposure; use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; use crate::realtime_conversation::RealtimeConversationManager; @@ -35,28 +29,13 @@ use crate::render_skills_section; use crate::rollout::find_thread_name_by_id; use crate::session_prefix::format_subagent_notification_message; use crate::skills_load_input_from_config; -use crate::stream_events_utils::HandleOutputCtx; -use crate::stream_events_utils::handle_non_tool_response_item; -use crate::stream_events_utils::handle_output_item_done; -use crate::stream_events_utils::last_assistant_message_from_item; -use crate::stream_events_utils::mark_thread_memory_mode_polluted_if_external_context; -use crate::stream_events_utils::raw_assistant_output_text_from_item; -use crate::stream_events_utils::record_completed_response_item; use crate::turn_metadata::TurnMetadataState; -use crate::unavailable_tool::collect_unavailable_called_tools; -use crate::util::error_or_panic; use async_channel::Receiver; use async_channel::Sender; use chrono::Local; use chrono::Utc; use codex_analytics::AnalyticsEventsClient; -use codex_analytics::AppInvocation; -use codex_analytics::CompactionPhase; -use codex_analytics::CompactionReason; -use codex_analytics::InvocationType; use codex_analytics::SubAgentThreadStartedInput; -use codex_analytics::TurnResolvedConfigFact; -use codex_analytics::build_track_events_context; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; @@ -67,10 +46,6 @@ use codex_exec_server::FileSystemSandboxContext; use codex_features::FEATURES; use codex_features::Feature; use codex_features::unstable_features_warning_event; -use codex_hooks::HookEvent; -use codex_hooks::HookEventAfterAgent; -use codex_hooks::HookPayload; -use codex_hooks::HookResult; use codex_hooks::Hooks; use codex_hooks::HooksConfig; use codex_login::AuthManager; @@ -102,10 +77,8 @@ use codex_protocol::config_types::Settings; use codex_protocol::config_types::WebSearchMode; use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; -use codex_protocol::items::PlanItem; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; -use codex_protocol::items::build_hook_prompt_message; use codex_protocol::mcp::CallToolResult; use codex_protocol::models::BaseInstructions; use codex_protocol::models::PermissionProfile; @@ -140,17 +113,10 @@ use codex_rollout::state_db; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; use codex_thread_store::LocalThreadStore; -use codex_tools::filter_tool_suggest_discoverable_tools_for_client; use codex_utils_output_truncation::TruncationPolicy; -use codex_utils_stream_parser::AssistantTextChunk; -use codex_utils_stream_parser::AssistantTextStreamParser; -use codex_utils_stream_parser::ProposedPlanSegment; -use codex_utils_stream_parser::extract_proposed_plan_text; -use codex_utils_stream_parser::strip_citations; use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; -use futures::stream::FuturesOrdered; use rmcp::model::ListResourceTemplatesResult; use rmcp::model::ListResourcesResult; use rmcp::model::PaginatedRequestParams; @@ -168,19 +134,13 @@ use toml::Value as TomlValue; use tracing::Instrument; use tracing::debug; use tracing::error; -use tracing::field; use tracing::info; use tracing::info_span; use tracing::instrument; -use tracing::trace; -use tracing::trace_span; use tracing::warn; use uuid::Uuid; use crate::client::ModelClient; -use crate::client::ModelClientSession; -use crate::client_common::Prompt; -use crate::client_common::ResponseEvent; use crate::codex_thread::ThreadConfigSnapshot; use crate::compact::collect_user_messages; use crate::config::Config; @@ -204,10 +164,22 @@ use codex_protocol::exec_output::StreamOutput; mod handlers; mod rollout_reconstruction; +mod turn; #[cfg(test)] use self::handlers::submission_dispatch_span; use self::handlers::submission_loop; #[cfg(test)] +use self::turn::AssistantMessageStreamParsers; +pub(crate) use self::turn::build_prompt; +pub(crate) use self::turn::built_tools; +#[cfg(test)] +use self::turn::collect_explicit_app_ids_from_skill_items; +#[cfg(test)] +use self::turn::filter_connectors_for_input; +pub(crate) use self::turn::get_last_assistant_message_from_turn; +use self::turn::realtime_text_for_event; +pub(crate) use self::turn::run_turn; +#[cfg(test)] mod rollout_reconstruction_tests; #[derive(Debug, PartialEq)] @@ -262,40 +234,18 @@ pub(crate) struct PreviousTurnSettings { } use crate::SkillError; -use crate::SkillInjections; use crate::SkillLoadOutcome; use crate::SkillMetadata; use crate::SkillsManager; use crate::agents_md::AgentsMdManager; -use crate::build_skill_injections; -use crate::collect_env_var_dependencies; -use crate::collect_explicit_skill_mentions; use crate::exec_policy::ExecPolicyUpdateError; -use crate::feedback_tags; use crate::guardian::GuardianReviewSessionManager; -use crate::hook_runtime::PendingInputHookDisposition; -use crate::hook_runtime::inspect_pending_input; -use crate::hook_runtime::record_additional_contexts; -use crate::hook_runtime::record_pending_input; -use crate::hook_runtime::run_pending_session_start_hooks; -use crate::hook_runtime::run_user_prompt_submit_hooks; -use crate::injection::ToolMentionKind; -use crate::injection::app_id_from_path; -use crate::injection::tool_kind_for_path; use crate::instructions::UserInstructions; use crate::mcp::McpManager; -use crate::mcp_skill_dependencies::maybe_prompt_and_install_mcp_dependencies; use crate::memories; -use crate::mentions::build_connector_slug_counts; -use crate::mentions::build_skill_name_counts; -use crate::mentions::collect_explicit_app_ids; -use crate::mentions::collect_explicit_plugin_mentions; -use crate::mentions::collect_tool_mentions_from_messages; use crate::network_policy_decision::execpolicy_network_rule_amendment; use crate::plugins::PluginsManager; -use crate::plugins::build_plugin_injections; use crate::plugins::render_plugins_section; -use crate::resolve_skill_dependencies_for_turn; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; use crate::rollout::map_session_init_error; @@ -310,28 +260,26 @@ use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; use crate::state::SessionServices; use crate::state::SessionState; +#[cfg(test)] +use crate::stream_events_utils::HandleOutputCtx; +#[cfg(test)] +use crate::stream_events_utils::handle_output_item_done; use crate::tasks::GhostSnapshotTask; use crate::tasks::ReviewTask; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; -use crate::tools::ToolRouter; -use crate::tools::context::SharedTurnDiffTracker; use crate::tools::js_repl::JsReplHandle; use crate::tools::js_repl::resolve_compatible_node; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::network_approval::build_blocked_request_observer; use crate::tools::network_approval::build_network_policy_decider; +#[cfg(test)] use crate::tools::parallel::ToolCallRuntime; -use crate::tools::router::ToolRouterParams; use crate::tools::sandboxing::ApprovalStore; -use crate::turn_diff_tracker::TurnDiffTracker; use crate::turn_timing::TurnTimingState; use crate::turn_timing::record_turn_ttfm_metric; -use crate::turn_timing::record_turn_ttft_metric; use crate::unified_exec::UnifiedExecProcessManager; -use crate::util::backoff; use crate::windows_sandbox::WindowsSandboxLevelExt; -use codex_async_utils::OrCancelExt; use codex_git_utils::get_git_repo_root; use codex_mcp::compute_auth_statuses; use codex_mcp::with_codex_apps_mcp; @@ -348,8 +296,6 @@ use codex_protocol::models::DeveloperInstructions; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; -use codex_protocol::protocol::AgentMessageContentDeltaEvent; -use codex_protocol::protocol::AgentReasoningSectionBreakEvent; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::BackgroundEventEvent; @@ -367,10 +313,7 @@ use codex_protocol::protocol::ModelRerouteReason; use codex_protocol::protocol::NetworkApprovalContext; use codex_protocol::protocol::NonSteerableTurnKind; use codex_protocol::protocol::Op; -use codex_protocol::protocol::PlanDeltaEvent; use codex_protocol::protocol::RateLimitSnapshot; -use codex_protocol::protocol::ReasoningContentDeltaEvent; -use codex_protocol::protocol::ReasoningRawContentDeltaEvent; use codex_protocol::protocol::RequestUserInputEvent; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; @@ -386,7 +329,6 @@ use codex_protocol::protocol::Submission; use codex_protocol::protocol::TokenCountEvent; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; -use codex_protocol::protocol::TurnDiffEvent; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; use codex_tools::ToolsConfig; @@ -394,6 +336,8 @@ use codex_tools::ToolsConfigParams; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; +#[cfg(test)] +use codex_utils_stream_parser::ProposedPlanSegment; fn image_generation_tool_auth_allowed(auth_manager: Option<&AuthManager>) -> bool { matches!( @@ -5032,2064 +4976,6 @@ fn errors_to_info(errors: &[SkillError]) -> Vec { .collect() } -/// Takes a user message as input and runs a loop where, at each sampling request, the model -/// replies with either: -/// -/// - requested function calls -/// - an assistant message -/// -/// While it is possible for the model to return multiple of these items in a -/// single sampling request, in practice, we generally one item per sampling request: -/// -/// - If the model requests a function call, we execute it and send the output -/// back to the model in the next sampling request. -/// - If the model sends only an assistant message, we record it in the -/// conversation history and consider the turn complete. -/// -pub(crate) async fn run_turn( - sess: Arc, - turn_context: Arc, - input: Vec, - prewarmed_client_session: Option, - cancellation_token: CancellationToken, -) -> Option { - if input.is_empty() && !sess.has_pending_input().await { - return None; - } - - let model_info = turn_context.model_info.clone(); - let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX); - let mut prewarmed_client_session = prewarmed_client_session; - // TODO(ccunningham): Pre-turn compaction runs before context updates and the - // new user message are recorded. Estimate pending incoming items (context - // diffs/full reinjection + user input) and trigger compaction preemptively - // when they would push the thread over the compaction threshold. - let pre_sampling_compacted = match run_pre_sampling_compact(&sess, &turn_context).await { - Ok(pre_sampling_compacted) => pre_sampling_compacted, - Err(_) => { - error!("Failed to run pre-sampling compact"); - return None; - } - }; - if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() { - client_session.reset_websocket_session(); - } - - let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); - - sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) - .await; - - let loaded_plugins = sess - .services - .plugins_manager - .plugins_for_config(&turn_context.config) - .await; - // Structured plugin:// mentions are resolved from the current session's - // enabled plugins, then converted into turn-scoped guidance below. - let mentioned_plugins = - collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries()); - let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { - // Plugin mentions need raw MCP/app inventory even when app tools - // are normally hidden so we can describe the plugin's currently - // usable capabilities for this turn. - match sess - .services - .mcp_connection_manager - .read() - .await - .list_all_tools() - .or_cancel(&cancellation_token) - .await - { - Ok(mcp_tools) => mcp_tools, - Err(_) if turn_context.apps_enabled() => return None, - Err(_) => HashMap::new(), - } - } else { - HashMap::new() - }; - let available_connectors = if turn_context.apps_enabled() { - let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( - loaded_plugins - .effective_apps() - .into_iter() - .map(|connector_id| connector_id.0), - connectors::accessible_connectors_from_mcp_tools(&mcp_tools), - ); - connectors::with_app_enabled_state(connectors, &turn_context.config) - } else { - Vec::new() - }; - let connector_slug_counts = build_connector_slug_counts(&available_connectors); - let skill_name_counts_lower = skills_outcome - .as_ref() - .map_or_else(HashMap::new, |outcome| { - build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1 - }); - let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| { - collect_explicit_skill_mentions( - &input, - &outcome.skills, - &outcome.disabled_paths, - &connector_slug_counts, - ) - }); - let config = turn_context.config.clone(); - if config - .features - .enabled(Feature::SkillEnvVarDependencyPrompt) - { - let env_var_dependencies = collect_env_var_dependencies(&mentioned_skills); - resolve_skill_dependencies_for_turn(&sess, &turn_context, &env_var_dependencies).await; - } - - maybe_prompt_and_install_mcp_dependencies( - sess.as_ref(), - turn_context.as_ref(), - &cancellation_token, - &mentioned_skills, - ) - .await; - - let session_telemetry = turn_context.session_telemetry.clone(); - let thread_id = sess.conversation_id.to_string(); - let tracking = build_track_events_context( - turn_context.model_info.slug.clone(), - thread_id, - turn_context.sub_id.clone(), - ); - let SkillInjections { - items: skill_items, - warnings: skill_warnings, - } = build_skill_injections( - &mentioned_skills, - skills_outcome, - Some(&session_telemetry), - &sess.services.analytics_events_client, - tracking.clone(), - ) - .await; - - for message in skill_warnings { - sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message })) - .await; - } - - let plugin_items = - build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors); - let mentioned_plugin_metadata = mentioned_plugins - .iter() - .filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata) - .collect::>(); - - let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input); - explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items( - &skill_items, - &available_connectors, - &skill_name_counts_lower, - )); - let connector_names_by_id = available_connectors - .iter() - .map(|connector| (connector.id.as_str(), connector.name.as_str())) - .collect::>(); - let mentioned_app_invocations = explicitly_enabled_connectors - .iter() - .map(|connector_id| AppInvocation { - connector_id: Some(connector_id.clone()), - app_name: connector_names_by_id - .get(connector_id.as_str()) - .map(|name| (*name).to_string()), - invocation_type: Some(InvocationType::Explicit), - }) - .collect::>(); - - if run_pending_session_start_hooks(&sess, &turn_context).await { - return None; - } - let additional_contexts = if input.is_empty() { - Vec::new() - } else { - let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone()); - let response_item: ResponseItem = initial_input_for_turn.clone().into(); - let user_prompt_submit_outcome = run_user_prompt_submit_hooks( - &sess, - &turn_context, - UserMessageItem::new(&input).message(), - ) - .await; - if user_prompt_submit_outcome.should_stop { - record_additional_contexts( - &sess, - &turn_context, - user_prompt_submit_outcome.additional_contexts, - ) - .await; - return None; - } - sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) - .await; - user_prompt_submit_outcome.additional_contexts - }; - sess.services - .analytics_events_client - .track_app_mentioned(tracking.clone(), mentioned_app_invocations); - for plugin in mentioned_plugin_metadata { - sess.services - .analytics_events_client - .track_plugin_used(tracking.clone(), plugin); - } - sess.merge_connector_selection(explicitly_enabled_connectors.clone()) - .await; - record_additional_contexts(&sess, &turn_context, additional_contexts).await; - if !input.is_empty() { - // Track the previous-turn baseline from the regular user-turn path only so - // standalone tasks (compact/shell/review/undo) cannot suppress future - // model/realtime injections. - sess.set_previous_turn_settings(Some(PreviousTurnSettings { - model: turn_context.model_info.slug.clone(), - realtime_active: Some(turn_context.realtime_active), - })) - .await; - } - if let Err(error) = sess.ensure_agent_task_registered().await { - warn!(error = %error, "agent task registration failed"); - sess.send_event( - turn_context.as_ref(), - EventMsg::Error(ErrorEvent { - message: format!( - "Agent task registration failed. Please try again; Codex will attempt to register the task again on the next turn: {error}" - ), - codex_error_info: Some(CodexErrorInfo::Other), - }), - ) - .await; - return None; - } - - if !skill_items.is_empty() { - sess.record_conversation_items(&turn_context, &skill_items) - .await; - } - if !plugin_items.is_empty() { - sess.record_conversation_items(&turn_context, &plugin_items) - .await; - } - - track_turn_resolved_config_analytics(&sess, &turn_context, &input).await; - - let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); - sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token()) - .await; - let mut last_agent_message: Option = None; - let mut stop_hook_active = false; - // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains - // many turns, from the perspective of the user, it is a single turn. - let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); - let mut server_model_warning_emitted_for_turn = false; - - // `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse - // one instance across retries within this turn. - let mut client_session = - prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); - // Pending input is drained into history before building the next model request. - // However, we defer that drain until after sampling in two cases: - // 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first. - // 2. After auto-compact, when model/tool continuation needs to resume before any steer. - let mut can_drain_pending_input = input.is_empty(); - - loop { - if run_pending_session_start_hooks(&sess, &turn_context).await { - break; - } - - // Note that pending_input would be something like a message the user - // submitted through the UI while the model was running. Though the UI - // may support this, the model might not. - let pending_input = if can_drain_pending_input { - sess.get_pending_input().await - } else { - Vec::new() - }; - - let mut blocked_pending_input = false; - let mut blocked_pending_input_contexts = Vec::new(); - let mut requeued_pending_input = false; - let mut accepted_pending_input = Vec::new(); - if !pending_input.is_empty() { - let mut pending_input_iter = pending_input.into_iter(); - while let Some(pending_input_item) = pending_input_iter.next() { - match inspect_pending_input(&sess, &turn_context, pending_input_item).await { - PendingInputHookDisposition::Accepted(pending_input) => { - accepted_pending_input.push(*pending_input); - } - PendingInputHookDisposition::Blocked { - additional_contexts, - } => { - let remaining_pending_input = pending_input_iter.collect::>(); - if !remaining_pending_input.is_empty() { - let _ = sess.prepend_pending_input(remaining_pending_input).await; - requeued_pending_input = true; - } - blocked_pending_input_contexts = additional_contexts; - blocked_pending_input = true; - break; - } - } - } - } - - let has_accepted_pending_input = !accepted_pending_input.is_empty(); - for pending_input in accepted_pending_input { - record_pending_input(&sess, &turn_context, pending_input).await; - } - record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await; - - if blocked_pending_input && !has_accepted_pending_input { - if requeued_pending_input { - continue; - } - break; - } - - // Construct the input that we will send to the model. - let sampling_request_input: Vec = { - sess.clone_history() - .await - .for_prompt(&turn_context.model_info.input_modalities) - }; - - let sampling_request_input_messages = sampling_request_input - .iter() - .filter_map(|item| match parse_turn_item(item) { - Some(TurnItem::UserMessage(user_message)) => Some(user_message), - _ => None, - }) - .map(|user_message| user_message.message()) - .collect::>(); - let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); - match run_sampling_request( - Arc::clone(&sess), - Arc::clone(&turn_context), - Arc::clone(&turn_diff_tracker), - &mut client_session, - turn_metadata_header.as_deref(), - sampling_request_input, - &explicitly_enabled_connectors, - skills_outcome, - &mut server_model_warning_emitted_for_turn, - cancellation_token.child_token(), - ) - .await - { - Ok(sampling_request_output) => { - let SamplingRequestResult { - needs_follow_up: model_needs_follow_up, - last_agent_message: sampling_request_last_agent_message, - } = sampling_request_output; - can_drain_pending_input = true; - let has_pending_input = sess.has_pending_input().await; - let needs_follow_up = model_needs_follow_up || has_pending_input; - let total_usage_tokens = sess.get_total_token_usage().await; - let token_limit_reached = total_usage_tokens >= auto_compact_limit; - - let estimated_token_count = - sess.get_estimated_token_count(turn_context.as_ref()).await; - - trace!( - turn_id = %turn_context.sub_id, - total_usage_tokens, - estimated_token_count = ?estimated_token_count, - auto_compact_limit, - token_limit_reached, - model_needs_follow_up, - has_pending_input, - needs_follow_up, - "post sampling token usage" - ); - - // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. - if token_limit_reached && needs_follow_up { - if run_auto_compact( - &sess, - &turn_context, - InitialContextInjection::BeforeLastUserMessage, - CompactionReason::ContextLimit, - CompactionPhase::MidTurn, - ) - .await - .is_err() - { - return None; - } - client_session.reset_websocket_session(); - can_drain_pending_input = !model_needs_follow_up; - continue; - } - - if !needs_follow_up { - last_agent_message = sampling_request_last_agent_message; - let stop_hook_permission_mode = match turn_context.approval_policy.value() { - AskForApproval::Never => "bypassPermissions", - AskForApproval::UnlessTrusted - | AskForApproval::OnFailure - | AskForApproval::OnRequest - | AskForApproval::Granular(_) => "default", - } - .to_string(); - let stop_request = codex_hooks::StopRequest { - session_id: sess.conversation_id, - turn_id: turn_context.sub_id.clone(), - cwd: turn_context.cwd.clone(), - transcript_path: sess.hook_transcript_path().await, - model: turn_context.model_info.slug.clone(), - permission_mode: stop_hook_permission_mode, - stop_hook_active, - last_assistant_message: last_agent_message.clone(), - }; - for run in sess.hooks().preview_stop(&stop_request) { - sess.send_event( - &turn_context, - EventMsg::HookStarted(codex_protocol::protocol::HookStartedEvent { - turn_id: Some(turn_context.sub_id.clone()), - run, - }), - ) - .await; - } - let stop_outcome = sess.hooks().run_stop(stop_request).await; - emit_hook_completed_events(&sess, &turn_context, stop_outcome.hook_events) - .await; - if stop_outcome.should_block { - if let Some(hook_prompt_message) = - build_hook_prompt_message(&stop_outcome.continuation_fragments) - { - sess.record_conversation_items( - &turn_context, - std::slice::from_ref(&hook_prompt_message), - ) - .await; - stop_hook_active = true; - continue; - } else { - sess.send_event( - &turn_context, - EventMsg::Warning(WarningEvent { - message: "Stop hook requested continuation without a prompt; ignoring the block.".to_string(), - }), - ) - .await; - } - } - if stop_outcome.should_stop { - break; - } - let hook_outcomes = sess - .hooks() - .dispatch(HookPayload { - session_id: sess.conversation_id, - cwd: turn_context.cwd.clone(), - client: turn_context.app_server_client_name.clone(), - triggered_at: chrono::Utc::now(), - hook_event: HookEvent::AfterAgent { - event: HookEventAfterAgent { - thread_id: sess.conversation_id, - turn_id: turn_context.sub_id.clone(), - input_messages: sampling_request_input_messages, - last_assistant_message: last_agent_message.clone(), - }, - }, - }) - .await; - - let mut abort_message = None; - for hook_outcome in hook_outcomes { - let hook_name = hook_outcome.hook_name; - match hook_outcome.result { - HookResult::Success => {} - HookResult::FailedContinue(error) => { - warn!( - turn_id = %turn_context.sub_id, - hook_name = %hook_name, - error = %error, - "after_agent hook failed; continuing" - ); - } - HookResult::FailedAbort(error) => { - let message = format!( - "after_agent hook '{hook_name}' failed and aborted turn completion: {error}" - ); - warn!( - turn_id = %turn_context.sub_id, - hook_name = %hook_name, - error = %error, - "after_agent hook failed; aborting operation" - ); - if abort_message.is_none() { - abort_message = Some(message); - } - } - } - } - if let Some(message) = abort_message { - sess.send_event( - &turn_context, - EventMsg::Error(ErrorEvent { - message, - codex_error_info: None, - }), - ) - .await; - return None; - } - break; - } - continue; - } - Err(CodexErr::TurnAborted) => { - // Aborted turn is reported via a different event. - break; - } - Err(CodexErr::InvalidImageRequest()) => { - { - let mut state = sess.state.lock().await; - error_or_panic( - "Invalid image detected; sanitizing tool output to prevent poisoning", - ); - if state.history.replace_last_turn_images("Invalid image") { - continue; - } - } - - let event = EventMsg::Error(ErrorEvent { - message: "Invalid image in your last message. Please remove it and try again." - .to_string(), - codex_error_info: Some(CodexErrorInfo::BadRequest), - }); - sess.send_event(&turn_context, event).await; - break; - } - Err(e) => { - info!("Turn error: {e:#}"); - let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None)); - sess.send_event(&turn_context, event).await; - // let the user continue the conversation - break; - } - } - } - - last_agent_message -} - -async fn track_turn_resolved_config_analytics( - sess: &Session, - turn_context: &TurnContext, - input: &[UserInput], -) { - if !sess.enabled(Feature::GeneralAnalytics) { - return; - } - - let thread_config = { - let state = sess.state.lock().await; - state.session_configuration.thread_config_snapshot() - }; - let is_first_turn = { - let mut state = sess.state.lock().await; - state.take_next_turn_is_first() - }; - sess.services - .analytics_events_client - .track_turn_resolved_config(TurnResolvedConfigFact { - turn_id: turn_context.sub_id.clone(), - thread_id: sess.conversation_id.to_string(), - num_input_images: input - .iter() - .filter(|item| { - matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }) - }) - .count(), - submission_type: None, - ephemeral: thread_config.ephemeral, - session_source: thread_config.session_source, - model: turn_context.model_info.slug.clone(), - model_provider: turn_context.config.model_provider_id.clone(), - sandbox_policy: turn_context.sandbox_policy.get().clone(), - reasoning_effort: turn_context.reasoning_effort, - reasoning_summary: Some(turn_context.reasoning_summary), - service_tier: turn_context.config.service_tier, - approval_policy: turn_context.approval_policy.value(), - approvals_reviewer: turn_context.config.approvals_reviewer, - sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(), - collaboration_mode: turn_context.collaboration_mode.mode, - personality: turn_context.personality, - is_first_turn, - }); -} - -async fn run_pre_sampling_compact( - sess: &Arc, - turn_context: &Arc, -) -> CodexResult { - let total_usage_tokens_before_compaction = sess.get_total_token_usage().await; - let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact( - sess, - turn_context, - total_usage_tokens_before_compaction, - ) - .await?; - let total_usage_tokens = sess.get_total_token_usage().await; - let auto_compact_limit = turn_context - .model_info - .auto_compact_token_limit() - .unwrap_or(i64::MAX); - // Compact if the total usage tokens are greater than the auto compact limit - if total_usage_tokens >= auto_compact_limit { - run_auto_compact( - sess, - turn_context, - InitialContextInjection::DoNotInject, - CompactionReason::ContextLimit, - CompactionPhase::PreTurn, - ) - .await?; - pre_sampling_compacted = true; - } - Ok(pre_sampling_compacted) -} - -/// Runs pre-sampling compaction against the previous model when switching to a smaller -/// context-window model. -/// -/// Returns `Ok(true)` when compaction ran successfully, `Ok(false)` when compaction was skipped -/// because the model/context-window preconditions were not met, and `Err(_)` only when compaction -/// was attempted and failed. -async fn maybe_run_previous_model_inline_compact( - sess: &Arc, - turn_context: &Arc, - total_usage_tokens: i64, -) -> CodexResult { - let Some(previous_turn_settings) = sess.previous_turn_settings().await else { - return Ok(false); - }; - let previous_model_turn_context = Arc::new( - turn_context - .with_model(previous_turn_settings.model, &sess.services.models_manager) - .await, - ); - - let Some(old_context_window) = previous_model_turn_context.model_context_window() else { - return Ok(false); - }; - let Some(new_context_window) = turn_context.model_context_window() else { - return Ok(false); - }; - let new_auto_compact_limit = turn_context - .model_info - .auto_compact_token_limit() - .unwrap_or(i64::MAX); - let should_run = total_usage_tokens > new_auto_compact_limit - && previous_model_turn_context.model_info.slug != turn_context.model_info.slug - && old_context_window > new_context_window; - if should_run { - run_auto_compact( - sess, - &previous_model_turn_context, - InitialContextInjection::DoNotInject, - CompactionReason::ModelDownshift, - CompactionPhase::PreTurn, - ) - .await?; - return Ok(true); - } - Ok(false) -} - -async fn run_auto_compact( - sess: &Arc, - turn_context: &Arc, - initial_context_injection: InitialContextInjection, - reason: CompactionReason, - phase: CompactionPhase, -) -> CodexResult<()> { - if should_use_remote_compact_task(&turn_context.provider) { - run_inline_remote_auto_compact_task( - Arc::clone(sess), - Arc::clone(turn_context), - initial_context_injection, - reason, - phase, - ) - .await?; - } else { - run_inline_auto_compact_task( - Arc::clone(sess), - Arc::clone(turn_context), - initial_context_injection, - reason, - phase, - ) - .await?; - } - Ok(()) -} - -fn collect_explicit_app_ids_from_skill_items( - skill_items: &[ResponseItem], - connectors: &[connectors::AppInfo], - skill_name_counts_lower: &HashMap, -) -> HashSet { - if skill_items.is_empty() || connectors.is_empty() { - return HashSet::new(); - } - - let skill_messages = skill_items - .iter() - .filter_map(|item| match item { - ResponseItem::Message { content, .. } => { - content.iter().find_map(|content_item| match content_item { - ContentItem::InputText { text } => Some(text.clone()), - _ => None, - }) - } - _ => None, - }) - .collect::>(); - if skill_messages.is_empty() { - return HashSet::new(); - } - - let mentions = collect_tool_mentions_from_messages(&skill_messages); - let mention_names_lower = mentions - .plain_names - .iter() - .map(|name| name.to_ascii_lowercase()) - .collect::>(); - let mut connector_ids = mentions - .paths - .iter() - .filter(|path| tool_kind_for_path(path) == ToolMentionKind::App) - .filter_map(|path| app_id_from_path(path).map(str::to_string)) - .collect::>(); - - let connector_slug_counts = build_connector_slug_counts(connectors); - for connector in connectors { - let slug = codex_connectors::metadata::connector_mention_slug(connector); - let connector_count = connector_slug_counts.get(&slug).copied().unwrap_or(0); - let skill_count = skill_name_counts_lower.get(&slug).copied().unwrap_or(0); - if connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&slug) { - connector_ids.insert(connector.id.clone()); - } - } - - connector_ids -} - -fn filter_connectors_for_input( - connectors: &[connectors::AppInfo], - input: &[ResponseItem], - explicitly_enabled_connectors: &HashSet, - skill_name_counts_lower: &HashMap, -) -> Vec { - let connectors: Vec = connectors - .iter() - .filter(|connector| connector.is_enabled) - .cloned() - .collect::>(); - if connectors.is_empty() { - return Vec::new(); - } - - let user_messages = collect_user_messages(input); - if user_messages.is_empty() && explicitly_enabled_connectors.is_empty() { - return Vec::new(); - } - - let mentions = collect_tool_mentions_from_messages(&user_messages); - let mention_names_lower = mentions - .plain_names - .iter() - .map(|name| name.to_ascii_lowercase()) - .collect::>(); - - let connector_slug_counts = build_connector_slug_counts(&connectors); - let mut allowed_connector_ids = explicitly_enabled_connectors.clone(); - for path in mentions - .paths - .iter() - .filter(|path| tool_kind_for_path(path) == ToolMentionKind::App) - { - if let Some(connector_id) = app_id_from_path(path) { - allowed_connector_ids.insert(connector_id.to_string()); - } - } - - connectors - .into_iter() - .filter(|connector| { - connector_inserted_in_messages( - connector, - &mention_names_lower, - &allowed_connector_ids, - &connector_slug_counts, - skill_name_counts_lower, - ) - }) - .collect() -} - -fn connector_inserted_in_messages( - connector: &connectors::AppInfo, - mention_names_lower: &HashSet, - allowed_connector_ids: &HashSet, - connector_slug_counts: &HashMap, - skill_name_counts_lower: &HashMap, -) -> bool { - if allowed_connector_ids.contains(&connector.id) { - return true; - } - - let mention_slug = codex_connectors::metadata::connector_mention_slug(connector); - let connector_count = connector_slug_counts - .get(&mention_slug) - .copied() - .unwrap_or(0); - let skill_count = skill_name_counts_lower - .get(&mention_slug) - .copied() - .unwrap_or(0); - connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&mention_slug) -} - -pub(crate) fn build_prompt( - input: Vec, - router: &ToolRouter, - turn_context: &TurnContext, - base_instructions: BaseInstructions, -) -> Prompt { - let deferred_dynamic_tools = turn_context - .dynamic_tools - .iter() - .filter(|tool| tool.defer_loading) - .map(|tool| tool.name.as_str()) - .collect::>(); - let tools = if deferred_dynamic_tools.is_empty() { - router.model_visible_specs() - } else { - router - .model_visible_specs() - .into_iter() - .filter(|spec| !deferred_dynamic_tools.contains(spec.name())) - .collect() - }; - - Prompt { - input, - tools, - parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls, - base_instructions, - personality: turn_context.personality, - output_schema: turn_context.final_output_json_schema.clone(), - } -} - -#[allow(clippy::too_many_arguments)] -#[instrument(level = "trace", - skip_all, - fields( - turn_id = %turn_context.sub_id, - model = %turn_context.model_info.slug, - cwd = %turn_context.cwd.display() - ) -)] -async fn run_sampling_request( - sess: Arc, - turn_context: Arc, - turn_diff_tracker: SharedTurnDiffTracker, - client_session: &mut ModelClientSession, - turn_metadata_header: Option<&str>, - input: Vec, - explicitly_enabled_connectors: &HashSet, - skills_outcome: Option<&SkillLoadOutcome>, - server_model_warning_emitted_for_turn: &mut bool, - cancellation_token: CancellationToken, -) -> CodexResult { - let router = built_tools( - sess.as_ref(), - turn_context.as_ref(), - &input, - explicitly_enabled_connectors, - skills_outcome, - &cancellation_token, - ) - .await?; - - let base_instructions = sess.get_base_instructions().await; - - let tool_runtime = ToolCallRuntime::new( - Arc::clone(&router), - Arc::clone(&sess), - Arc::clone(&turn_context), - Arc::clone(&turn_diff_tracker), - ); - let _code_mode_worker = sess - .services - .code_mode_service - .start_turn_worker( - &sess, - &turn_context, - Arc::clone(&router), - Arc::clone(&turn_diff_tracker), - ) - .await; - let mut retries = 0; - let mut initial_input = Some(input); - loop { - let prompt_input = if let Some(input) = initial_input.take() { - input - } else { - sess.clone_history() - .await - .for_prompt(&turn_context.model_info.input_modalities) - }; - let prompt = build_prompt( - prompt_input, - router.as_ref(), - turn_context.as_ref(), - base_instructions.clone(), - ); - let err = match try_run_sampling_request( - tool_runtime.clone(), - Arc::clone(&sess), - Arc::clone(&turn_context), - client_session, - turn_metadata_header, - Arc::clone(&turn_diff_tracker), - server_model_warning_emitted_for_turn, - &prompt, - cancellation_token.child_token(), - ) - .await - { - Ok(output) => { - return Ok(output); - } - Err(CodexErr::ContextWindowExceeded) => { - sess.set_total_tokens_full(&turn_context).await; - return Err(CodexErr::ContextWindowExceeded); - } - Err(CodexErr::UsageLimitReached(e)) => { - let rate_limits = e.rate_limits.clone(); - if let Some(rate_limits) = rate_limits { - sess.update_rate_limits(&turn_context, *rate_limits).await; - } - return Err(CodexErr::UsageLimitReached(e)); - } - Err(err) => err, - }; - - if !err.is_retryable() { - return Err(err); - } - - // Use the configured provider-specific stream retry budget. - let max_retries = turn_context.provider.stream_max_retries(); - if retries >= max_retries - && client_session.try_switch_fallback_transport( - &turn_context.session_telemetry, - &turn_context.model_info, - ) - { - sess.send_event( - &turn_context, - EventMsg::Warning(WarningEvent { - message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), - }), - ) - .await; - retries = 0; - continue; - } - if retries < max_retries { - retries += 1; - let delay = match &err { - CodexErr::Stream(_, requested_delay) => { - requested_delay.unwrap_or_else(|| backoff(retries)) - } - _ => backoff(retries), - }; - warn!( - "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", - ); - - // In release builds, hide the first websocket retry notification to reduce noisy - // transient reconnect messages. In debug builds, keep full visibility for diagnosis. - let report_error = retries > 1 - || cfg!(debug_assertions) - || !sess.services.model_client.responses_websocket_enabled(); - if report_error { - // Surface retry information to any UI/front‑end so the - // user understands what is happening instead of staring - // at a seemingly frozen screen. - sess.notify_stream_error( - &turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; - } - tokio::time::sleep(delay).await; - } else { - return Err(err); - } - } -} - -pub(crate) async fn built_tools( - sess: &Session, - turn_context: &TurnContext, - input: &[ResponseItem], - explicitly_enabled_connectors: &HashSet, - skills_outcome: Option<&SkillLoadOutcome>, - cancellation_token: &CancellationToken, -) -> CodexResult> { - let mcp_connection_manager = sess.services.mcp_connection_manager.read().await; - let has_mcp_servers = mcp_connection_manager.has_servers(); - let all_mcp_tools = mcp_connection_manager - .list_all_tools() - .or_cancel(cancellation_token) - .await?; - drop(mcp_connection_manager); - let loaded_plugins = sess - .services - .plugins_manager - .plugins_for_config(&turn_context.config) - .await; - - let mut effective_explicitly_enabled_connectors = explicitly_enabled_connectors.clone(); - effective_explicitly_enabled_connectors.extend(sess.get_connector_selection().await); - - let apps_enabled = turn_context.apps_enabled(); - let accessible_connectors = - apps_enabled.then(|| connectors::accessible_connectors_from_mcp_tools(&all_mcp_tools)); - let accessible_connectors_with_enabled_state = - accessible_connectors.as_ref().map(|connectors| { - connectors::with_app_enabled_state(connectors.clone(), &turn_context.config) - }); - let connectors = if apps_enabled { - let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( - loaded_plugins - .effective_apps() - .into_iter() - .map(|connector_id| connector_id.0), - accessible_connectors.clone().unwrap_or_default(), - ); - Some(connectors::with_app_enabled_state( - connectors, - &turn_context.config, - )) - } else { - None - }; - let auth = sess.services.auth_manager.auth().await; - let discoverable_tools = if apps_enabled && turn_context.tools_config.tool_suggest { - if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() { - match connectors::list_tool_suggest_discoverable_tools_with_auth( - &turn_context.config, - auth.as_ref(), - accessible_connectors.as_slice(), - ) - .await - .map(|discoverable_tools| { - filter_tool_suggest_discoverable_tools_for_client( - discoverable_tools, - turn_context.app_server_client_name.as_deref(), - ) - }) { - Ok(discoverable_tools) if discoverable_tools.is_empty() => None, - Ok(discoverable_tools) => Some(discoverable_tools), - Err(err) => { - warn!("failed to load discoverable tool suggestions: {err:#}"); - None - } - } - } else { - None - } - } else { - None - }; - - let explicitly_enabled = if let Some(connectors) = connectors.as_ref() { - let skill_name_counts_lower = skills_outcome.map_or_else(HashMap::new, |outcome| { - build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1 - }); - - filter_connectors_for_input( - connectors, - input, - &effective_explicitly_enabled_connectors, - &skill_name_counts_lower, - ) - } else { - Vec::new() - }; - let mcp_tool_exposure = build_mcp_tool_exposure( - &all_mcp_tools, - connectors.as_deref(), - explicitly_enabled.as_slice(), - &turn_context.config, - &turn_context.tools_config, - ); - let mcp_tools = has_mcp_servers.then_some(mcp_tool_exposure.direct_tools); - let deferred_mcp_tools = mcp_tool_exposure.deferred_tools; - let unavailable_called_tools = if turn_context - .config - .features - .enabled(Feature::UnavailableDummyTools) - { - let exposed_tool_names = mcp_tools - .iter() - .chain(deferred_mcp_tools.iter()) - .flat_map(|tools| tools.keys().map(String::as_str)) - .collect::>(); - collect_unavailable_called_tools(input, &exposed_tool_names) - } else { - Vec::new() - }; - - let parallel_mcp_server_names = turn_context - .config - .mcp_servers - .get() - .iter() - .filter_map(|(server_name, server_config)| { - server_config - .supports_parallel_tool_calls - .then_some(server_name.clone()) - }) - .collect::>(); - - Ok(Arc::new(ToolRouter::from_config( - &turn_context.tools_config, - ToolRouterParams { - mcp_tools, - deferred_mcp_tools, - unavailable_called_tools, - parallel_mcp_server_names, - discoverable_tools, - dynamic_tools: turn_context.dynamic_tools.as_slice(), - }, - ))) -} - -#[derive(Debug)] -struct SamplingRequestResult { - needs_follow_up: bool, - last_agent_message: Option, -} - -/// Ephemeral per-response state for streaming a single proposed plan. -/// This is intentionally not persisted or stored in session/state since it -/// only exists while a response is actively streaming. The final plan text -/// is extracted from the completed assistant message. -/// Tracks a single proposed plan item across a streaming response. -struct ProposedPlanItemState { - item_id: String, - started: bool, - completed: bool, -} - -/// Aggregated state used only while streaming a plan-mode response. -/// Includes per-item parsers, deferred agent message bookkeeping, and the plan item lifecycle. -struct PlanModeStreamState { - /// Agent message items started by the model but deferred until we see non-plan text. - pending_agent_message_items: HashMap, - /// Agent message items whose start notification has been emitted. - started_agent_message_items: HashSet, - /// Leading whitespace buffered until we see non-whitespace text for an item. - leading_whitespace_by_item: HashMap, - /// Tracks plan item lifecycle while streaming plan output. - plan_item_state: ProposedPlanItemState, -} - -impl PlanModeStreamState { - fn new(turn_id: &str) -> Self { - Self { - pending_agent_message_items: HashMap::new(), - started_agent_message_items: HashSet::new(), - leading_whitespace_by_item: HashMap::new(), - plan_item_state: ProposedPlanItemState::new(turn_id), - } - } -} - -#[derive(Debug, Default)] -struct AssistantMessageStreamParsers { - plan_mode: bool, - parsers_by_item: HashMap, -} - -type ParsedAssistantTextDelta = AssistantTextChunk; - -impl AssistantMessageStreamParsers { - fn new(plan_mode: bool) -> Self { - Self { - plan_mode, - parsers_by_item: HashMap::new(), - } - } - - fn parser_mut(&mut self, item_id: &str) -> &mut AssistantTextStreamParser { - let plan_mode = self.plan_mode; - self.parsers_by_item - .entry(item_id.to_string()) - .or_insert_with(|| AssistantTextStreamParser::new(plan_mode)) - } - - fn seed_item_text(&mut self, item_id: &str, text: &str) -> ParsedAssistantTextDelta { - if text.is_empty() { - return ParsedAssistantTextDelta::default(); - } - self.parser_mut(item_id).push_str(text) - } - - fn parse_delta(&mut self, item_id: &str, delta: &str) -> ParsedAssistantTextDelta { - self.parser_mut(item_id).push_str(delta) - } - - fn finish_item(&mut self, item_id: &str) -> ParsedAssistantTextDelta { - let Some(mut parser) = self.parsers_by_item.remove(item_id) else { - return ParsedAssistantTextDelta::default(); - }; - parser.finish() - } - - fn drain_finished(&mut self) -> Vec<(String, ParsedAssistantTextDelta)> { - let parsers_by_item = std::mem::take(&mut self.parsers_by_item); - parsers_by_item - .into_iter() - .map(|(item_id, mut parser)| (item_id, parser.finish())) - .collect() - } -} - -impl ProposedPlanItemState { - fn new(turn_id: &str) -> Self { - Self { - item_id: format!("{turn_id}-plan"), - started: false, - completed: false, - } - } - - async fn start(&mut self, sess: &Session, turn_context: &TurnContext) { - if self.started || self.completed { - return; - } - self.started = true; - let item = TurnItem::Plan(PlanItem { - id: self.item_id.clone(), - text: String::new(), - }); - sess.emit_turn_item_started(turn_context, &item).await; - } - - async fn push_delta(&mut self, sess: &Session, turn_context: &TurnContext, delta: &str) { - if self.completed { - return; - } - if delta.is_empty() { - return; - } - let event = PlanDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: self.item_id.clone(), - delta: delta.to_string(), - }; - sess.send_event(turn_context, EventMsg::PlanDelta(event)) - .await; - } - - async fn complete_with_text( - &mut self, - sess: &Session, - turn_context: &TurnContext, - text: String, - ) { - if self.completed || !self.started { - return; - } - self.completed = true; - let item = TurnItem::Plan(PlanItem { - id: self.item_id.clone(), - text, - }); - sess.emit_turn_item_completed(turn_context, item).await; - } -} - -/// In plan mode we defer agent message starts until the parser emits non-plan -/// text. The parser buffers each line until it can rule out a tag prefix, so -/// plan-only outputs never show up as empty assistant messages. -async fn maybe_emit_pending_agent_message_start( - sess: &Session, - turn_context: &TurnContext, - state: &mut PlanModeStreamState, - item_id: &str, -) { - if state.started_agent_message_items.contains(item_id) { - return; - } - if let Some(item) = state.pending_agent_message_items.remove(item_id) { - sess.emit_turn_item_started(turn_context, &item).await; - state - .started_agent_message_items - .insert(item_id.to_string()); - } -} - -/// Agent messages are text-only today; concatenate all text entries. -fn agent_message_text(item: &codex_protocol::items::AgentMessageItem) -> String { - item.content - .iter() - .map(|entry| match entry { - codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), - }) - .collect() -} - -fn realtime_text_for_event(msg: &EventMsg) -> Option { - match msg { - EventMsg::AgentMessage(event) => Some(event.message.clone()), - EventMsg::ItemCompleted(event) => match &event.item { - TurnItem::AgentMessage(item) => Some(agent_message_text(item)), - _ => None, - }, - EventMsg::Error(_) - | EventMsg::Warning(_) - | EventMsg::RealtimeConversationStarted(_) - | EventMsg::RealtimeConversationSdp(_) - | EventMsg::RealtimeConversationRealtime(_) - | EventMsg::RealtimeConversationClosed(_) - | EventMsg::ModelReroute(_) - | EventMsg::ContextCompacted(_) - | EventMsg::ThreadRolledBack(_) - | EventMsg::TurnStarted(_) - | EventMsg::TurnComplete(_) - | EventMsg::TokenCount(_) - | EventMsg::UserMessage(_) - | EventMsg::AgentMessageDelta(_) - | EventMsg::AgentReasoning(_) - | EventMsg::AgentReasoningDelta(_) - | EventMsg::AgentReasoningRawContent(_) - | EventMsg::AgentReasoningRawContentDelta(_) - | EventMsg::AgentReasoningSectionBreak(_) - | EventMsg::SessionConfigured(_) - | EventMsg::ThreadNameUpdated(_) - | EventMsg::McpStartupUpdate(_) - | EventMsg::McpStartupComplete(_) - | EventMsg::McpToolCallBegin(_) - | EventMsg::McpToolCallEnd(_) - | EventMsg::WebSearchBegin(_) - | EventMsg::WebSearchEnd(_) - | EventMsg::ExecCommandBegin(_) - | EventMsg::ExecCommandOutputDelta(_) - | EventMsg::TerminalInteraction(_) - | EventMsg::ExecCommandEnd(_) - | EventMsg::PatchApplyBegin(_) - | EventMsg::PatchApplyEnd(_) - | EventMsg::ViewImageToolCall(_) - | EventMsg::ImageGenerationBegin(_) - | EventMsg::ImageGenerationEnd(_) - | EventMsg::ExecApprovalRequest(_) - | EventMsg::RequestPermissions(_) - | EventMsg::RequestUserInput(_) - | EventMsg::DynamicToolCallRequest(_) - | EventMsg::DynamicToolCallResponse(_) - | EventMsg::GuardianAssessment(_) - | EventMsg::ElicitationRequest(_) - | EventMsg::ApplyPatchApprovalRequest(_) - | EventMsg::DeprecationNotice(_) - | EventMsg::BackgroundEvent(_) - | EventMsg::UndoStarted(_) - | EventMsg::UndoCompleted(_) - | EventMsg::StreamError(_) - | EventMsg::TurnDiff(_) - | EventMsg::GetHistoryEntryResponse(_) - | EventMsg::McpListToolsResponse(_) - | EventMsg::ListSkillsResponse(_) - | EventMsg::RealtimeConversationListVoicesResponse(_) - | EventMsg::SkillsUpdateAvailable - | EventMsg::PlanUpdate(_) - | EventMsg::TurnAborted(_) - | EventMsg::ShutdownComplete - | EventMsg::EnteredReviewMode(_) - | EventMsg::ExitedReviewMode(_) - | EventMsg::RawResponseItem(_) - | EventMsg::ItemStarted(_) - | EventMsg::HookStarted(_) - | EventMsg::HookCompleted(_) - | EventMsg::AgentMessageContentDelta(_) - | EventMsg::PlanDelta(_) - | EventMsg::ReasoningContentDelta(_) - | EventMsg::ReasoningRawContentDelta(_) - | EventMsg::CollabAgentSpawnBegin(_) - | EventMsg::CollabAgentSpawnEnd(_) - | EventMsg::CollabAgentInteractionBegin(_) - | EventMsg::CollabAgentInteractionEnd(_) - | EventMsg::CollabWaitingBegin(_) - | EventMsg::CollabWaitingEnd(_) - | EventMsg::CollabCloseBegin(_) - | EventMsg::CollabCloseEnd(_) - | EventMsg::CollabResumeBegin(_) - | EventMsg::CollabResumeEnd(_) => None, - } -} - -/// Split the stream into normal assistant text vs. proposed plan content. -/// Normal text becomes AgentMessage deltas; plan content becomes PlanDelta + -/// TurnItem::Plan. -async fn handle_plan_segments( - sess: &Session, - turn_context: &TurnContext, - state: &mut PlanModeStreamState, - item_id: &str, - segments: Vec, -) { - for segment in segments { - match segment { - ProposedPlanSegment::Normal(delta) => { - if delta.is_empty() { - continue; - } - let has_non_whitespace = delta.chars().any(|ch| !ch.is_whitespace()); - if !has_non_whitespace && !state.started_agent_message_items.contains(item_id) { - let entry = state - .leading_whitespace_by_item - .entry(item_id.to_string()) - .or_default(); - entry.push_str(&delta); - continue; - } - let delta = if !state.started_agent_message_items.contains(item_id) { - if let Some(prefix) = state.leading_whitespace_by_item.remove(item_id) { - format!("{prefix}{delta}") - } else { - delta - } - } else { - delta - }; - maybe_emit_pending_agent_message_start(sess, turn_context, state, item_id).await; - - let event = AgentMessageContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: item_id.to_string(), - delta, - }; - sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) - .await; - } - ProposedPlanSegment::ProposedPlanStart => { - if !state.plan_item_state.completed { - state.plan_item_state.start(sess, turn_context).await; - } - } - ProposedPlanSegment::ProposedPlanDelta(delta) => { - if !state.plan_item_state.completed { - if !state.plan_item_state.started { - state.plan_item_state.start(sess, turn_context).await; - } - state - .plan_item_state - .push_delta(sess, turn_context, &delta) - .await; - } - } - ProposedPlanSegment::ProposedPlanEnd => {} - } - } -} - -async fn emit_streamed_assistant_text_delta( - sess: &Session, - turn_context: &TurnContext, - plan_mode_state: Option<&mut PlanModeStreamState>, - item_id: &str, - parsed: ParsedAssistantTextDelta, -) { - if parsed.is_empty() { - return; - } - if !parsed.citations.is_empty() { - // Citation extraction is intentionally local for now; we strip citations from display text - // but do not yet surface them in protocol events. - let _citations = parsed.citations; - } - if let Some(state) = plan_mode_state { - if !parsed.plan_segments.is_empty() { - handle_plan_segments(sess, turn_context, state, item_id, parsed.plan_segments).await; - } - return; - } - if parsed.visible_text.is_empty() { - return; - } - let event = AgentMessageContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: item_id.to_string(), - delta: parsed.visible_text, - }; - sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) - .await; -} - -/// Flush buffered assistant text parser state when an assistant message item ends. -async fn flush_assistant_text_segments_for_item( - sess: &Session, - turn_context: &TurnContext, - plan_mode_state: Option<&mut PlanModeStreamState>, - parsers: &mut AssistantMessageStreamParsers, - item_id: &str, -) { - let parsed = parsers.finish_item(item_id); - emit_streamed_assistant_text_delta(sess, turn_context, plan_mode_state, item_id, parsed).await; -} - -/// Flush any remaining buffered assistant text parser state at response completion. -async fn flush_assistant_text_segments_all( - sess: &Session, - turn_context: &TurnContext, - mut plan_mode_state: Option<&mut PlanModeStreamState>, - parsers: &mut AssistantMessageStreamParsers, -) { - for (item_id, parsed) in parsers.drain_finished() { - emit_streamed_assistant_text_delta( - sess, - turn_context, - plan_mode_state.as_deref_mut(), - &item_id, - parsed, - ) - .await; - } -} - -/// Emit completion for plan items by parsing the finalized assistant message. -async fn maybe_complete_plan_item_from_message( - sess: &Session, - turn_context: &TurnContext, - state: &mut PlanModeStreamState, - item: &ResponseItem, -) { - if let ResponseItem::Message { role, content, .. } = item - && role == "assistant" - { - let mut text = String::new(); - for entry in content { - if let ContentItem::OutputText { text: chunk } = entry { - text.push_str(chunk); - } - } - if let Some(plan_text) = extract_proposed_plan_text(&text) { - let (plan_text, _citations) = strip_citations(&plan_text); - if !state.plan_item_state.started { - state.plan_item_state.start(sess, turn_context).await; - } - state - .plan_item_state - .complete_with_text(sess, turn_context, plan_text) - .await; - } - } -} - -/// Emit a completed agent message in plan mode, respecting deferred starts. -async fn emit_agent_message_in_plan_mode( - sess: &Session, - turn_context: &TurnContext, - agent_message: codex_protocol::items::AgentMessageItem, - state: &mut PlanModeStreamState, -) { - let agent_message_id = agent_message.id.clone(); - let text = agent_message_text(&agent_message); - if text.trim().is_empty() { - state.pending_agent_message_items.remove(&agent_message_id); - state.started_agent_message_items.remove(&agent_message_id); - return; - } - - maybe_emit_pending_agent_message_start(sess, turn_context, state, &agent_message_id).await; - - if !state - .started_agent_message_items - .contains(&agent_message_id) - { - let start_item = state - .pending_agent_message_items - .remove(&agent_message_id) - .unwrap_or_else(|| { - TurnItem::AgentMessage(codex_protocol::items::AgentMessageItem { - id: agent_message_id.clone(), - content: Vec::new(), - phase: None, - memory_citation: None, - }) - }); - sess.emit_turn_item_started(turn_context, &start_item).await; - state - .started_agent_message_items - .insert(agent_message_id.clone()); - } - - sess.emit_turn_item_completed(turn_context, TurnItem::AgentMessage(agent_message)) - .await; - state.started_agent_message_items.remove(&agent_message_id); -} - -/// Emit completion for a plan-mode turn item, handling agent messages specially. -async fn emit_turn_item_in_plan_mode( - sess: &Session, - turn_context: &TurnContext, - turn_item: TurnItem, - previously_active_item: Option<&TurnItem>, - state: &mut PlanModeStreamState, -) { - match turn_item { - TurnItem::AgentMessage(agent_message) => { - emit_agent_message_in_plan_mode(sess, turn_context, agent_message, state).await; - } - _ => { - if previously_active_item.is_none() { - sess.emit_turn_item_started(turn_context, &turn_item).await; - } - sess.emit_turn_item_completed(turn_context, turn_item).await; - } - } -} - -/// Handle a completed assistant response item in plan mode, returning true if handled. -async fn handle_assistant_item_done_in_plan_mode( - sess: &Session, - turn_context: &TurnContext, - item: &ResponseItem, - state: &mut PlanModeStreamState, - previously_active_item: Option<&TurnItem>, - last_agent_message: &mut Option, -) -> bool { - if let ResponseItem::Message { role, .. } = item - && role == "assistant" - { - maybe_complete_plan_item_from_message(sess, turn_context, state, item).await; - - if let Some(turn_item) = - handle_non_tool_response_item(sess, turn_context, item, /*plan_mode*/ true).await - { - emit_turn_item_in_plan_mode( - sess, - turn_context, - turn_item, - previously_active_item, - state, - ) - .await; - } - - record_completed_response_item(sess, turn_context, item).await; - if let Some(agent_message) = last_assistant_message_from_item(item, /*plan_mode*/ true) { - *last_agent_message = Some(agent_message); - } - return true; - } - false -} - -async fn drain_in_flight( - in_flight: &mut FuturesOrdered>>, - sess: Arc, - turn_context: Arc, -) -> CodexResult<()> { - while let Some(res) = in_flight.next().await { - match res { - Ok(response_input) => { - let response_item = response_input.into(); - sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item)) - .await; - mark_thread_memory_mode_polluted_if_external_context( - sess.as_ref(), - turn_context.as_ref(), - &response_item, - ) - .await; - } - Err(err) => { - error_or_panic(format!("in-flight tool future failed during drain: {err}")); - } - } - } - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -#[instrument(level = "trace", - skip_all, - fields( - turn_id = %turn_context.sub_id, - model = %turn_context.model_info.slug - ) -)] -async fn try_run_sampling_request( - tool_runtime: ToolCallRuntime, - sess: Arc, - turn_context: Arc, - client_session: &mut ModelClientSession, - turn_metadata_header: Option<&str>, - turn_diff_tracker: SharedTurnDiffTracker, - server_model_warning_emitted_for_turn: &mut bool, - prompt: &Prompt, - cancellation_token: CancellationToken, -) -> CodexResult { - feedback_tags!( - model = turn_context.model_info.slug.clone(), - approval_policy = turn_context.approval_policy.value(), - sandbox_policy = turn_context.sandbox_policy.get(), - effort = turn_context.reasoning_effort, - auth_mode = sess.services.auth_manager.auth_mode(), - features = sess.features.enabled_features(), - ); - let mut stream = client_session - .stream( - prompt, - &turn_context.model_info, - &turn_context.session_telemetry, - turn_context.reasoning_effort, - turn_context.reasoning_summary, - turn_context.config.service_tier, - turn_metadata_header, - ) - .instrument(trace_span!("stream_request")) - .or_cancel(&cancellation_token) - .await??; - let mut in_flight: FuturesOrdered>> = - FuturesOrdered::new(); - let mut needs_follow_up = false; - let mut last_agent_message: Option = None; - let mut active_item: Option = None; - let mut should_emit_turn_diff = false; - let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan; - let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode); - let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); - let receiving_span = trace_span!("receiving_stream"); - let outcome: CodexResult = loop { - let handle_responses = trace_span!( - parent: &receiving_span, - "handle_responses", - otel.name = field::Empty, - tool_name = field::Empty, - from = field::Empty, - ); - - let event = match stream - .next() - .instrument(trace_span!(parent: &handle_responses, "receiving")) - .or_cancel(&cancellation_token) - .await - { - Ok(event) => event, - Err(codex_async_utils::CancelErr::Cancelled) => break Err(CodexErr::TurnAborted), - }; - - let event = match event { - Some(Ok(event)) => event, - Some(Err(err)) => break Err(err), - None => { - break Err(CodexErr::Stream( - "stream closed before response.completed".into(), - None, - )); - } - }; - - sess.services - .session_telemetry - .record_responses(&handle_responses, &event); - record_turn_ttft_metric(&turn_context, &event).await; - - match event { - ResponseEvent::Created => {} - ResponseEvent::OutputItemDone(item) => { - let previously_active_item = active_item.take(); - if let Some(previous) = previously_active_item.as_ref() - && matches!(previous, TurnItem::AgentMessage(_)) - { - let item_id = previous.id(); - flush_assistant_text_segments_for_item( - &sess, - &turn_context, - plan_mode_state.as_mut(), - &mut assistant_message_stream_parsers, - &item_id, - ) - .await; - } - if let Some(state) = plan_mode_state.as_mut() - && handle_assistant_item_done_in_plan_mode( - &sess, - &turn_context, - &item, - state, - previously_active_item.as_ref(), - &mut last_agent_message, - ) - .await - { - continue; - } - - let mut ctx = HandleOutputCtx { - sess: sess.clone(), - turn_context: turn_context.clone(), - tool_runtime: tool_runtime.clone(), - cancellation_token: cancellation_token.child_token(), - }; - - let output_result = - match handle_output_item_done(&mut ctx, item, previously_active_item) - .instrument(handle_responses) - .await - { - Ok(output_result) => output_result, - Err(err) => break Err(err), - }; - if let Some(tool_future) = output_result.tool_future { - in_flight.push_back(tool_future); - } - if let Some(agent_message) = output_result.last_agent_message { - last_agent_message = Some(agent_message); - } - needs_follow_up |= output_result.needs_follow_up; - } - ResponseEvent::OutputItemAdded(item) => { - if let Some(turn_item) = handle_non_tool_response_item( - sess.as_ref(), - turn_context.as_ref(), - &item, - plan_mode, - ) - .await - { - let mut turn_item = turn_item; - let mut seeded_parsed: Option = None; - let mut seeded_item_id: Option = None; - if matches!(turn_item, TurnItem::AgentMessage(_)) - && let Some(raw_text) = raw_assistant_output_text_from_item(&item) - { - let item_id = turn_item.id(); - let mut seeded = - assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text); - if let TurnItem::AgentMessage(agent_message) = &mut turn_item { - agent_message.content = - vec![codex_protocol::items::AgentMessageContent::Text { - text: if plan_mode { - String::new() - } else { - std::mem::take(&mut seeded.visible_text) - }, - }]; - } - seeded_parsed = plan_mode.then_some(seeded); - seeded_item_id = Some(item_id); - } - if let Some(state) = plan_mode_state.as_mut() - && matches!(turn_item, TurnItem::AgentMessage(_)) - { - let item_id = turn_item.id(); - state - .pending_agent_message_items - .insert(item_id, turn_item.clone()); - } else { - sess.emit_turn_item_started(&turn_context, &turn_item).await; - } - if let (Some(state), Some(item_id), Some(parsed)) = ( - plan_mode_state.as_mut(), - seeded_item_id.as_deref(), - seeded_parsed, - ) { - emit_streamed_assistant_text_delta( - &sess, - &turn_context, - Some(state), - item_id, - parsed, - ) - .await; - } - active_item = Some(turn_item); - } - } - ResponseEvent::ServerModel(server_model) => { - if !*server_model_warning_emitted_for_turn - && sess - .maybe_warn_on_server_model_mismatch(&turn_context, server_model) - .await - { - *server_model_warning_emitted_for_turn = true; - } - } - ResponseEvent::ServerReasoningIncluded(included) => { - sess.set_server_reasoning_included(included).await; - } - ResponseEvent::RateLimits(snapshot) => { - // Update internal state with latest rate limits, but defer sending until - // token usage is available to avoid duplicate TokenCount events. - sess.update_rate_limits(&turn_context, snapshot).await; - } - ResponseEvent::ModelsEtag(etag) => { - // Update internal state with latest models etag - sess.services.models_manager.refresh_if_new_etag(etag).await; - } - ResponseEvent::Completed { - response_id: _, - token_usage, - } => { - flush_assistant_text_segments_all( - &sess, - &turn_context, - plan_mode_state.as_mut(), - &mut assistant_message_stream_parsers, - ) - .await; - sess.update_token_usage_info(&turn_context, token_usage.as_ref()) - .await; - should_emit_turn_diff = true; - - break Ok(SamplingRequestResult { - needs_follow_up, - last_agent_message, - }); - } - ResponseEvent::OutputTextDelta(delta) => { - // In review child threads, suppress assistant text deltas; the - // UI will show a selection popup from the final ReviewOutput. - if let Some(active) = active_item.as_ref() { - let item_id = active.id(); - if matches!(active, TurnItem::AgentMessage(_)) { - let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta); - emit_streamed_assistant_text_delta( - &sess, - &turn_context, - plan_mode_state.as_mut(), - &item_id, - parsed, - ) - .await; - } else { - let event = AgentMessageContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id, - delta, - }; - sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) - .await; - } - } else { - error_or_panic("OutputTextDelta without active item".to_string()); - } - } - ResponseEvent::ReasoningSummaryDelta { - delta, - summary_index, - } => { - if let Some(active) = active_item.as_ref() { - let event = ReasoningContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), - delta, - summary_index, - }; - sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) - .await; - } else { - error_or_panic("ReasoningSummaryDelta without active item".to_string()); - } - } - ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { - if let Some(active) = active_item.as_ref() { - let event = - EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { - item_id: active.id(), - summary_index, - }); - sess.send_event(&turn_context, event).await; - } else { - error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); - } - } - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - if let Some(active) = active_item.as_ref() { - let event = ReasoningRawContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), - delta, - content_index, - }; - sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) - .await; - } else { - error_or_panic("ReasoningRawContentDelta without active item".to_string()); - } - } - } - }; - - flush_assistant_text_segments_all( - &sess, - &turn_context, - plan_mode_state.as_mut(), - &mut assistant_message_stream_parsers, - ) - .await; - - drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?; - - if cancellation_token.is_cancelled() { - return Err(CodexErr::TurnAborted); - } - - if should_emit_turn_diff { - let unified_diff = { - let mut tracker = turn_diff_tracker.lock().await; - tracker.get_unified_diff() - }; - if let Ok(Some(unified_diff)) = unified_diff { - let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); - sess.clone().send_event(&turn_context, msg).await; - } - } - - outcome -} - -pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { - for item in responses.iter().rev() { - if let Some(message) = last_assistant_message_from_item(item, /*plan_mode*/ false) { - return Some(message); - } - } - None -} - use crate::memories::prompts::build_memory_tool_developer_instructions; #[cfg(test)] pub(crate) use tests::make_session_and_context; diff --git a/codex-rs/core/src/codex/turn.rs b/codex-rs/core/src/codex/turn.rs new file mode 100644 index 0000000000..fa24bd6924 --- /dev/null +++ b/codex-rs/core/src/codex/turn.rs @@ -0,0 +1,2169 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + +use crate::SkillInjections; +use crate::SkillLoadOutcome; +use crate::build_skill_injections; +use crate::client::ModelClientSession; +use crate::client_common::Prompt; +use crate::client_common::ResponseEvent; +use crate::codex::PreviousTurnSettings; +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::collect_env_var_dependencies; +use crate::collect_explicit_skill_mentions; +use crate::compact::InitialContextInjection; +use crate::compact::collect_user_messages; +use crate::compact::run_inline_auto_compact_task; +use crate::compact::should_use_remote_compact_task; +use crate::compact_remote::run_inline_remote_auto_compact_task; +use crate::connectors; +use crate::feedback_tags; +use crate::hook_runtime::PendingInputHookDisposition; +use crate::hook_runtime::emit_hook_completed_events; +use crate::hook_runtime::inspect_pending_input; +use crate::hook_runtime::record_additional_contexts; +use crate::hook_runtime::record_pending_input; +use crate::hook_runtime::run_pending_session_start_hooks; +use crate::hook_runtime::run_user_prompt_submit_hooks; +use crate::injection::ToolMentionKind; +use crate::injection::app_id_from_path; +use crate::injection::tool_kind_for_path; +use crate::mcp_skill_dependencies::maybe_prompt_and_install_mcp_dependencies; +use crate::mcp_tool_exposure::build_mcp_tool_exposure; +use crate::mentions::build_connector_slug_counts; +use crate::mentions::build_skill_name_counts; +use crate::mentions::collect_explicit_app_ids; +use crate::mentions::collect_explicit_plugin_mentions; +use crate::mentions::collect_tool_mentions_from_messages; +use crate::parse_turn_item; +use crate::plugins::build_plugin_injections; +use crate::resolve_skill_dependencies_for_turn; +use crate::stream_events_utils::HandleOutputCtx; +use crate::stream_events_utils::handle_non_tool_response_item; +use crate::stream_events_utils::handle_output_item_done; +use crate::stream_events_utils::last_assistant_message_from_item; +use crate::stream_events_utils::mark_thread_memory_mode_polluted_if_external_context; +use crate::stream_events_utils::raw_assistant_output_text_from_item; +use crate::stream_events_utils::record_completed_response_item; +use crate::tools::ToolRouter; +use crate::tools::context::SharedTurnDiffTracker; +use crate::tools::parallel::ToolCallRuntime; +use crate::tools::router::ToolRouterParams; +use crate::turn_diff_tracker::TurnDiffTracker; +use crate::turn_timing::record_turn_ttft_metric; +use crate::unavailable_tool::collect_unavailable_called_tools; +use crate::util::backoff; +use crate::util::error_or_panic; +use codex_analytics::AppInvocation; +use codex_analytics::CompactionPhase; +use codex_analytics::CompactionReason; +use codex_analytics::InvocationType; +use codex_analytics::TurnResolvedConfigFact; +use codex_analytics::build_track_events_context; +use codex_async_utils::OrCancelExt; +use codex_features::Feature; +use codex_hooks::HookEvent; +use codex_hooks::HookEventAfterAgent; +use codex_hooks::HookPayload; +use codex_hooks::HookResult; +use codex_protocol::config_types::ModeKind; +use codex_protocol::error::CodexErr; +use codex_protocol::error::Result as CodexResult; +use codex_protocol::items::PlanItem; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; +use codex_protocol::items::build_hook_prompt_message; +use codex_protocol::models::BaseInstructions; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::AgentMessageContentDeltaEvent; +use codex_protocol::protocol::AgentReasoningSectionBreakEvent; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::CodexErrorInfo; +use codex_protocol::protocol::ErrorEvent; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::PlanDeltaEvent; +use codex_protocol::protocol::ReasoningContentDeltaEvent; +use codex_protocol::protocol::ReasoningRawContentDeltaEvent; +use codex_protocol::protocol::TurnDiffEvent; +use codex_protocol::protocol::WarningEvent; +use codex_protocol::user_input::UserInput; +use codex_tools::filter_tool_suggest_discoverable_tools_for_client; +use codex_utils_stream_parser::AssistantTextChunk; +use codex_utils_stream_parser::AssistantTextStreamParser; +use codex_utils_stream_parser::ProposedPlanSegment; +use codex_utils_stream_parser::extract_proposed_plan_text; +use codex_utils_stream_parser::strip_citations; +use futures::future::BoxFuture; +use futures::prelude::*; +use futures::stream::FuturesOrdered; +use tokio_util::sync::CancellationToken; +use tracing::Instrument; +use tracing::error; +use tracing::field; +use tracing::info; +use tracing::instrument; +use tracing::trace; +use tracing::trace_span; +use tracing::warn; + +/// Takes a user message as input and runs a loop where, at each sampling request, the model +/// replies with either: +/// +/// - requested function calls +/// - an assistant message +/// +/// While it is possible for the model to return multiple of these items in a +/// single sampling request, in practice, we generally one item per sampling request: +/// +/// - If the model requests a function call, we execute it and send the output +/// back to the model in the next sampling request. +/// - If the model sends only an assistant message, we record it in the +/// conversation history and consider the turn complete. +/// +pub(crate) async fn run_turn( + sess: Arc, + turn_context: Arc, + input: Vec, + prewarmed_client_session: Option, + cancellation_token: CancellationToken, +) -> Option { + if input.is_empty() && !sess.has_pending_input().await { + return None; + } + + let model_info = turn_context.model_info.clone(); + let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX); + let mut prewarmed_client_session = prewarmed_client_session; + // TODO(ccunningham): Pre-turn compaction runs before context updates and the + // new user message are recorded. Estimate pending incoming items (context + // diffs/full reinjection + user input) and trigger compaction preemptively + // when they would push the thread over the compaction threshold. + let pre_sampling_compacted = match run_pre_sampling_compact(&sess, &turn_context).await { + Ok(pre_sampling_compacted) => pre_sampling_compacted, + Err(_) => { + error!("Failed to run pre-sampling compact"); + return None; + } + }; + if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() { + client_session.reset_websocket_session(); + } + + let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); + + sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) + .await; + + let loaded_plugins = sess + .services + .plugins_manager + .plugins_for_config(&turn_context.config) + .await; + // Structured plugin:// mentions are resolved from the current session's + // enabled plugins, then converted into turn-scoped guidance below. + let mentioned_plugins = + collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries()); + let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { + // Plugin mentions need raw MCP/app inventory even when app tools + // are normally hidden so we can describe the plugin's currently + // usable capabilities for this turn. + match sess + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .or_cancel(&cancellation_token) + .await + { + Ok(mcp_tools) => mcp_tools, + Err(_) if turn_context.apps_enabled() => return None, + Err(_) => HashMap::new(), + } + } else { + HashMap::new() + }; + let available_connectors = if turn_context.apps_enabled() { + let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( + loaded_plugins + .effective_apps() + .into_iter() + .map(|connector_id| connector_id.0), + connectors::accessible_connectors_from_mcp_tools(&mcp_tools), + ); + connectors::with_app_enabled_state(connectors, &turn_context.config) + } else { + Vec::new() + }; + let connector_slug_counts = build_connector_slug_counts(&available_connectors); + let skill_name_counts_lower = skills_outcome + .as_ref() + .map_or_else(HashMap::new, |outcome| { + build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1 + }); + let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| { + collect_explicit_skill_mentions( + &input, + &outcome.skills, + &outcome.disabled_paths, + &connector_slug_counts, + ) + }); + let config = turn_context.config.clone(); + if config + .features + .enabled(Feature::SkillEnvVarDependencyPrompt) + { + let env_var_dependencies = collect_env_var_dependencies(&mentioned_skills); + resolve_skill_dependencies_for_turn(&sess, &turn_context, &env_var_dependencies).await; + } + + maybe_prompt_and_install_mcp_dependencies( + sess.as_ref(), + turn_context.as_ref(), + &cancellation_token, + &mentioned_skills, + ) + .await; + + let session_telemetry = turn_context.session_telemetry.clone(); + let thread_id = sess.conversation_id.to_string(); + let tracking = build_track_events_context( + turn_context.model_info.slug.clone(), + thread_id, + turn_context.sub_id.clone(), + ); + let SkillInjections { + items: skill_items, + warnings: skill_warnings, + } = build_skill_injections( + &mentioned_skills, + skills_outcome, + Some(&session_telemetry), + &sess.services.analytics_events_client, + tracking.clone(), + ) + .await; + + for message in skill_warnings { + sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message })) + .await; + } + + let plugin_items = + build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors); + let mentioned_plugin_metadata = mentioned_plugins + .iter() + .filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata) + .collect::>(); + + let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input); + explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items( + &skill_items, + &available_connectors, + &skill_name_counts_lower, + )); + let connector_names_by_id = available_connectors + .iter() + .map(|connector| (connector.id.as_str(), connector.name.as_str())) + .collect::>(); + let mentioned_app_invocations = explicitly_enabled_connectors + .iter() + .map(|connector_id| AppInvocation { + connector_id: Some(connector_id.clone()), + app_name: connector_names_by_id + .get(connector_id.as_str()) + .map(|name| (*name).to_string()), + invocation_type: Some(InvocationType::Explicit), + }) + .collect::>(); + + if run_pending_session_start_hooks(&sess, &turn_context).await { + return None; + } + let additional_contexts = if input.is_empty() { + Vec::new() + } else { + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone()); + let response_item: ResponseItem = initial_input_for_turn.clone().into(); + let user_prompt_submit_outcome = run_user_prompt_submit_hooks( + &sess, + &turn_context, + UserMessageItem::new(&input).message(), + ) + .await; + if user_prompt_submit_outcome.should_stop { + record_additional_contexts( + &sess, + &turn_context, + user_prompt_submit_outcome.additional_contexts, + ) + .await; + return None; + } + sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) + .await; + user_prompt_submit_outcome.additional_contexts + }; + sess.services + .analytics_events_client + .track_app_mentioned(tracking.clone(), mentioned_app_invocations); + for plugin in mentioned_plugin_metadata { + sess.services + .analytics_events_client + .track_plugin_used(tracking.clone(), plugin); + } + sess.merge_connector_selection(explicitly_enabled_connectors.clone()) + .await; + record_additional_contexts(&sess, &turn_context, additional_contexts).await; + if !input.is_empty() { + // Track the previous-turn baseline from the regular user-turn path only so + // standalone tasks (compact/shell/review/undo) cannot suppress future + // model/realtime injections. + sess.set_previous_turn_settings(Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + })) + .await; + } + if let Err(error) = sess.ensure_agent_task_registered().await { + warn!(error = %error, "agent task registration failed"); + sess.send_event( + turn_context.as_ref(), + EventMsg::Error(ErrorEvent { + message: format!( + "Agent task registration failed. Please try again; Codex will attempt to register the task again on the next turn: {error}" + ), + codex_error_info: Some(CodexErrorInfo::Other), + }), + ) + .await; + return None; + } + + if !skill_items.is_empty() { + sess.record_conversation_items(&turn_context, &skill_items) + .await; + } + if !plugin_items.is_empty() { + sess.record_conversation_items(&turn_context, &plugin_items) + .await; + } + + track_turn_resolved_config_analytics(&sess, &turn_context, &input).await; + + let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); + sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token()) + .await; + let mut last_agent_message: Option = None; + let mut stop_hook_active = false; + // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains + // many turns, from the perspective of the user, it is a single turn. + let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); + let mut server_model_warning_emitted_for_turn = false; + + // `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse + // one instance across retries within this turn. + let mut client_session = + prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); + // Pending input is drained into history before building the next model request. + // However, we defer that drain until after sampling in two cases: + // 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first. + // 2. After auto-compact, when model/tool continuation needs to resume before any steer. + let mut can_drain_pending_input = input.is_empty(); + + loop { + if run_pending_session_start_hooks(&sess, &turn_context).await { + break; + } + + // Note that pending_input would be something like a message the user + // submitted through the UI while the model was running. Though the UI + // may support this, the model might not. + let pending_input = if can_drain_pending_input { + sess.get_pending_input().await + } else { + Vec::new() + }; + + let mut blocked_pending_input = false; + let mut blocked_pending_input_contexts = Vec::new(); + let mut requeued_pending_input = false; + let mut accepted_pending_input = Vec::new(); + if !pending_input.is_empty() { + let mut pending_input_iter = pending_input.into_iter(); + while let Some(pending_input_item) = pending_input_iter.next() { + match inspect_pending_input(&sess, &turn_context, pending_input_item).await { + PendingInputHookDisposition::Accepted(pending_input) => { + accepted_pending_input.push(*pending_input); + } + PendingInputHookDisposition::Blocked { + additional_contexts, + } => { + let remaining_pending_input = pending_input_iter.collect::>(); + if !remaining_pending_input.is_empty() { + let _ = sess.prepend_pending_input(remaining_pending_input).await; + requeued_pending_input = true; + } + blocked_pending_input_contexts = additional_contexts; + blocked_pending_input = true; + break; + } + } + } + } + + let has_accepted_pending_input = !accepted_pending_input.is_empty(); + for pending_input in accepted_pending_input { + record_pending_input(&sess, &turn_context, pending_input).await; + } + record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await; + + if blocked_pending_input && !has_accepted_pending_input { + if requeued_pending_input { + continue; + } + break; + } + + // Construct the input that we will send to the model. + let sampling_request_input: Vec = { + sess.clone_history() + .await + .for_prompt(&turn_context.model_info.input_modalities) + }; + + let sampling_request_input_messages = sampling_request_input + .iter() + .filter_map(|item| match parse_turn_item(item) { + Some(TurnItem::UserMessage(user_message)) => Some(user_message), + _ => None, + }) + .map(|user_message| user_message.message()) + .collect::>(); + let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); + match run_sampling_request( + Arc::clone(&sess), + Arc::clone(&turn_context), + Arc::clone(&turn_diff_tracker), + &mut client_session, + turn_metadata_header.as_deref(), + sampling_request_input, + &explicitly_enabled_connectors, + skills_outcome, + &mut server_model_warning_emitted_for_turn, + cancellation_token.child_token(), + ) + .await + { + Ok(sampling_request_output) => { + let SamplingRequestResult { + needs_follow_up: model_needs_follow_up, + last_agent_message: sampling_request_last_agent_message, + } = sampling_request_output; + can_drain_pending_input = true; + let has_pending_input = sess.has_pending_input().await; + let needs_follow_up = model_needs_follow_up || has_pending_input; + let total_usage_tokens = sess.get_total_token_usage().await; + let token_limit_reached = total_usage_tokens >= auto_compact_limit; + + let estimated_token_count = + sess.get_estimated_token_count(turn_context.as_ref()).await; + + trace!( + turn_id = %turn_context.sub_id, + total_usage_tokens, + estimated_token_count = ?estimated_token_count, + auto_compact_limit, + token_limit_reached, + model_needs_follow_up, + has_pending_input, + needs_follow_up, + "post sampling token usage" + ); + + // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. + if token_limit_reached && needs_follow_up { + if run_auto_compact( + &sess, + &turn_context, + InitialContextInjection::BeforeLastUserMessage, + CompactionReason::ContextLimit, + CompactionPhase::MidTurn, + ) + .await + .is_err() + { + return None; + } + client_session.reset_websocket_session(); + can_drain_pending_input = !model_needs_follow_up; + continue; + } + + if !needs_follow_up { + last_agent_message = sampling_request_last_agent_message; + let stop_hook_permission_mode = match turn_context.approval_policy.value() { + AskForApproval::Never => "bypassPermissions", + AskForApproval::UnlessTrusted + | AskForApproval::OnFailure + | AskForApproval::OnRequest + | AskForApproval::Granular(_) => "default", + } + .to_string(); + let stop_request = codex_hooks::StopRequest { + session_id: sess.conversation_id, + turn_id: turn_context.sub_id.clone(), + cwd: turn_context.cwd.clone(), + transcript_path: sess.hook_transcript_path().await, + model: turn_context.model_info.slug.clone(), + permission_mode: stop_hook_permission_mode, + stop_hook_active, + last_assistant_message: last_agent_message.clone(), + }; + for run in sess.hooks().preview_stop(&stop_request) { + sess.send_event( + &turn_context, + EventMsg::HookStarted(codex_protocol::protocol::HookStartedEvent { + turn_id: Some(turn_context.sub_id.clone()), + run, + }), + ) + .await; + } + let stop_outcome = sess.hooks().run_stop(stop_request).await; + emit_hook_completed_events(&sess, &turn_context, stop_outcome.hook_events) + .await; + if stop_outcome.should_block { + if let Some(hook_prompt_message) = + build_hook_prompt_message(&stop_outcome.continuation_fragments) + { + sess.record_conversation_items( + &turn_context, + std::slice::from_ref(&hook_prompt_message), + ) + .await; + stop_hook_active = true; + continue; + } else { + sess.send_event( + &turn_context, + EventMsg::Warning(WarningEvent { + message: "Stop hook requested continuation without a prompt; ignoring the block.".to_string(), + }), + ) + .await; + } + } + if stop_outcome.should_stop { + break; + } + let hook_outcomes = sess + .hooks() + .dispatch(HookPayload { + session_id: sess.conversation_id, + cwd: turn_context.cwd.clone(), + client: turn_context.app_server_client_name.clone(), + triggered_at: chrono::Utc::now(), + hook_event: HookEvent::AfterAgent { + event: HookEventAfterAgent { + thread_id: sess.conversation_id, + turn_id: turn_context.sub_id.clone(), + input_messages: sampling_request_input_messages, + last_assistant_message: last_agent_message.clone(), + }, + }, + }) + .await; + + let mut abort_message = None; + for hook_outcome in hook_outcomes { + let hook_name = hook_outcome.hook_name; + match hook_outcome.result { + HookResult::Success => {} + HookResult::FailedContinue(error) => { + warn!( + turn_id = %turn_context.sub_id, + hook_name = %hook_name, + error = %error, + "after_agent hook failed; continuing" + ); + } + HookResult::FailedAbort(error) => { + let message = format!( + "after_agent hook '{hook_name}' failed and aborted turn completion: {error}" + ); + warn!( + turn_id = %turn_context.sub_id, + hook_name = %hook_name, + error = %error, + "after_agent hook failed; aborting operation" + ); + if abort_message.is_none() { + abort_message = Some(message); + } + } + } + } + if let Some(message) = abort_message { + sess.send_event( + &turn_context, + EventMsg::Error(ErrorEvent { + message, + codex_error_info: None, + }), + ) + .await; + return None; + } + break; + } + continue; + } + Err(CodexErr::TurnAborted) => { + // Aborted turn is reported via a different event. + break; + } + Err(CodexErr::InvalidImageRequest()) => { + { + let mut state = sess.state.lock().await; + error_or_panic( + "Invalid image detected; sanitizing tool output to prevent poisoning", + ); + if state.history.replace_last_turn_images("Invalid image") { + continue; + } + } + + let event = EventMsg::Error(ErrorEvent { + message: "Invalid image in your last message. Please remove it and try again." + .to_string(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + }); + sess.send_event(&turn_context, event).await; + break; + } + Err(e) => { + info!("Turn error: {e:#}"); + let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None)); + sess.send_event(&turn_context, event).await; + // let the user continue the conversation + break; + } + } + } + + last_agent_message +} + +async fn track_turn_resolved_config_analytics( + sess: &Session, + turn_context: &TurnContext, + input: &[UserInput], +) { + if !sess.enabled(Feature::GeneralAnalytics) { + return; + } + + let thread_config = { + let state = sess.state.lock().await; + state.session_configuration.thread_config_snapshot() + }; + let is_first_turn = { + let mut state = sess.state.lock().await; + state.take_next_turn_is_first() + }; + sess.services + .analytics_events_client + .track_turn_resolved_config(TurnResolvedConfigFact { + turn_id: turn_context.sub_id.clone(), + thread_id: sess.conversation_id.to_string(), + num_input_images: input + .iter() + .filter(|item| { + matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }) + }) + .count(), + submission_type: None, + ephemeral: thread_config.ephemeral, + session_source: thread_config.session_source, + model: turn_context.model_info.slug.clone(), + model_provider: turn_context.config.model_provider_id.clone(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + reasoning_effort: turn_context.reasoning_effort, + reasoning_summary: Some(turn_context.reasoning_summary), + service_tier: turn_context.config.service_tier, + approval_policy: turn_context.approval_policy.value(), + approvals_reviewer: turn_context.config.approvals_reviewer, + sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(), + collaboration_mode: turn_context.collaboration_mode.mode, + personality: turn_context.personality, + is_first_turn, + }); +} + +async fn run_pre_sampling_compact( + sess: &Arc, + turn_context: &Arc, +) -> CodexResult { + let total_usage_tokens_before_compaction = sess.get_total_token_usage().await; + let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact( + sess, + turn_context, + total_usage_tokens_before_compaction, + ) + .await?; + let total_usage_tokens = sess.get_total_token_usage().await; + let auto_compact_limit = turn_context + .model_info + .auto_compact_token_limit() + .unwrap_or(i64::MAX); + // Compact if the total usage tokens are greater than the auto compact limit + if total_usage_tokens >= auto_compact_limit { + run_auto_compact( + sess, + turn_context, + InitialContextInjection::DoNotInject, + CompactionReason::ContextLimit, + CompactionPhase::PreTurn, + ) + .await?; + pre_sampling_compacted = true; + } + Ok(pre_sampling_compacted) +} + +/// Runs pre-sampling compaction against the previous model when switching to a smaller +/// context-window model. +/// +/// Returns `Ok(true)` when compaction ran successfully, `Ok(false)` when compaction was skipped +/// because the model/context-window preconditions were not met, and `Err(_)` only when compaction +/// was attempted and failed. +async fn maybe_run_previous_model_inline_compact( + sess: &Arc, + turn_context: &Arc, + total_usage_tokens: i64, +) -> CodexResult { + let Some(previous_turn_settings) = sess.previous_turn_settings().await else { + return Ok(false); + }; + let previous_model_turn_context = Arc::new( + turn_context + .with_model(previous_turn_settings.model, &sess.services.models_manager) + .await, + ); + + let Some(old_context_window) = previous_model_turn_context.model_context_window() else { + return Ok(false); + }; + let Some(new_context_window) = turn_context.model_context_window() else { + return Ok(false); + }; + let new_auto_compact_limit = turn_context + .model_info + .auto_compact_token_limit() + .unwrap_or(i64::MAX); + let should_run = total_usage_tokens > new_auto_compact_limit + && previous_model_turn_context.model_info.slug != turn_context.model_info.slug + && old_context_window > new_context_window; + if should_run { + run_auto_compact( + sess, + &previous_model_turn_context, + InitialContextInjection::DoNotInject, + CompactionReason::ModelDownshift, + CompactionPhase::PreTurn, + ) + .await?; + return Ok(true); + } + Ok(false) +} + +async fn run_auto_compact( + sess: &Arc, + turn_context: &Arc, + initial_context_injection: InitialContextInjection, + reason: CompactionReason, + phase: CompactionPhase, +) -> CodexResult<()> { + if should_use_remote_compact_task(&turn_context.provider) { + run_inline_remote_auto_compact_task( + Arc::clone(sess), + Arc::clone(turn_context), + initial_context_injection, + reason, + phase, + ) + .await?; + } else { + run_inline_auto_compact_task( + Arc::clone(sess), + Arc::clone(turn_context), + initial_context_injection, + reason, + phase, + ) + .await?; + } + Ok(()) +} + +pub(super) fn collect_explicit_app_ids_from_skill_items( + skill_items: &[ResponseItem], + connectors: &[connectors::AppInfo], + skill_name_counts_lower: &HashMap, +) -> HashSet { + if skill_items.is_empty() || connectors.is_empty() { + return HashSet::new(); + } + + let skill_messages = skill_items + .iter() + .filter_map(|item| match item { + ResponseItem::Message { content, .. } => { + content.iter().find_map(|content_item| match content_item { + ContentItem::InputText { text } => Some(text.clone()), + _ => None, + }) + } + _ => None, + }) + .collect::>(); + if skill_messages.is_empty() { + return HashSet::new(); + } + + let mentions = collect_tool_mentions_from_messages(&skill_messages); + let mention_names_lower = mentions + .plain_names + .iter() + .map(|name| name.to_ascii_lowercase()) + .collect::>(); + let mut connector_ids = mentions + .paths + .iter() + .filter(|path| tool_kind_for_path(path) == ToolMentionKind::App) + .filter_map(|path| app_id_from_path(path).map(str::to_string)) + .collect::>(); + + let connector_slug_counts = build_connector_slug_counts(connectors); + for connector in connectors { + let slug = codex_connectors::metadata::connector_mention_slug(connector); + let connector_count = connector_slug_counts.get(&slug).copied().unwrap_or(0); + let skill_count = skill_name_counts_lower.get(&slug).copied().unwrap_or(0); + if connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&slug) { + connector_ids.insert(connector.id.clone()); + } + } + + connector_ids +} + +pub(super) fn filter_connectors_for_input( + connectors: &[connectors::AppInfo], + input: &[ResponseItem], + explicitly_enabled_connectors: &HashSet, + skill_name_counts_lower: &HashMap, +) -> Vec { + let connectors: Vec = connectors + .iter() + .filter(|connector| connector.is_enabled) + .cloned() + .collect::>(); + if connectors.is_empty() { + return Vec::new(); + } + + let user_messages = collect_user_messages(input); + if user_messages.is_empty() && explicitly_enabled_connectors.is_empty() { + return Vec::new(); + } + + let mentions = collect_tool_mentions_from_messages(&user_messages); + let mention_names_lower = mentions + .plain_names + .iter() + .map(|name| name.to_ascii_lowercase()) + .collect::>(); + + let connector_slug_counts = build_connector_slug_counts(&connectors); + let mut allowed_connector_ids = explicitly_enabled_connectors.clone(); + for path in mentions + .paths + .iter() + .filter(|path| tool_kind_for_path(path) == ToolMentionKind::App) + { + if let Some(connector_id) = app_id_from_path(path) { + allowed_connector_ids.insert(connector_id.to_string()); + } + } + + connectors + .into_iter() + .filter(|connector| { + connector_inserted_in_messages( + connector, + &mention_names_lower, + &allowed_connector_ids, + &connector_slug_counts, + skill_name_counts_lower, + ) + }) + .collect() +} + +fn connector_inserted_in_messages( + connector: &connectors::AppInfo, + mention_names_lower: &HashSet, + allowed_connector_ids: &HashSet, + connector_slug_counts: &HashMap, + skill_name_counts_lower: &HashMap, +) -> bool { + if allowed_connector_ids.contains(&connector.id) { + return true; + } + + let mention_slug = codex_connectors::metadata::connector_mention_slug(connector); + let connector_count = connector_slug_counts + .get(&mention_slug) + .copied() + .unwrap_or(0); + let skill_count = skill_name_counts_lower + .get(&mention_slug) + .copied() + .unwrap_or(0); + connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&mention_slug) +} + +pub(crate) fn build_prompt( + input: Vec, + router: &ToolRouter, + turn_context: &TurnContext, + base_instructions: BaseInstructions, +) -> Prompt { + let deferred_dynamic_tools = turn_context + .dynamic_tools + .iter() + .filter(|tool| tool.defer_loading) + .map(|tool| tool.name.as_str()) + .collect::>(); + let tools = if deferred_dynamic_tools.is_empty() { + router.model_visible_specs() + } else { + router + .model_visible_specs() + .into_iter() + .filter(|spec| !deferred_dynamic_tools.contains(spec.name())) + .collect() + }; + + Prompt { + input, + tools, + parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls, + base_instructions, + personality: turn_context.personality, + output_schema: turn_context.final_output_json_schema.clone(), + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(level = "trace", + skip_all, + fields( + turn_id = %turn_context.sub_id, + model = %turn_context.model_info.slug, + cwd = %turn_context.cwd.display() + ) +)] +async fn run_sampling_request( + sess: Arc, + turn_context: Arc, + turn_diff_tracker: SharedTurnDiffTracker, + client_session: &mut ModelClientSession, + turn_metadata_header: Option<&str>, + input: Vec, + explicitly_enabled_connectors: &HashSet, + skills_outcome: Option<&SkillLoadOutcome>, + server_model_warning_emitted_for_turn: &mut bool, + cancellation_token: CancellationToken, +) -> CodexResult { + let router = built_tools( + sess.as_ref(), + turn_context.as_ref(), + &input, + explicitly_enabled_connectors, + skills_outcome, + &cancellation_token, + ) + .await?; + + let base_instructions = sess.get_base_instructions().await; + + let tool_runtime = ToolCallRuntime::new( + Arc::clone(&router), + Arc::clone(&sess), + Arc::clone(&turn_context), + Arc::clone(&turn_diff_tracker), + ); + let _code_mode_worker = sess + .services + .code_mode_service + .start_turn_worker( + &sess, + &turn_context, + Arc::clone(&router), + Arc::clone(&turn_diff_tracker), + ) + .await; + let mut retries = 0; + let mut initial_input = Some(input); + loop { + let prompt_input = if let Some(input) = initial_input.take() { + input + } else { + sess.clone_history() + .await + .for_prompt(&turn_context.model_info.input_modalities) + }; + let prompt = build_prompt( + prompt_input, + router.as_ref(), + turn_context.as_ref(), + base_instructions.clone(), + ); + let err = match try_run_sampling_request( + tool_runtime.clone(), + Arc::clone(&sess), + Arc::clone(&turn_context), + client_session, + turn_metadata_header, + Arc::clone(&turn_diff_tracker), + server_model_warning_emitted_for_turn, + &prompt, + cancellation_token.child_token(), + ) + .await + { + Ok(output) => { + return Ok(output); + } + Err(CodexErr::ContextWindowExceeded) => { + sess.set_total_tokens_full(&turn_context).await; + return Err(CodexErr::ContextWindowExceeded); + } + Err(CodexErr::UsageLimitReached(e)) => { + let rate_limits = e.rate_limits.clone(); + if let Some(rate_limits) = rate_limits { + sess.update_rate_limits(&turn_context, *rate_limits).await; + } + return Err(CodexErr::UsageLimitReached(e)); + } + Err(err) => err, + }; + + if !err.is_retryable() { + return Err(err); + } + + // Use the configured provider-specific stream retry budget. + let max_retries = turn_context.provider.stream_max_retries(); + if retries >= max_retries + && client_session.try_switch_fallback_transport( + &turn_context.session_telemetry, + &turn_context.model_info, + ) + { + sess.send_event( + &turn_context, + EventMsg::Warning(WarningEvent { + message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), + }), + ) + .await; + retries = 0; + continue; + } + if retries < max_retries { + retries += 1; + let delay = match &err { + CodexErr::Stream(_, requested_delay) => { + requested_delay.unwrap_or_else(|| backoff(retries)) + } + _ => backoff(retries), + }; + warn!( + "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", + ); + + // In release builds, hide the first websocket retry notification to reduce noisy + // transient reconnect messages. In debug builds, keep full visibility for diagnosis. + let report_error = retries > 1 + || cfg!(debug_assertions) + || !sess.services.model_client.responses_websocket_enabled(); + if report_error { + // Surface retry information to any UI/front‑end so the + // user understands what is happening instead of staring + // at a seemingly frozen screen. + sess.notify_stream_error( + &turn_context, + format!("Reconnecting... {retries}/{max_retries}"), + err, + ) + .await; + } + tokio::time::sleep(delay).await; + } else { + return Err(err); + } + } +} + +pub(crate) async fn built_tools( + sess: &Session, + turn_context: &TurnContext, + input: &[ResponseItem], + explicitly_enabled_connectors: &HashSet, + skills_outcome: Option<&SkillLoadOutcome>, + cancellation_token: &CancellationToken, +) -> CodexResult> { + let mcp_connection_manager = sess.services.mcp_connection_manager.read().await; + let has_mcp_servers = mcp_connection_manager.has_servers(); + let all_mcp_tools = mcp_connection_manager + .list_all_tools() + .or_cancel(cancellation_token) + .await?; + drop(mcp_connection_manager); + let loaded_plugins = sess + .services + .plugins_manager + .plugins_for_config(&turn_context.config) + .await; + + let mut effective_explicitly_enabled_connectors = explicitly_enabled_connectors.clone(); + effective_explicitly_enabled_connectors.extend(sess.get_connector_selection().await); + + let apps_enabled = turn_context.apps_enabled(); + let accessible_connectors = + apps_enabled.then(|| connectors::accessible_connectors_from_mcp_tools(&all_mcp_tools)); + let accessible_connectors_with_enabled_state = + accessible_connectors.as_ref().map(|connectors| { + connectors::with_app_enabled_state(connectors.clone(), &turn_context.config) + }); + let connectors = if apps_enabled { + let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( + loaded_plugins + .effective_apps() + .into_iter() + .map(|connector_id| connector_id.0), + accessible_connectors.clone().unwrap_or_default(), + ); + Some(connectors::with_app_enabled_state( + connectors, + &turn_context.config, + )) + } else { + None + }; + let auth = sess.services.auth_manager.auth().await; + let discoverable_tools = if apps_enabled && turn_context.tools_config.tool_suggest { + if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() { + match connectors::list_tool_suggest_discoverable_tools_with_auth( + &turn_context.config, + auth.as_ref(), + accessible_connectors.as_slice(), + ) + .await + .map(|discoverable_tools| { + filter_tool_suggest_discoverable_tools_for_client( + discoverable_tools, + turn_context.app_server_client_name.as_deref(), + ) + }) { + Ok(discoverable_tools) if discoverable_tools.is_empty() => None, + Ok(discoverable_tools) => Some(discoverable_tools), + Err(err) => { + warn!("failed to load discoverable tool suggestions: {err:#}"); + None + } + } + } else { + None + } + } else { + None + }; + + let explicitly_enabled = if let Some(connectors) = connectors.as_ref() { + let skill_name_counts_lower = skills_outcome.map_or_else(HashMap::new, |outcome| { + build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1 + }); + + filter_connectors_for_input( + connectors, + input, + &effective_explicitly_enabled_connectors, + &skill_name_counts_lower, + ) + } else { + Vec::new() + }; + let mcp_tool_exposure = build_mcp_tool_exposure( + &all_mcp_tools, + connectors.as_deref(), + explicitly_enabled.as_slice(), + &turn_context.config, + &turn_context.tools_config, + ); + let mcp_tools = has_mcp_servers.then_some(mcp_tool_exposure.direct_tools); + let deferred_mcp_tools = mcp_tool_exposure.deferred_tools; + let unavailable_called_tools = if turn_context + .config + .features + .enabled(Feature::UnavailableDummyTools) + { + let exposed_tool_names = mcp_tools + .iter() + .chain(deferred_mcp_tools.iter()) + .flat_map(|tools| tools.keys().map(String::as_str)) + .collect::>(); + collect_unavailable_called_tools(input, &exposed_tool_names) + } else { + Vec::new() + }; + + let parallel_mcp_server_names = turn_context + .config + .mcp_servers + .get() + .iter() + .filter_map(|(server_name, server_config)| { + server_config + .supports_parallel_tool_calls + .then_some(server_name.clone()) + }) + .collect::>(); + + Ok(Arc::new(ToolRouter::from_config( + &turn_context.tools_config, + ToolRouterParams { + mcp_tools, + deferred_mcp_tools, + unavailable_called_tools, + parallel_mcp_server_names, + discoverable_tools, + dynamic_tools: turn_context.dynamic_tools.as_slice(), + }, + ))) +} + +#[derive(Debug)] +struct SamplingRequestResult { + needs_follow_up: bool, + last_agent_message: Option, +} + +/// Ephemeral per-response state for streaming a single proposed plan. +/// This is intentionally not persisted or stored in session/state since it +/// only exists while a response is actively streaming. The final plan text +/// is extracted from the completed assistant message. +/// Tracks a single proposed plan item across a streaming response. +struct ProposedPlanItemState { + item_id: String, + started: bool, + completed: bool, +} + +/// Aggregated state used only while streaming a plan-mode response. +/// Includes per-item parsers, deferred agent message bookkeeping, and the plan item lifecycle. +struct PlanModeStreamState { + /// Agent message items started by the model but deferred until we see non-plan text. + pending_agent_message_items: HashMap, + /// Agent message items whose start notification has been emitted. + started_agent_message_items: HashSet, + /// Leading whitespace buffered until we see non-whitespace text for an item. + leading_whitespace_by_item: HashMap, + /// Tracks plan item lifecycle while streaming plan output. + plan_item_state: ProposedPlanItemState, +} + +impl PlanModeStreamState { + fn new(turn_id: &str) -> Self { + Self { + pending_agent_message_items: HashMap::new(), + started_agent_message_items: HashSet::new(), + leading_whitespace_by_item: HashMap::new(), + plan_item_state: ProposedPlanItemState::new(turn_id), + } + } +} + +#[derive(Debug, Default)] +pub(super) struct AssistantMessageStreamParsers { + plan_mode: bool, + parsers_by_item: HashMap, +} + +type ParsedAssistantTextDelta = AssistantTextChunk; + +impl AssistantMessageStreamParsers { + pub(super) fn new(plan_mode: bool) -> Self { + Self { + plan_mode, + parsers_by_item: HashMap::new(), + } + } + + fn parser_mut(&mut self, item_id: &str) -> &mut AssistantTextStreamParser { + let plan_mode = self.plan_mode; + self.parsers_by_item + .entry(item_id.to_string()) + .or_insert_with(|| AssistantTextStreamParser::new(plan_mode)) + } + + pub(super) fn seed_item_text(&mut self, item_id: &str, text: &str) -> ParsedAssistantTextDelta { + if text.is_empty() { + return ParsedAssistantTextDelta::default(); + } + self.parser_mut(item_id).push_str(text) + } + + pub(super) fn parse_delta(&mut self, item_id: &str, delta: &str) -> ParsedAssistantTextDelta { + self.parser_mut(item_id).push_str(delta) + } + + pub(super) fn finish_item(&mut self, item_id: &str) -> ParsedAssistantTextDelta { + let Some(mut parser) = self.parsers_by_item.remove(item_id) else { + return ParsedAssistantTextDelta::default(); + }; + parser.finish() + } + + fn drain_finished(&mut self) -> Vec<(String, ParsedAssistantTextDelta)> { + let parsers_by_item = std::mem::take(&mut self.parsers_by_item); + parsers_by_item + .into_iter() + .map(|(item_id, mut parser)| (item_id, parser.finish())) + .collect() + } +} + +impl ProposedPlanItemState { + fn new(turn_id: &str) -> Self { + Self { + item_id: format!("{turn_id}-plan"), + started: false, + completed: false, + } + } + + async fn start(&mut self, sess: &Session, turn_context: &TurnContext) { + if self.started || self.completed { + return; + } + self.started = true; + let item = TurnItem::Plan(PlanItem { + id: self.item_id.clone(), + text: String::new(), + }); + sess.emit_turn_item_started(turn_context, &item).await; + } + + async fn push_delta(&mut self, sess: &Session, turn_context: &TurnContext, delta: &str) { + if self.completed { + return; + } + if delta.is_empty() { + return; + } + let event = PlanDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: self.item_id.clone(), + delta: delta.to_string(), + }; + sess.send_event(turn_context, EventMsg::PlanDelta(event)) + .await; + } + + async fn complete_with_text( + &mut self, + sess: &Session, + turn_context: &TurnContext, + text: String, + ) { + if self.completed || !self.started { + return; + } + self.completed = true; + let item = TurnItem::Plan(PlanItem { + id: self.item_id.clone(), + text, + }); + sess.emit_turn_item_completed(turn_context, item).await; + } +} + +/// In plan mode we defer agent message starts until the parser emits non-plan +/// text. The parser buffers each line until it can rule out a tag prefix, so +/// plan-only outputs never show up as empty assistant messages. +async fn maybe_emit_pending_agent_message_start( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item_id: &str, +) { + if state.started_agent_message_items.contains(item_id) { + return; + } + if let Some(item) = state.pending_agent_message_items.remove(item_id) { + sess.emit_turn_item_started(turn_context, &item).await; + state + .started_agent_message_items + .insert(item_id.to_string()); + } +} + +/// Agent messages are text-only today; concatenate all text entries. +fn agent_message_text(item: &codex_protocol::items::AgentMessageItem) -> String { + item.content + .iter() + .map(|entry| match entry { + codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), + }) + .collect() +} + +pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { + match msg { + EventMsg::AgentMessage(event) => Some(event.message.clone()), + EventMsg::ItemCompleted(event) => match &event.item { + TurnItem::AgentMessage(item) => Some(agent_message_text(item)), + _ => None, + }, + EventMsg::Error(_) + | EventMsg::Warning(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationSdp(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) + | EventMsg::ModelReroute(_) + | EventMsg::ContextCompacted(_) + | EventMsg::ThreadRolledBack(_) + | EventMsg::TurnStarted(_) + | EventMsg::TurnComplete(_) + | EventMsg::TokenCount(_) + | EventMsg::UserMessage(_) + | EventMsg::AgentMessageDelta(_) + | EventMsg::AgentReasoning(_) + | EventMsg::AgentReasoningDelta(_) + | EventMsg::AgentReasoningRawContent(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::SessionConfigured(_) + | EventMsg::ThreadNameUpdated(_) + | EventMsg::McpStartupUpdate(_) + | EventMsg::McpStartupComplete(_) + | EventMsg::McpToolCallBegin(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::WebSearchBegin(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::TerminalInteraction(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::PatchApplyBegin(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::ViewImageToolCall(_) + | EventMsg::ImageGenerationBegin(_) + | EventMsg::ImageGenerationEnd(_) + | EventMsg::ExecApprovalRequest(_) + | EventMsg::RequestPermissions(_) + | EventMsg::RequestUserInput(_) + | EventMsg::DynamicToolCallRequest(_) + | EventMsg::DynamicToolCallResponse(_) + | EventMsg::GuardianAssessment(_) + | EventMsg::ElicitationRequest(_) + | EventMsg::ApplyPatchApprovalRequest(_) + | EventMsg::DeprecationNotice(_) + | EventMsg::BackgroundEvent(_) + | EventMsg::UndoStarted(_) + | EventMsg::UndoCompleted(_) + | EventMsg::StreamError(_) + | EventMsg::TurnDiff(_) + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::McpListToolsResponse(_) + | EventMsg::ListSkillsResponse(_) + | EventMsg::RealtimeConversationListVoicesResponse(_) + | EventMsg::SkillsUpdateAvailable + | EventMsg::PlanUpdate(_) + | EventMsg::TurnAborted(_) + | EventMsg::ShutdownComplete + | EventMsg::EnteredReviewMode(_) + | EventMsg::ExitedReviewMode(_) + | EventMsg::RawResponseItem(_) + | EventMsg::ItemStarted(_) + | EventMsg::HookStarted(_) + | EventMsg::HookCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::PlanDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) + | EventMsg::CollabAgentSpawnBegin(_) + | EventMsg::CollabAgentSpawnEnd(_) + | EventMsg::CollabAgentInteractionBegin(_) + | EventMsg::CollabAgentInteractionEnd(_) + | EventMsg::CollabWaitingBegin(_) + | EventMsg::CollabWaitingEnd(_) + | EventMsg::CollabCloseBegin(_) + | EventMsg::CollabCloseEnd(_) + | EventMsg::CollabResumeBegin(_) + | EventMsg::CollabResumeEnd(_) => None, + } +} + +/// Split the stream into normal assistant text vs. proposed plan content. +/// Normal text becomes AgentMessage deltas; plan content becomes PlanDelta + +/// TurnItem::Plan. +async fn handle_plan_segments( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item_id: &str, + segments: Vec, +) { + for segment in segments { + match segment { + ProposedPlanSegment::Normal(delta) => { + if delta.is_empty() { + continue; + } + let has_non_whitespace = delta.chars().any(|ch| !ch.is_whitespace()); + if !has_non_whitespace && !state.started_agent_message_items.contains(item_id) { + let entry = state + .leading_whitespace_by_item + .entry(item_id.to_string()) + .or_default(); + entry.push_str(&delta); + continue; + } + let delta = if !state.started_agent_message_items.contains(item_id) { + if let Some(prefix) = state.leading_whitespace_by_item.remove(item_id) { + format!("{prefix}{delta}") + } else { + delta + } + } else { + delta + }; + maybe_emit_pending_agent_message_start(sess, turn_context, state, item_id).await; + + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: item_id.to_string(), + delta, + }; + sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } + ProposedPlanSegment::ProposedPlanStart => { + if !state.plan_item_state.completed { + state.plan_item_state.start(sess, turn_context).await; + } + } + ProposedPlanSegment::ProposedPlanDelta(delta) => { + if !state.plan_item_state.completed { + if !state.plan_item_state.started { + state.plan_item_state.start(sess, turn_context).await; + } + state + .plan_item_state + .push_delta(sess, turn_context, &delta) + .await; + } + } + ProposedPlanSegment::ProposedPlanEnd => {} + } + } +} + +async fn emit_streamed_assistant_text_delta( + sess: &Session, + turn_context: &TurnContext, + plan_mode_state: Option<&mut PlanModeStreamState>, + item_id: &str, + parsed: ParsedAssistantTextDelta, +) { + if parsed.is_empty() { + return; + } + if !parsed.citations.is_empty() { + // Citation extraction is intentionally local for now; we strip citations from display text + // but do not yet surface them in protocol events. + let _citations = parsed.citations; + } + if let Some(state) = plan_mode_state { + if !parsed.plan_segments.is_empty() { + handle_plan_segments(sess, turn_context, state, item_id, parsed.plan_segments).await; + } + return; + } + if parsed.visible_text.is_empty() { + return; + } + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: item_id.to_string(), + delta: parsed.visible_text, + }; + sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; +} + +/// Flush buffered assistant text parser state when an assistant message item ends. +async fn flush_assistant_text_segments_for_item( + sess: &Session, + turn_context: &TurnContext, + plan_mode_state: Option<&mut PlanModeStreamState>, + parsers: &mut AssistantMessageStreamParsers, + item_id: &str, +) { + let parsed = parsers.finish_item(item_id); + emit_streamed_assistant_text_delta(sess, turn_context, plan_mode_state, item_id, parsed).await; +} + +/// Flush any remaining buffered assistant text parser state at response completion. +async fn flush_assistant_text_segments_all( + sess: &Session, + turn_context: &TurnContext, + mut plan_mode_state: Option<&mut PlanModeStreamState>, + parsers: &mut AssistantMessageStreamParsers, +) { + for (item_id, parsed) in parsers.drain_finished() { + emit_streamed_assistant_text_delta( + sess, + turn_context, + plan_mode_state.as_deref_mut(), + &item_id, + parsed, + ) + .await; + } +} + +/// Emit completion for plan items by parsing the finalized assistant message. +async fn maybe_complete_plan_item_from_message( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item: &ResponseItem, +) { + if let ResponseItem::Message { role, content, .. } = item + && role == "assistant" + { + let mut text = String::new(); + for entry in content { + if let ContentItem::OutputText { text: chunk } = entry { + text.push_str(chunk); + } + } + if let Some(plan_text) = extract_proposed_plan_text(&text) { + let (plan_text, _citations) = strip_citations(&plan_text); + if !state.plan_item_state.started { + state.plan_item_state.start(sess, turn_context).await; + } + state + .plan_item_state + .complete_with_text(sess, turn_context, plan_text) + .await; + } + } +} + +/// Emit a completed agent message in plan mode, respecting deferred starts. +async fn emit_agent_message_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + agent_message: codex_protocol::items::AgentMessageItem, + state: &mut PlanModeStreamState, +) { + let agent_message_id = agent_message.id.clone(); + let text = agent_message_text(&agent_message); + if text.trim().is_empty() { + state.pending_agent_message_items.remove(&agent_message_id); + state.started_agent_message_items.remove(&agent_message_id); + return; + } + + maybe_emit_pending_agent_message_start(sess, turn_context, state, &agent_message_id).await; + + if !state + .started_agent_message_items + .contains(&agent_message_id) + { + let start_item = state + .pending_agent_message_items + .remove(&agent_message_id) + .unwrap_or_else(|| { + TurnItem::AgentMessage(codex_protocol::items::AgentMessageItem { + id: agent_message_id.clone(), + content: Vec::new(), + phase: None, + memory_citation: None, + }) + }); + sess.emit_turn_item_started(turn_context, &start_item).await; + state + .started_agent_message_items + .insert(agent_message_id.clone()); + } + + sess.emit_turn_item_completed(turn_context, TurnItem::AgentMessage(agent_message)) + .await; + state.started_agent_message_items.remove(&agent_message_id); +} + +/// Emit completion for a plan-mode turn item, handling agent messages specially. +async fn emit_turn_item_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + turn_item: TurnItem, + previously_active_item: Option<&TurnItem>, + state: &mut PlanModeStreamState, +) { + match turn_item { + TurnItem::AgentMessage(agent_message) => { + emit_agent_message_in_plan_mode(sess, turn_context, agent_message, state).await; + } + _ => { + if previously_active_item.is_none() { + sess.emit_turn_item_started(turn_context, &turn_item).await; + } + sess.emit_turn_item_completed(turn_context, turn_item).await; + } + } +} + +/// Handle a completed assistant response item in plan mode, returning true if handled. +async fn handle_assistant_item_done_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + item: &ResponseItem, + state: &mut PlanModeStreamState, + previously_active_item: Option<&TurnItem>, + last_agent_message: &mut Option, +) -> bool { + if let ResponseItem::Message { role, .. } = item + && role == "assistant" + { + maybe_complete_plan_item_from_message(sess, turn_context, state, item).await; + + if let Some(turn_item) = + handle_non_tool_response_item(sess, turn_context, item, /*plan_mode*/ true).await + { + emit_turn_item_in_plan_mode( + sess, + turn_context, + turn_item, + previously_active_item, + state, + ) + .await; + } + + record_completed_response_item(sess, turn_context, item).await; + if let Some(agent_message) = last_assistant_message_from_item(item, /*plan_mode*/ true) { + *last_agent_message = Some(agent_message); + } + return true; + } + false +} + +async fn drain_in_flight( + in_flight: &mut FuturesOrdered>>, + sess: Arc, + turn_context: Arc, +) -> CodexResult<()> { + while let Some(res) = in_flight.next().await { + match res { + Ok(response_input) => { + let response_item = response_input.into(); + sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item)) + .await; + mark_thread_memory_mode_polluted_if_external_context( + sess.as_ref(), + turn_context.as_ref(), + &response_item, + ) + .await; + } + Err(err) => { + error_or_panic(format!("in-flight tool future failed during drain: {err}")); + } + } + } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +#[instrument(level = "trace", + skip_all, + fields( + turn_id = %turn_context.sub_id, + model = %turn_context.model_info.slug + ) +)] +async fn try_run_sampling_request( + tool_runtime: ToolCallRuntime, + sess: Arc, + turn_context: Arc, + client_session: &mut ModelClientSession, + turn_metadata_header: Option<&str>, + turn_diff_tracker: SharedTurnDiffTracker, + server_model_warning_emitted_for_turn: &mut bool, + prompt: &Prompt, + cancellation_token: CancellationToken, +) -> CodexResult { + feedback_tags!( + model = turn_context.model_info.slug.clone(), + approval_policy = turn_context.approval_policy.value(), + sandbox_policy = turn_context.sandbox_policy.get(), + effort = turn_context.reasoning_effort, + auth_mode = sess.services.auth_manager.auth_mode(), + features = sess.features.enabled_features(), + ); + let mut stream = client_session + .stream( + prompt, + &turn_context.model_info, + &turn_context.session_telemetry, + turn_context.reasoning_effort, + turn_context.reasoning_summary, + turn_context.config.service_tier, + turn_metadata_header, + ) + .instrument(trace_span!("stream_request")) + .or_cancel(&cancellation_token) + .await??; + let mut in_flight: FuturesOrdered>> = + FuturesOrdered::new(); + let mut needs_follow_up = false; + let mut last_agent_message: Option = None; + let mut active_item: Option = None; + let mut should_emit_turn_diff = false; + let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan; + let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode); + let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); + let receiving_span = trace_span!("receiving_stream"); + let outcome: CodexResult = loop { + let handle_responses = trace_span!( + parent: &receiving_span, + "handle_responses", + otel.name = field::Empty, + tool_name = field::Empty, + from = field::Empty, + ); + + let event = match stream + .next() + .instrument(trace_span!(parent: &handle_responses, "receiving")) + .or_cancel(&cancellation_token) + .await + { + Ok(event) => event, + Err(codex_async_utils::CancelErr::Cancelled) => break Err(CodexErr::TurnAborted), + }; + + let event = match event { + Some(Ok(event)) => event, + Some(Err(err)) => break Err(err), + None => { + break Err(CodexErr::Stream( + "stream closed before response.completed".into(), + None, + )); + } + }; + + sess.services + .session_telemetry + .record_responses(&handle_responses, &event); + record_turn_ttft_metric(&turn_context, &event).await; + + match event { + ResponseEvent::Created => {} + ResponseEvent::OutputItemDone(item) => { + let previously_active_item = active_item.take(); + if let Some(previous) = previously_active_item.as_ref() + && matches!(previous, TurnItem::AgentMessage(_)) + { + let item_id = previous.id(); + flush_assistant_text_segments_for_item( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + &item_id, + ) + .await; + } + if let Some(state) = plan_mode_state.as_mut() + && handle_assistant_item_done_in_plan_mode( + &sess, + &turn_context, + &item, + state, + previously_active_item.as_ref(), + &mut last_agent_message, + ) + .await + { + continue; + } + + let mut ctx = HandleOutputCtx { + sess: sess.clone(), + turn_context: turn_context.clone(), + tool_runtime: tool_runtime.clone(), + cancellation_token: cancellation_token.child_token(), + }; + + let output_result = + match handle_output_item_done(&mut ctx, item, previously_active_item) + .instrument(handle_responses) + .await + { + Ok(output_result) => output_result, + Err(err) => break Err(err), + }; + if let Some(tool_future) = output_result.tool_future { + in_flight.push_back(tool_future); + } + if let Some(agent_message) = output_result.last_agent_message { + last_agent_message = Some(agent_message); + } + needs_follow_up |= output_result.needs_follow_up; + } + ResponseEvent::OutputItemAdded(item) => { + if let Some(turn_item) = handle_non_tool_response_item( + sess.as_ref(), + turn_context.as_ref(), + &item, + plan_mode, + ) + .await + { + let mut turn_item = turn_item; + let mut seeded_parsed: Option = None; + let mut seeded_item_id: Option = None; + if matches!(turn_item, TurnItem::AgentMessage(_)) + && let Some(raw_text) = raw_assistant_output_text_from_item(&item) + { + let item_id = turn_item.id(); + let mut seeded = + assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text); + if let TurnItem::AgentMessage(agent_message) = &mut turn_item { + agent_message.content = + vec![codex_protocol::items::AgentMessageContent::Text { + text: if plan_mode { + String::new() + } else { + std::mem::take(&mut seeded.visible_text) + }, + }]; + } + seeded_parsed = plan_mode.then_some(seeded); + seeded_item_id = Some(item_id); + } + if let Some(state) = plan_mode_state.as_mut() + && matches!(turn_item, TurnItem::AgentMessage(_)) + { + let item_id = turn_item.id(); + state + .pending_agent_message_items + .insert(item_id, turn_item.clone()); + } else { + sess.emit_turn_item_started(&turn_context, &turn_item).await; + } + if let (Some(state), Some(item_id), Some(parsed)) = ( + plan_mode_state.as_mut(), + seeded_item_id.as_deref(), + seeded_parsed, + ) { + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + Some(state), + item_id, + parsed, + ) + .await; + } + active_item = Some(turn_item); + } + } + ResponseEvent::ServerModel(server_model) => { + if !*server_model_warning_emitted_for_turn + && sess + .maybe_warn_on_server_model_mismatch(&turn_context, server_model) + .await + { + *server_model_warning_emitted_for_turn = true; + } + } + ResponseEvent::ServerReasoningIncluded(included) => { + sess.set_server_reasoning_included(included).await; + } + ResponseEvent::RateLimits(snapshot) => { + // Update internal state with latest rate limits, but defer sending until + // token usage is available to avoid duplicate TokenCount events. + sess.update_rate_limits(&turn_context, snapshot).await; + } + ResponseEvent::ModelsEtag(etag) => { + // Update internal state with latest models etag + sess.services.models_manager.refresh_if_new_etag(etag).await; + } + ResponseEvent::Completed { + response_id: _, + token_usage, + } => { + flush_assistant_text_segments_all( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + ) + .await; + sess.update_token_usage_info(&turn_context, token_usage.as_ref()) + .await; + should_emit_turn_diff = true; + + break Ok(SamplingRequestResult { + needs_follow_up, + last_agent_message, + }); + } + ResponseEvent::OutputTextDelta(delta) => { + // In review child threads, suppress assistant text deltas; the + // UI will show a selection popup from the final ReviewOutput. + if let Some(active) = active_item.as_ref() { + let item_id = active.id(); + if matches!(active, TurnItem::AgentMessage(_)) { + let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta); + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &item_id, + parsed, + ) + .await; + } else { + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id, + delta, + }; + sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } + } else { + error_or_panic("OutputTextDelta without active item".to_string()); + } + } + ResponseEvent::ReasoningSummaryDelta { + delta, + summary_index, + } => { + if let Some(active) = active_item.as_ref() { + let event = ReasoningContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta, + summary_index, + }; + sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningSummaryDelta without active item".to_string()); + } + } + ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { + if let Some(active) = active_item.as_ref() { + let event = + EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { + item_id: active.id(), + summary_index, + }); + sess.send_event(&turn_context, event).await; + } else { + error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); + } + } + ResponseEvent::ReasoningContentDelta { + delta, + content_index, + } => { + if let Some(active) = active_item.as_ref() { + let event = ReasoningRawContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta, + content_index, + }; + sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningRawContentDelta without active item".to_string()); + } + } + } + }; + + flush_assistant_text_segments_all( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + ) + .await; + + drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?; + + if cancellation_token.is_cancelled() { + return Err(CodexErr::TurnAborted); + } + + if should_emit_turn_diff { + let unified_diff = { + let mut tracker = turn_diff_tracker.lock().await; + tracker.get_unified_diff() + }; + if let Ok(Some(unified_diff)) = unified_diff { + let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); + sess.clone().send_event(&turn_context, msg).await; + } + } + + outcome +} + +pub(crate) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { + for item in responses.iter().rev() { + if let Some(message) = last_assistant_message_from_item(item, /*plan_mode*/ false) { + return Some(message); + } + } + None +}