From 6ec5e2626de876d1d33945335cdf60ddf978d137 Mon Sep 17 00:00:00 2001 From: Edward Frazer Date: Mon, 11 May 2026 13:08:51 -0700 Subject: [PATCH] refactor: move queued turn dispatch into core --- codex-rs/app-server/src/message_processor.rs | 2 - codex-rs/app-server/src/request_processors.rs | 3 - .../request_processors/thread_lifecycle.rs | 7 +- .../request_processors/thread_processor.rs | 6 +- .../thread_queue_processor.rs | 97 +--- .../src/request_processors/turn_processor.rs | 254 +++------- codex-rs/core/src/codex_thread.rs | 82 +--- codex-rs/core/src/config/mod.rs | 31 ++ codex-rs/core/src/lib.rs | 2 +- codex-rs/core/src/queued_turns.rs | 122 +++++ codex-rs/core/src/session/handlers.rs | 432 +++++++++++++++--- codex-rs/core/src/session/mod.rs | 7 +- codex-rs/core/src/session/tests.rs | 25 + codex-rs/core/src/session/turn.rs | 1 + codex-rs/core/src/tasks/mod.rs | 3 + codex-rs/protocol/src/protocol.rs | 11 + codex-rs/rollout-trace/src/protocol_event.rs | 2 + codex-rs/rollout/src/policy.rs | 1 + 18 files changed, 682 insertions(+), 406 deletions(-) create mode 100644 codex-rs/core/src/queued_turns.rs diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 3a05eeb0c8..0eac719835 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -382,9 +382,7 @@ impl MessageProcessor { Arc::clone(&thread_manager), outgoing.clone(), analytics_events_client.clone(), - arg0_paths.clone(), Arc::clone(&config), - config_manager.clone(), Arc::clone(&pending_thread_unloads), thread_state_manager.clone(), thread_watch_manager.clone(), diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index f923ef0e0b..08027a0a67 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -253,7 +253,6 @@ use codex_config::ConfigLayerStack; use codex_config::loader::project_trust_key; use codex_config::types::McpServerTransportConfig; use codex_core::CodexThread; -use codex_core::CodexThreadTurnContextOverrides; use codex_core::ExternalGoalPreviousStatus; use codex_core::ExternalGoalSet; use codex_core::ForkSnapshot; @@ -339,9 +338,7 @@ use codex_mcp::resolve_oauth_scopes; use codex_memories_write::clear_memory_roots_contents; use codex_model_provider::ProviderAccountError; use codex_model_provider::create_model_provider; -use codex_models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets; use codex_protocol::ThreadId; -use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::ForcedLoginMethod; use codex_protocol::config_types::Personality; use codex_protocol::config_types::TrustLevel; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 1a70a0e94c..6108daa9af 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -323,11 +323,14 @@ pub(super) async fn ensure_listener_task_running( fallback_model_provider.clone(), ) .await; - if matches!(event.msg, EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) + if let EventMsg::ThreadQueuedTurnDispatched(dispatched) = &event.msg && let Some(thread_queue_processor) = thread_queue_processor.as_ref() { thread_queue_processor - .drain_thread_queue_after_terminal_turn(conversation_id) + .on_queued_turn_dispatched( + conversation_id, + dispatched.turn_has_input, + ) .await; } } diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 546cccd4ec..e8f16c5b35 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2569,9 +2569,9 @@ impl ThreadRequestProcessor { self.thread_queue_processor .emit_thread_queue_snapshot(thread_id) .await; - self.thread_queue_processor - .drain_thread_queue_after_terminal_turn(thread_id) - .await; + if let Err(err) = codex_thread.continue_queued_turn_if_idle().await { + warn!("failed to continue queued turn after resume: {err}"); + } } Err(err) => { let error = internal_error(format!("error resuming thread: {err}")); diff --git a/codex-rs/app-server/src/request_processors/thread_queue_processor.rs b/codex-rs/app-server/src/request_processors/thread_queue_processor.rs index c229456e72..e1f436540f 100644 --- a/codex-rs/app-server/src/request_processors/thread_queue_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_queue_processor.rs @@ -52,6 +52,9 @@ impl ThreadQueueRequestProcessor { )); } let state_db = self.state_db_for_materialized_thread(thread_id).await?; + self.turn_processor + .validate_thread_queued_turn(params.turn_start_params.clone()) + .await?; let turn_start_params_json = serde_json::to_string(¶ms.turn_start_params).map_err(|err| { internal_error(format!("failed to serialize queued turn params: {err}")) @@ -65,7 +68,7 @@ impl ThreadQueueRequestProcessor { let queued_turns = read_api_thread_queue(&state_db, thread_id).await?; self.emit_thread_queue_changed_ordered(thread_id, queued_turns) .await; - self.drain_thread_queue_if_idle(thread_id, &state_db).await; + self.continue_queued_turn_if_idle(thread_id).await; Ok(Some(ThreadQueueAddResponse { queued_turn }.into())) } @@ -113,92 +116,30 @@ impl ThreadQueueRequestProcessor { Ok(Some(ThreadQueueReorderResponse { queued_turns }.into())) } - pub(crate) async fn drain_thread_queue_after_terminal_turn(&self, thread_id: ThreadId) { - let state_db = match self.state_db_for_materialized_thread(thread_id).await { - Ok(state_db) => state_db, - Err(err) => { - warn!( - "failed to open state db before draining thread queue for {thread_id}: {}", - err.message - ); - return; - } - }; - self.drain_thread_queue_if_idle(thread_id, &state_db).await; - } - - async fn drain_thread_queue_if_idle(&self, thread_id: ThreadId, state_db: &StateDbHandle) { + async fn continue_queued_turn_if_idle(&self, thread_id: ThreadId) { let Ok(thread) = self.thread_manager.get_thread(thread_id).await else { return; }; - if self - .thread_has_live_in_progress_turn(thread_id, thread.as_ref()) - .await - { - return; - } - let queued_turn = match state_db.first_thread_queued_turn(thread_id).await { - Ok(Some(queued_turn)) => queued_turn, - Ok(None) => return, - Err(err) => { - warn!("failed to read next queued turn for {thread_id}: {err}"); - return; - } - }; - let turn_start_params = - match serde_json::from_str(queued_turn.turn_start_params_json.as_str()) { - Ok(turn_start_params) => turn_start_params, - Err(err) => { - warn!("failed to decode next queued turn for {thread_id}: {err}"); - return; - } - }; - - match self - .turn_processor - .queued_turn_start(turn_start_params) - .await - { - Ok(_) => { - if let Err(err) = state_db - .delete_thread_queued_turn(thread_id, queued_turn.queued_turn_id.as_str()) - .await - { - warn!( - "failed to remove dispatched queued turn {} for {thread_id}: {err}", - queued_turn.queued_turn_id - ); - return; - } - } - Err(error) => { - warn!( - "failed to dispatch queued turn {} for {thread_id}: {}", - queued_turn.queued_turn_id, error.message - ); - return; - } - } - - match read_api_thread_queue(state_db, thread_id).await { - Ok(queued_turns) => { - self.emit_thread_queue_changed_ordered(thread_id, queued_turns) - .await; - } - Err(err) => warn!("{}", err.message), + if let Err(err) = thread.continue_queued_turn_if_idle().await { + warn!("failed to continue queued turn for {thread_id}: {err}"); } } - async fn thread_has_live_in_progress_turn( + pub(crate) async fn on_queued_turn_dispatched( &self, thread_id: ThreadId, - thread: &CodexThread, - ) -> bool { - if matches!(thread.agent_status().await, AgentStatus::Running) { - return true; + turn_has_input: bool, + ) { + self.emit_thread_queue_snapshot(thread_id).await; + if !turn_has_input { + return; } - let thread_state = self.thread_state_manager.thread_state(thread_id).await; - thread_state.lock().await.active_turn_snapshot().is_some() + let Ok(thread) = self.thread_manager.get_thread(thread_id).await else { + return; + }; + self.turn_processor + .start_memories_startup_task_for_thread(thread_id, thread) + .await; } async fn state_db_for_materialized_thread( diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index a2f02f1feb..a127f8a992 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -6,9 +6,7 @@ pub(crate) struct TurnRequestProcessor { thread_manager: Arc, outgoing: Arc, analytics_events_client: AnalyticsEventsClient, - arg0_paths: Arg0DispatchPaths, config: Arc, - config_manager: ConfigManager, pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, @@ -29,9 +27,7 @@ impl TurnRequestProcessor { thread_manager: Arc, outgoing: Arc, analytics_events_client: AnalyticsEventsClient, - arg0_paths: Arg0DispatchPaths, config: Arc, - config_manager: ConfigManager, pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, @@ -42,9 +38,7 @@ impl TurnRequestProcessor { thread_manager, outgoing, analytics_events_client, - arg0_paths, config, - config_manager, pending_thread_unloads, thread_state_manager, thread_watch_manager, @@ -94,13 +88,13 @@ impl TurnRequestProcessor { .map(|response| Some(response.into())) } - pub(crate) async fn queued_turn_start( + pub(crate) async fn validate_thread_queued_turn( &self, params: TurnStartParams, - ) -> Result { + ) -> Result<(), JSONRPCErrorError> { Self::validate_v2_input_limit(¶ms.input)?; - let prepared = self.prepare_turn_start(params, None, None).await?; - self.submit_prepared_turn_start(prepared, None).await + self.prepare_queued_turn_start(params).await?; + Ok(()) } pub(crate) async fn turn_steer( @@ -214,23 +208,6 @@ impl TurnRequestProcessor { Ok((thread_id, thread)) } - fn normalize_turn_start_collaboration_mode( - &self, - mut collaboration_mode: CollaborationMode, - ) -> CollaborationMode { - if collaboration_mode.settings.developer_instructions.is_none() - && let Some(instructions) = builtin_collaboration_mode_presets() - .into_iter() - .find(|preset| preset.mode == Some(collaboration_mode.mode)) - .and_then(|preset| preset.developer_instructions.flatten()) - .filter(|instructions| !instructions.is_empty()) - { - collaboration_mode.settings.developer_instructions = Some(instructions); - } - - collaboration_mode - } - fn review_request_from_target( target: ApiReviewTarget, ) -> Result<(ReviewRequest, String), JSONRPCErrorError> { @@ -282,27 +259,6 @@ impl TurnRequestProcessor { Ok((review_request, hint)) } - fn parse_environment_selections( - &self, - environments: Option>, - ) -> Result>, JSONRPCErrorError> { - let environment_selections = environments.map(|environments| { - environments - .into_iter() - .map(|environment| TurnEnvironmentSelection { - environment_id: environment.environment_id, - cwd: environment.cwd, - }) - .collect::>() - }); - if let Some(environment_selections) = environment_selections.as_ref() { - self.thread_manager - .validate_environment_selections(environment_selections) - .map_err(|err| invalid_request(environment_selection_error_message(err)))?; - } - Ok(environment_selections) - } - async fn request_trace_context( &self, request_id: &ConnectionRequestId, @@ -347,149 +303,38 @@ impl TurnRequestProcessor { app_server_client_name: Option, app_server_client_version: Option, ) -> Result { - let (thread_id, thread) = self.load_thread(¶ms.thread_id).await?; - Self::set_app_server_client_info( - thread.as_ref(), - app_server_client_name, - app_server_client_version, + self.prepare_turn_start_inner( + params, + Some((app_server_client_name, app_server_client_version)), ) - .await?; + .await + } - let collaboration_mode = params - .collaboration_mode - .map(|mode| self.normalize_turn_start_collaboration_mode(mode)); - let environment_selections = self.parse_environment_selections(params.environments)?; + async fn prepare_queued_turn_start( + &self, + params: TurnStartParams, + ) -> Result { + self.prepare_turn_start_inner(params, None).await + } - // Map v2 input items to core input items. - let mapped_items: Vec = params - .input - .into_iter() - .map(V2UserInput::into_core) - .collect(); - let turn_has_input = !mapped_items.is_empty(); - - let has_any_overrides = params.cwd.is_some() - || params.approval_policy.is_some() - || params.approvals_reviewer.is_some() - || params.sandbox_policy.is_some() - || params.permissions.is_some() - || params.model.is_some() - || params.service_tier.is_some() - || params.effort.is_some() - || params.summary.is_some() - || collaboration_mode.is_some() - || params.personality.is_some(); - - if params.sandbox_policy.is_some() && params.permissions.is_some() { - return Err(invalid_request( - "`permissions` cannot be combined with `sandboxPolicy`", - )); + async fn prepare_turn_start_inner( + &self, + params: TurnStartParams, + app_server_client_info: Option<(Option, Option)>, + ) -> Result { + let (thread_id, thread) = self.load_thread(¶ms.thread_id).await?; + if let Some((app_server_client_name, app_server_client_version)) = app_server_client_info { + Self::set_app_server_client_info( + thread.as_ref(), + app_server_client_name, + app_server_client_version, + ) + .await?; } - - let cwd = params.cwd; - let approval_policy = params.approval_policy.map(AskForApproval::to_core); - let approvals_reviewer = params - .approvals_reviewer - .map(codex_app_server_protocol::ApprovalsReviewer::to_core); - let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); - let (permission_profile, active_permission_profile) = - if let Some(permissions) = params.permissions { - let snapshot = thread.config_snapshot().await; - let mut overrides = ConfigOverrides { - cwd: cwd.clone(), - codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(), - main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(), - ..Default::default() - }; - apply_permission_profile_selection_to_config_overrides( - &mut overrides, - Some(permissions), - ); - let config = self - .config_manager - .load_for_cwd( - /*request_overrides*/ None, - overrides, - Some(snapshot.cwd.to_path_buf()), - ) - .await - .map_err(|err| config_load_error(&err))?; - // Startup config is allowed to fall back when requirements - // disallow a configured profile. An explicit turn request - // is different: reject it before accepting user input. - if let Some(warning) = config.startup_warnings.iter().find(|warning| { - warning.contains("Configured value for `permission_profile` is disallowed") - }) { - return Err(invalid_request(format!( - "invalid turn context override: {warning}" - ))); - } - ( - Some(config.permissions.permission_profile()), - config.permissions.active_permission_profile(), - ) - } else { - (None, None) - }; - let model = params.model; - let effort = params.effort.map(Some); - let summary = params.summary; - let service_tier = params.service_tier; - let personality = params.personality; - - // If any overrides are provided, validate them synchronously so the - // request can fail before accepting user input. The actual update is - // still queued together with the input below to preserve submission order. - if has_any_overrides { - thread - .validate_turn_context_overrides(CodexThreadTurnContextOverrides { - cwd: cwd.clone(), - approval_policy, - approvals_reviewer, - sandbox_policy: sandbox_policy.clone(), - permission_profile: permission_profile.clone(), - active_permission_profile: active_permission_profile.clone(), - windows_sandbox_level: None, - model: model.clone(), - effort, - summary, - service_tier: service_tier.clone(), - collaboration_mode: collaboration_mode.clone(), - personality, - }) - .await - .map_err(|err| invalid_request(format!("invalid turn context override: {err}")))?; - } - - // Start the turn by submitting the user input. Return its submission id as turn_id. - let turn_op = if has_any_overrides { - Op::UserInputWithTurnContext { - items: mapped_items, - environments: environment_selections, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - cwd, - approval_policy, - approvals_reviewer, - sandbox_policy, - permission_profile, - active_permission_profile, - windows_sandbox_level: None, - model, - effort, - summary, - service_tier, - collaboration_mode, - personality, - } - } else { - Op::UserInput { - items: mapped_items, - environments: environment_selections, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - } - }; + let (turn_op, turn_has_input) = thread + .prepare_turn_start_op(params) + .await + .map_err(Self::prepare_turn_start_error)?; Ok(PreparedTurnStart { thread_id, thread, @@ -498,6 +343,14 @@ impl TurnRequestProcessor { }) } + fn prepare_turn_start_error(err: CodexErr) -> JSONRPCErrorError { + match err { + CodexErr::InvalidRequest(message) => invalid_request(message), + CodexErr::Io(err) => config_load_error(&err), + err => internal_error(format!("failed to prepare turn start: {err}")), + } + } + async fn submit_prepared_turn_start( &self, prepared: PreparedTurnStart, @@ -515,15 +368,8 @@ impl TurnRequestProcessor { .map_err(|err| internal_error(format!("failed to start turn: {err}")))?; if turn_has_input { - let config_snapshot = thread.config_snapshot().await; - codex_memories_write::start_memories_startup_task( - Arc::clone(&self.thread_manager), - Arc::clone(&self.auth_manager), - thread_id, - Arc::clone(&thread), - thread.config().await, - &config_snapshot.session_source, - ); + self.start_memories_startup_task_for_thread(thread_id, Arc::clone(&thread)) + .await; } let turn = Turn { @@ -540,6 +386,22 @@ impl TurnRequestProcessor { Ok(TurnStartResponse { turn }) } + pub(crate) async fn start_memories_startup_task_for_thread( + &self, + thread_id: ThreadId, + thread: Arc, + ) { + let config_snapshot = thread.config_snapshot().await; + codex_memories_write::start_memories_startup_task( + Arc::clone(&self.thread_manager), + Arc::clone(&self.auth_manager), + thread_id, + Arc::clone(&thread), + thread.config().await, + &config_snapshot.session_source, + ); + } + async fn thread_inject_items_response_inner( &self, params: ThreadInjectItemsParams, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 9fe235640a..a80b379879 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -4,14 +4,12 @@ use crate::file_watcher::WatchRegistration; use crate::goals::ExternalGoalSet; use crate::goals::GoalRuntimeEvent; use crate::session::Codex; -use crate::session::SessionSettingsUpdate; use crate::session::SteerInputError; +use crate::session::handlers; +use codex_app_server_protocol::TurnStartParams; use codex_features::Feature; use codex_protocol::config_types::ApprovalsReviewer; -use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::Personality; -use codex_protocol::config_types::ReasoningSummary; -use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; use codex_protocol::mcp::CallToolResult; @@ -77,24 +75,6 @@ impl ThreadConfigSnapshot { } } -/// Turn context overrides that app-server validates before starting a turn. -#[derive(Clone, Default)] -pub struct CodexThreadTurnContextOverrides { - pub cwd: Option, - pub approval_policy: Option, - pub approvals_reviewer: Option, - pub sandbox_policy: Option, - pub permission_profile: Option, - pub active_permission_profile: Option, - pub windows_sandbox_level: Option, - pub model: Option, - pub effort: Option>, - pub summary: Option, - pub service_tier: Option>, - pub collaboration_mode: Option, - pub personality: Option, -} - pub struct CodexThread { pub(crate) codex: Codex, pub(crate) session_source: SessionSource, @@ -151,6 +131,17 @@ impl CodexThread { .await } + pub async fn continue_queued_turn_if_idle(&self) -> anyhow::Result<()> { + self.codex + .session + .maybe_start_thread_queued_turn_if_idle() + .await + } + + pub async fn prepare_turn_start_op(&self, params: TurnStartParams) -> CodexResult<(Op, bool)> { + handlers::prepare_turn_start_op(&self.codex.session, params).await + } + pub async fn prepare_external_goal_mutation(&self) { if let Err(err) = self .codex @@ -233,53 +224,6 @@ impl CodexThread { .await } - /// Validate persistent turn context overrides without committing them. - pub async fn validate_turn_context_overrides( - &self, - overrides: CodexThreadTurnContextOverrides, - ) -> ConstraintResult<()> { - let CodexThreadTurnContextOverrides { - cwd, - approval_policy, - approvals_reviewer, - sandbox_policy, - permission_profile, - active_permission_profile, - windows_sandbox_level, - model, - effort, - summary, - service_tier, - collaboration_mode, - personality, - } = overrides; - let collaboration_mode = if let Some(collaboration_mode) = collaboration_mode { - collaboration_mode - } else { - self.codex - .session - .collaboration_mode() - .await - .with_updates(model, effort, /*developer_instructions*/ None) - }; - - let updates = SessionSettingsUpdate { - cwd, - approval_policy, - approvals_reviewer, - sandbox_policy, - permission_profile, - active_permission_profile, - windows_sandbox_level, - collaboration_mode: Some(collaboration_mode), - reasoning_summary: summary, - service_tier, - personality, - ..Default::default() - }; - self.codex.session.validate_settings(&updates).await - } - /// Use sparingly: this is intended to be removed soon. pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { self.codex.submit_with_id(sub).await diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index e27002aefe..67869f63ab 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1195,6 +1195,37 @@ impl Config { .await } + /// Rebuild this config from its current layer stack with fresh harness overrides. + pub async fn rebuild_with_overrides( + &self, + mut overrides: ConfigOverrides, + ) -> std::io::Result { + overrides.codex_self_exe = overrides + .codex_self_exe + .or_else(|| self.codex_self_exe.clone()); + overrides.codex_linux_sandbox_exe = overrides + .codex_linux_sandbox_exe + .or_else(|| self.codex_linux_sandbox_exe.clone()); + overrides.main_execve_wrapper_exe = overrides + .main_execve_wrapper_exe + .or_else(|| self.main_execve_wrapper_exe.clone()); + + let cfg: ConfigToml = self + .config_layer_stack + .effective_config() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + + Self::load_config_with_layer_stack( + LOCAL_FS.as_ref(), + cfg, + overrides, + self.codex_home.clone(), + self.config_layer_stack.clone(), + ) + .await + } + /// This is the preferred way to create an instance of [Config]. pub async fn load_with_cli_overrides( cli_overrides: Vec<(String, TomlValue)>, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 0cdf0e2d46..116a9c282f 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -20,7 +20,6 @@ mod compact_remote; mod compact_remote_v2; mod config_lock; pub use codex_thread::CodexThread; -pub use codex_thread::CodexThreadTurnContextOverrides; pub use codex_thread::ThreadConfigSnapshot; mod agent; mod codex_delegate; @@ -57,6 +56,7 @@ pub use network_proxy_loader::MtimeConfigReloader; pub use network_proxy_loader::build_network_proxy_state; pub use network_proxy_loader::build_network_proxy_state_and_reloader; mod original_image_detail; +mod queued_turns; pub use codex_mcp::SandboxState; mod mcp_openai_file; mod mcp_tool_call; diff --git a/codex-rs/core/src/queued_turns.rs b/codex-rs/core/src/queued_turns.rs new file mode 100644 index 0000000000..5c7f2cc1b2 --- /dev/null +++ b/codex-rs/core/src/queued_turns.rs @@ -0,0 +1,122 @@ +//! Core runtime support for durable queued turns. + +use crate::StateDbHandle; +use crate::session::handlers; +use crate::session::session::Session; +use anyhow::Context; +use codex_app_server_protocol::TurnStartParams; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ThreadQueuedTurnDispatchedEvent; +use codex_rollout::state_db::reconcile_rollout; +use codex_thread_store::LocalThreadStore; +use futures::future::BoxFuture; +use std::sync::Arc; +use uuid::Uuid; + +impl Session { + /// Starts the head queued turn when this session is still idle. + pub(crate) fn maybe_start_thread_queued_turn_if_idle<'a>( + self: &'a Arc, + ) -> BoxFuture<'a, anyhow::Result<()>> { + Box::pin(async move { + let Some(state_db) = self.state_db_for_thread_queue().await? else { + return Ok(()); + }; + let Some(queued_turn) = state_db + .first_thread_queued_turn(self.conversation_id) + .await? + else { + return Ok(()); + }; + let params: TurnStartParams = + serde_json::from_str(queued_turn.turn_start_params_json.as_str()) + .context("failed to decode queued turn params")?; + let (turn_op, turn_has_input) = handlers::prepare_turn_start_op(self, params) + .await + .context("failed to prepare queued turn")?; + let turn_id = Uuid::now_v7().to_string(); + if !handlers::maybe_start_queued_turn(self, turn_id.clone(), turn_op).await { + return Ok(()); + } + + state_db + .delete_thread_queued_turn( + self.conversation_id, + queued_turn.queued_turn_id.as_str(), + ) + .await + .with_context(|| { + format!( + "failed to remove dispatched queued turn {}", + queued_turn.queued_turn_id + ) + })?; + + self.send_event_raw(Event { + id: turn_id, + msg: EventMsg::ThreadQueuedTurnDispatched(ThreadQueuedTurnDispatchedEvent { + thread_id: self.conversation_id, + turn_has_input, + }), + }) + .await; + Ok(()) + }) + } + + async fn state_db_for_thread_queue(&self) -> anyhow::Result> { + let config = self.get_config().await; + if config.ephemeral { + return Ok(None); + } + + self.try_ensure_rollout_materialized() + .await + .context("failed to materialize rollout before opening state db for thread queue")?; + + let state_db = if let Some(state_db) = self.state_db() { + state_db + } else if let Some(local_store) = self + .services + .thread_store + .as_any() + .downcast_ref::() + { + local_store.state_db().await.ok_or_else(|| { + anyhow::anyhow!( + "thread queue requires a local persisted thread with a state database" + ) + })? + } else { + anyhow::bail!("thread queue requires a local persisted thread with a state database"); + }; + + let thread_metadata_present = state_db + .get_thread(self.conversation_id) + .await + .context("failed to read thread metadata before reconciling thread queue")? + .is_some(); + if !thread_metadata_present { + let rollout_path = self + .current_rollout_path() + .await + .context("failed to locate rollout before reconciling thread queue")? + .ok_or_else(|| { + anyhow::anyhow!("thread queue requires materialized thread metadata") + })?; + reconcile_rollout( + Some(&state_db), + rollout_path.as_path(), + config.model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + } + + Ok(Some(state_db)) + } +} diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 03f29bac53..6402dc32be 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -12,8 +12,10 @@ use tracing::info_span; use crate::session::SteerInputError; use crate::session::session::Session; use crate::session::session::SessionSettingsUpdate; +use crate::state::ActiveTurn; use crate::config::Config; +use crate::config::ConfigOverrides; use crate::realtime_context::REALTIME_TURN_TOKEN_BUDGET; use crate::realtime_context::truncate_realtime_text_to_token_budget; use crate::realtime_conversation::REALTIME_USER_TEXT_PREFIX; @@ -24,6 +26,13 @@ use crate::tasks::CompactTask; use crate::tasks::UserShellCommandMode; use crate::tasks::UserShellCommandTask; use crate::tasks::execute_user_shell_command; +use codex_app_server_protocol::PermissionProfileModificationParams; +use codex_app_server_protocol::PermissionProfileSelectionParams; +use codex_app_server_protocol::TurnEnvironmentParams; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets; +use codex_protocol::error::CodexErr; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::protocol::CodexErrorInfo; @@ -43,6 +52,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::WarningEvent; use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputResponse; @@ -112,7 +122,327 @@ pub(super) async fn user_input_or_turn_inner( op: Op, mirror_user_text_to_realtime: Option<()>, ) { - let (items, updates, responsesapi_client_metadata) = match op { + let (items, updates, responsesapi_client_metadata) = prepare_user_input_or_turn(sess, op).await; + + let Ok(current_context) = sess.new_turn_with_sub_id(sub_id.clone(), updates).await else { + // new_turn_with_sub_id already emits the error event. + return; + }; + sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref()) + .await; + let accepted_items = match sess + .steer_input( + items.clone(), + /*expected_turn_id*/ None, + responsesapi_client_metadata.clone(), + ) + .await + { + Ok(_) => { + current_context.session_telemetry.user_prompt(&items); + Some(items) + } + Err(SteerInputError::NoActiveTurn(items)) => { + start_regular_turn( + sess, + Arc::clone(¤t_context), + items, + responsesapi_client_metadata, + ) + .await + } + Err(err) => { + sess.send_event_raw(Event { + id: sub_id, + msg: EventMsg::Error(err.to_error_event()), + }) + .await; + None + } + }; + if let (Some(items), Some(())) = (accepted_items, mirror_user_text_to_realtime) { + self::mirror_user_text_to_realtime(sess, &items).await; + } +} + +pub(crate) async fn maybe_start_queued_turn(sess: &Arc, sub_id: String, op: Op) -> bool { + let (items, updates, responsesapi_client_metadata) = prepare_user_input_or_turn(sess, op).await; + let reserved_turn_state = { + let mut active_turn = sess.active_turn.lock().await; + if active_turn.is_some() { + return false; + } + let active_turn = active_turn.get_or_insert_with(ActiveTurn::default); + Arc::clone(&active_turn.turn_state) + }; + let Ok(current_context) = sess.new_turn_with_sub_id(sub_id, updates).await else { + clear_reserved_queued_turn(sess, &reserved_turn_state).await; + return false; + }; + sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref()) + .await; + let still_reserved = { + let active_turn = sess.active_turn.lock().await; + active_turn.as_ref().is_some_and(|active_turn| { + active_turn.tasks.is_empty() + && Arc::ptr_eq(&active_turn.turn_state, &reserved_turn_state) + }) + }; + if !still_reserved { + clear_reserved_queued_turn(sess, &reserved_turn_state).await; + return false; + } + start_reserved_regular_turn(sess, current_context, items, responsesapi_client_metadata) + .await + .is_some() +} + +pub(crate) async fn prepare_turn_start_op( + sess: &Arc, + params: TurnStartParams, +) -> codex_protocol::error::Result<(Op, bool)> { + if params.thread_id != sess.conversation_id.to_string() { + return Err(CodexErr::InvalidRequest( + "turnStartParams.threadId must match the active thread".to_string(), + )); + } + + let collaboration_mode = params + .collaboration_mode + .map(normalize_turn_start_collaboration_mode); + let environment_selections = parse_turn_environment_selections(sess, params.environments)?; + let mapped_items = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect::>(); + let turn_has_input = !mapped_items.is_empty(); + + let has_any_overrides = params.cwd.is_some() + || params.approval_policy.is_some() + || params.approvals_reviewer.is_some() + || params.sandbox_policy.is_some() + || params.permissions.is_some() + || params.model.is_some() + || params.service_tier.is_some() + || params.effort.is_some() + || params.summary.is_some() + || collaboration_mode.is_some() + || params.personality.is_some(); + + if params.sandbox_policy.is_some() && params.permissions.is_some() { + return Err(CodexErr::InvalidRequest( + "`permissions` cannot be combined with `sandboxPolicy`".to_string(), + )); + } + + let cwd = params.cwd; + let approval_policy = params.approval_policy.map(|policy| policy.to_core()); + let approvals_reviewer = params.approvals_reviewer.map(|reviewer| reviewer.to_core()); + let sandbox_policy = params.sandbox_policy.map(|policy| policy.to_core()); + let (permission_profile, active_permission_profile) = + resolve_turn_permission_selection(sess, cwd.clone(), params.permissions).await?; + let model = params.model; + let effort = params.effort.map(Some); + let summary = params.summary; + let service_tier = params.service_tier; + let personality = params.personality; + + if has_any_overrides { + validate_turn_start_settings( + sess, + cwd.clone(), + approval_policy, + approvals_reviewer, + sandbox_policy.clone(), + permission_profile.clone(), + active_permission_profile.clone(), + model.clone(), + effort, + summary, + service_tier.clone(), + collaboration_mode.clone(), + personality, + ) + .await?; + } + + let turn_op = if has_any_overrides { + Op::UserInputWithTurnContext { + items: mapped_items, + environments: environment_selections, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, + cwd, + approval_policy, + approvals_reviewer, + sandbox_policy, + permission_profile, + active_permission_profile, + windows_sandbox_level: None, + model, + effort, + summary, + service_tier, + collaboration_mode, + personality, + } + } else { + Op::UserInput { + items: mapped_items, + environments: environment_selections, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, + } + }; + + Ok((turn_op, turn_has_input)) +} + +fn normalize_turn_start_collaboration_mode( + mut collaboration_mode: CollaborationMode, +) -> CollaborationMode { + if collaboration_mode.settings.developer_instructions.is_none() + && let Some(instructions) = builtin_collaboration_mode_presets() + .into_iter() + .find(|preset| preset.mode == Some(collaboration_mode.mode)) + .and_then(|preset| preset.developer_instructions.flatten()) + .filter(|instructions| !instructions.is_empty()) + { + collaboration_mode.settings.developer_instructions = Some(instructions); + } + + collaboration_mode +} + +fn parse_turn_environment_selections( + sess: &Arc, + environments: Option>, +) -> codex_protocol::error::Result>> { + let environment_selections = environments.map(|environments| { + environments + .into_iter() + .map(|environment| TurnEnvironmentSelection { + environment_id: environment.environment_id, + cwd: environment.cwd, + }) + .collect::>() + }); + + if let Some(environment_selections) = environment_selections.as_ref() { + crate::environment_selection::resolve_environment_selections( + sess.services.environment_manager.as_ref(), + environment_selections, + )?; + } + + Ok(environment_selections) +} + +async fn resolve_turn_permission_selection( + sess: &Arc, + cwd: Option, + permissions: Option, +) -> codex_protocol::error::Result<( + Option, + Option, +)> { + let Some(permissions) = permissions else { + return Ok((None, None)); + }; + + let snapshot = sess.thread_config_snapshot().await; + let config = sess.get_config().await; + let mut overrides = ConfigOverrides { + cwd: cwd.or_else(|| Some(snapshot.cwd.to_path_buf())), + ..Default::default() + }; + apply_permission_profile_selection_to_config_overrides(&mut overrides, permissions); + let config = config.rebuild_with_overrides(overrides).await?; + if let Some(warning) = config + .startup_warnings + .iter() + .find(|warning| warning.contains("Configured value for `permission_profile` is disallowed")) + { + return Err(CodexErr::InvalidRequest(format!( + "invalid turn context override: {warning}" + ))); + } + + Ok(( + Some(config.permissions.permission_profile()), + config.permissions.active_permission_profile(), + )) +} + +fn apply_permission_profile_selection_to_config_overrides( + overrides: &mut ConfigOverrides, + permissions: PermissionProfileSelectionParams, +) { + let PermissionProfileSelectionParams::Profile { id, modifications } = permissions; + overrides.default_permissions = Some(id); + overrides + .additional_writable_roots + .extend(modifications.unwrap_or_default().into_iter().map( + |modification| match modification { + PermissionProfileModificationParams::AdditionalWritableRoot { path } => { + path.to_path_buf() + } + }, + )); +} + +#[allow(clippy::too_many_arguments)] +async fn validate_turn_start_settings( + sess: &Arc, + cwd: Option, + approval_policy: Option, + approvals_reviewer: Option, + sandbox_policy: Option, + permission_profile: Option, + active_permission_profile: Option, + model: Option, + effort: Option>, + summary: Option, + service_tier: Option>, + collaboration_mode: Option, + personality: Option, +) -> codex_protocol::error::Result<()> { + let collaboration_mode = if let Some(collaboration_mode) = collaboration_mode { + collaboration_mode + } else { + sess.collaboration_mode() + .await + .with_updates(model, effort, /*developer_instructions*/ None) + }; + + sess.validate_settings(&SessionSettingsUpdate { + cwd, + approval_policy, + approvals_reviewer, + sandbox_policy, + permission_profile, + active_permission_profile, + windows_sandbox_level: None, + collaboration_mode: Some(collaboration_mode), + reasoning_summary: summary, + service_tier, + personality, + ..Default::default() + }) + .await + .map_err(|err| CodexErr::InvalidRequest(format!("invalid turn context override: {err}"))) +} + +async fn prepare_user_input_or_turn( + sess: &Arc, + op: Op, +) -> ( + Vec, + SessionSettingsUpdate, + Option>, +) { + match op { Op::UserTurn { cwd, approval_policy, @@ -228,58 +558,58 @@ pub(super) async fn user_input_or_turn_inner( responsesapi_client_metadata, ), _ => unreachable!(), - }; + } +} - let Ok(current_context) = sess.new_turn_with_sub_id(sub_id.clone(), updates).await else { - // new_turn_with_sub_id already emits the error event. - return; - }; - sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref()) +async fn start_regular_turn( + sess: &Arc, + current_context: Arc, + items: Vec, + responsesapi_client_metadata: Option>, +) -> Option> { + if let Some(responsesapi_client_metadata) = responsesapi_client_metadata { + current_context + .turn_metadata_state + .set_responsesapi_client_metadata(responsesapi_client_metadata); + } + current_context.session_telemetry.user_prompt(&items); + sess.refresh_mcp_servers_if_requested(¤t_context, Some(sess.mcp_elicitation_reviewer())) .await; - let accepted_items = match sess - .steer_input( - items.clone(), - /*expected_turn_id*/ None, - responsesapi_client_metadata.clone(), - ) - .await - { - Ok(_) => { - current_context.session_telemetry.user_prompt(&items); - Some(items) - } - Err(SteerInputError::NoActiveTurn(items)) => { - if let Some(responsesapi_client_metadata) = responsesapi_client_metadata { - current_context - .turn_metadata_state - .set_responsesapi_client_metadata(responsesapi_client_metadata); - } - current_context.session_telemetry.user_prompt(&items); - sess.refresh_mcp_servers_if_requested( - ¤t_context, - Some(sess.mcp_elicitation_reviewer()), - ) - .await; - let accepted_items = items.clone(); - sess.spawn_task( - Arc::clone(¤t_context), - items, - crate::tasks::RegularTask::new(), - ) - .await; - Some(accepted_items) - } - Err(err) => { - sess.send_event_raw(Event { - id: sub_id, - msg: EventMsg::Error(err.to_error_event()), - }) - .await; - None - } - }; - if let (Some(items), Some(())) = (accepted_items, mirror_user_text_to_realtime) { - self::mirror_user_text_to_realtime(sess, &items).await; + let accepted_items = items.clone(); + sess.spawn_task(current_context, items, crate::tasks::RegularTask::new()) + .await; + Some(accepted_items) +} + +async fn start_reserved_regular_turn( + sess: &Arc, + current_context: Arc, + items: Vec, + responsesapi_client_metadata: Option>, +) -> Option> { + if let Some(responsesapi_client_metadata) = responsesapi_client_metadata { + current_context + .turn_metadata_state + .set_responsesapi_client_metadata(responsesapi_client_metadata); + } + current_context.session_telemetry.user_prompt(&items); + sess.refresh_mcp_servers_if_requested(¤t_context, Some(sess.mcp_elicitation_reviewer())) + .await; + let accepted_items = items.clone(); + sess.start_task(current_context, items, crate::tasks::RegularTask::new()) + .await; + Some(accepted_items) +} + +async fn clear_reserved_queued_turn( + sess: &Arc, + reserved_turn_state: &Arc>, +) { + let mut active_turn = sess.active_turn.lock().await; + if active_turn.as_ref().is_some_and(|active_turn| { + active_turn.tasks.is_empty() && Arc::ptr_eq(&active_turn.turn_state, reserved_turn_state) + }) { + *active_turn = None; } } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 6d4b275421..4e9ffeaa78 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -185,7 +185,7 @@ use codex_protocol::error::Result as CodexResult; use codex_protocol::exec_output::StreamOutput; mod config_lock; -mod handlers; +pub(crate) mod handlers; mod mcp; mod multi_agents; mod review; @@ -1403,6 +1403,11 @@ impl Session { .clone() } + pub(crate) async fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { + let state = self.state.lock().await; + state.session_configuration.thread_config_snapshot() + } + pub(crate) async fn provider(&self) -> ModelProviderInfo { let state = self.state.lock().await; state.session_configuration.provider.clone() diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index b63b16cbf4..1a7d9d5146 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -7327,6 +7327,31 @@ async fn steer_input_requires_active_turn() { assert!(matches!(err, SteerInputError::NoActiveTurn(_))); } +#[tokio::test] +async fn queued_turn_start_only_claims_one_idle_turn() { + let (sess, _tc, _rx) = make_session_and_context_with_rx().await; + let queued_turn_op = || Op::UserInput { + items: vec![UserInput::Text { + text: "queued".to_string(), + text_elements: Vec::new(), + }], + environments: None, + final_output_json_schema: None, + responsesapi_client_metadata: None, + }; + + assert!( + super::handlers::maybe_start_queued_turn(&sess, "queued-1".to_string(), queued_turn_op(),) + .await + ); + assert!( + !super::handlers::maybe_start_queued_turn(&sess, "queued-2".to_string(), queued_turn_op(),) + .await + ); + + sess.abort_all_tasks(TurnAbortReason::Interrupted).await; +} + #[tokio::test] async fn steer_input_enforces_expected_turn_id() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 1723904cff..42231b168a 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1477,6 +1477,7 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::AgentReasoningSectionBreak(_) | EventMsg::SessionConfigured(_) | EventMsg::ThreadGoalUpdated(_) + | EventMsg::ThreadQueuedTurnDispatched(_) | EventMsg::McpStartupUpdate(_) | EventMsg::McpStartupComplete(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index bb7a79f58e..85843ee350 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -774,6 +774,9 @@ impl Session { if !cleared_active_turn { return; } + if let Err(err) = self.maybe_start_thread_queued_turn_if_idle().await { + warn!("failed to continue queued turn after turn completion: {err}"); + } if let Err(err) = self .goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle) .await diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 30e33abe43..69e51da820 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1329,6 +1329,9 @@ pub enum EventMsg { /// Updated long-running goal metadata for the thread. ThreadGoalUpdated(ThreadGoalUpdatedEvent), + /// Core started and removed a durable queued turn. + ThreadQueuedTurnDispatched(ThreadQueuedTurnDispatchedEvent), + /// Incremental MCP startup progress updates. McpStartupUpdate(McpStartupUpdateEvent), @@ -3590,6 +3593,14 @@ pub struct ThreadGoalUpdatedEvent { pub goal: ThreadGoal, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "protocol/")] +pub struct ThreadQueuedTurnDispatchedEvent { + pub thread_id: ThreadId, + pub turn_has_input: bool, +} + /// User's decision in response to an ExecApprovalRequest. #[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq, Display, JsonSchema, TS)] #[serde(rename_all = "snake_case")] diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index 3d52798b8d..ce98ff8f1c 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -227,6 +227,7 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Option<&'static s | EventMsg::AgentReasoningRawContent(_) | EventMsg::AgentReasoningSectionBreak(_) | EventMsg::ThreadGoalUpdated(_) + | EventMsg::ThreadQueuedTurnDispatched(_) | EventMsg::McpStartupUpdate(_) | EventMsg::McpStartupComplete(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 558c3fef98..f11c15d84e 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -141,6 +141,7 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::RawResponseItem(_) | EventMsg::SessionConfigured(_) | EventMsg::ThreadGoalUpdated(_) + | EventMsg::ThreadQueuedTurnDispatched(_) | EventMsg::McpToolCallBegin(_) | EventMsg::ExecCommandBegin(_) | EventMsg::TerminalInteraction(_)