use super::*; #[derive(Clone)] pub(crate) struct TurnRequestProcessor { auth_manager: Arc, thread_manager: Arc, outgoing: Arc, analytics_events_client: AnalyticsEventsClient, arg0_paths: Arg0DispatchPaths, config: Arc, config_manager: ConfigManager, pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, skills_watcher: Arc, } fn resolve_runtime_workspace_roots( workspace_roots: Vec, base_cwd: &AbsolutePathBuf, ) -> Vec { let mut resolved_roots = Vec::new(); for path in workspace_roots { let root = AbsolutePathBuf::resolve_path_against_base(path, base_cwd.as_path()); if !resolved_roots.iter().any(|existing| existing == &root) { resolved_roots.push(root); } } resolved_roots } impl TurnRequestProcessor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( auth_manager: Arc, thread_manager: Arc, outgoing: Arc, analytics_events_client: AnalyticsEventsClient, arg0_paths: Arg0DispatchPaths, config: Arc, config_manager: ConfigManager, pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, skills_watcher: Arc, ) -> Self { Self { auth_manager, thread_manager, outgoing, analytics_events_client, arg0_paths, config, config_manager, pending_thread_unloads, thread_state_manager, thread_watch_manager, thread_list_state_permit, skills_watcher, } } pub(crate) async fn turn_start( &self, request_id: ConnectionRequestId, params: TurnStartParams, app_server_client_name: Option, app_server_client_version: Option, ) -> Result, JSONRPCErrorError> { self.turn_start_inner( request_id, params, app_server_client_name, app_server_client_version, ) .await .map(|response| Some(response.into())) } pub(crate) async fn thread_inject_items( &self, params: ThreadInjectItemsParams, ) -> Result, JSONRPCErrorError> { self.thread_inject_items_response_inner(params) .await .map(|response| Some(response.into())) } pub(crate) async fn turn_steer( &self, request_id: &ConnectionRequestId, params: TurnSteerParams, ) -> Result, JSONRPCErrorError> { self.turn_steer_inner(request_id, params) .await .map(|response| Some(response.into())) } pub(crate) async fn turn_interrupt( &self, request_id: &ConnectionRequestId, params: TurnInterruptParams, ) -> Result, JSONRPCErrorError> { self.turn_interrupt_inner(request_id, params) .await .map(|response| response.map(Into::into)) } pub(crate) async fn thread_realtime_start( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeStartParams, ) -> Result, JSONRPCErrorError> { self.thread_realtime_start_inner(request_id, params) .await .map(|response| response.map(Into::into)) } pub(crate) async fn thread_realtime_append_audio( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeAppendAudioParams, ) -> Result, JSONRPCErrorError> { self.thread_realtime_append_audio_inner(request_id, params) .await .map(|response| response.map(Into::into)) } pub(crate) async fn thread_realtime_append_text( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeAppendTextParams, ) -> Result, JSONRPCErrorError> { self.thread_realtime_append_text_inner(request_id, params) .await .map(|response| response.map(Into::into)) } pub(crate) async fn thread_realtime_stop( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeStopParams, ) -> Result, JSONRPCErrorError> { self.thread_realtime_stop_inner(request_id, params) .await .map(|response| response.map(Into::into)) } pub(crate) async fn thread_realtime_list_voices( &self, ) -> Result, JSONRPCErrorError> { Ok(Some( ThreadRealtimeListVoicesResponse { voices: RealtimeVoicesList::builtin(), } .into(), )) } pub(crate) async fn review_start( &self, request_id: &ConnectionRequestId, params: ReviewStartParams, ) -> Result, JSONRPCErrorError> { self.review_start_inner(request_id, params) .await .map(|()| None) } fn track_error_response( &self, request_id: &ConnectionRequestId, error: &JSONRPCErrorError, error_type: Option, ) { self.analytics_events_client.track_error_response( request_id.connection_id.0, request_id.request_id.clone(), error.clone(), error_type, ); } async fn load_thread( &self, thread_id: &str, ) -> Result<(ThreadId, Arc), JSONRPCErrorError> { // Resolve the core conversation handle from a v2 thread id string. let thread_id = ThreadId::from_string(thread_id) .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?; let thread = self .thread_manager .get_thread(thread_id) .await .map_err(|_| invalid_request(format!("thread not found: {thread_id}")))?; Ok((thread_id, thread)) } fn normalize_turn_start_collaboration_mode( &self, mut collaboration_mode: CollaborationMode, ) -> CollaborationMode { if collaboration_mode.settings.developer_instructions.is_none() && let Some(instructions) = builtin_collaboration_mode_presets() .into_iter() .find(|preset| preset.mode == Some(collaboration_mode.mode)) .and_then(|preset| preset.developer_instructions.flatten()) .filter(|instructions| !instructions.is_empty()) { collaboration_mode.settings.developer_instructions = Some(instructions); } collaboration_mode } fn review_request_from_target( target: ApiReviewTarget, ) -> Result<(ReviewRequest, String), JSONRPCErrorError> { let cleaned_target = match target { ApiReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges, ApiReviewTarget::BaseBranch { branch } => { let branch = branch.trim().to_string(); if branch.is_empty() { return Err(invalid_request("branch must not be empty".to_string())); } ApiReviewTarget::BaseBranch { branch } } ApiReviewTarget::Commit { sha, title } => { let sha = sha.trim().to_string(); if sha.is_empty() { return Err(invalid_request("sha must not be empty".to_string())); } let title = title .map(|t| t.trim().to_string()) .filter(|t| !t.is_empty()); ApiReviewTarget::Commit { sha, title } } ApiReviewTarget::Custom { instructions } => { let trimmed = instructions.trim().to_string(); if trimmed.is_empty() { return Err(invalid_request( "instructions must not be empty".to_string(), )); } ApiReviewTarget::Custom { instructions: trimmed, } } }; let core_target = match cleaned_target { ApiReviewTarget::UncommittedChanges => CoreReviewTarget::UncommittedChanges, ApiReviewTarget::BaseBranch { branch } => CoreReviewTarget::BaseBranch { branch }, ApiReviewTarget::Commit { sha, title } => CoreReviewTarget::Commit { sha, title }, ApiReviewTarget::Custom { instructions } => CoreReviewTarget::Custom { instructions }, }; let hint = codex_core::review_prompts::user_facing_hint(&core_target); let review_request = ReviewRequest { target: core_target, user_facing_hint: Some(hint.clone()), }; Ok((review_request, hint)) } fn parse_environment_selections( &self, environments: Option>, ) -> Result>, JSONRPCErrorError> { let environment_selections = environments.map(|environments| { environments .into_iter() .map(|environment| TurnEnvironmentSelection { environment_id: environment.environment_id, cwd: environment.cwd, }) .collect::>() }); if let Some(environment_selections) = environment_selections.as_ref() { self.thread_manager .validate_environment_selections(environment_selections) .map_err(|err| invalid_request(environment_selection_error_message(err)))?; } Ok(environment_selections) } async fn request_trace_context( &self, request_id: &ConnectionRequestId, ) -> Option { self.outgoing.request_trace_context(request_id).await } async fn submit_core_op( &self, request_id: &ConnectionRequestId, thread: &CodexThread, op: Op, ) -> CodexResult { thread .submit_with_trace(op, self.request_trace_context(request_id).await) .await } fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError { let mut error = invalid_params(format!( "Input exceeds the maximum length of {MAX_USER_INPUT_TEXT_CHARS} characters." )); error.data = Some(serde_json::json!({ "input_error_code": INPUT_TOO_LARGE_ERROR_CODE, "max_chars": MAX_USER_INPUT_TEXT_CHARS, "actual_chars": actual_chars, })); error } fn validate_v2_input_limit(items: &[V2UserInput]) -> Result<(), JSONRPCErrorError> { let actual_chars: usize = items.iter().map(V2UserInput::text_char_count).sum(); if actual_chars > MAX_USER_INPUT_TEXT_CHARS { return Err(Self::input_too_large_error(actual_chars)); } Ok(()) } async fn turn_start_inner( &self, request_id: ConnectionRequestId, params: TurnStartParams, app_server_client_name: Option, app_server_client_version: Option, ) -> Result { if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { self.track_error_response( &request_id, &error, Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), ); return Err(error); } let (thread_id, thread) = self.load_thread(¶ms.thread_id) .await .inspect_err(|error| { self.track_error_response(&request_id, error, /*error_type*/ None); })?; Self::set_app_server_client_info( thread.as_ref(), app_server_client_name, app_server_client_version, ) .await .inspect_err(|error| { self.track_error_response(&request_id, error, /*error_type*/ None); })?; let collaboration_mode = params .collaboration_mode .map(|mode| self.normalize_turn_start_collaboration_mode(mode)); let environment_selections = self.parse_environment_selections(params.environments)?; // Map v2 input items to core input items. let mapped_items: Vec = params .input .into_iter() .map(V2UserInput::into_core) .collect(); let turn_has_input = !mapped_items.is_empty(); let runtime_workspace_roots_request = params.runtime_workspace_roots.clone(); let snapshot = if params.permissions.is_some() || runtime_workspace_roots_request.is_some() { Some(thread.config_snapshot().await) } else { None }; let has_any_overrides = params.cwd.is_some() || runtime_workspace_roots_request.is_some() || params.approval_policy.is_some() || params.approvals_reviewer.is_some() || params.sandbox_policy.is_some() || params.permissions.is_some() || params.model.is_some() || params.service_tier.is_some() || params.effort.is_some() || params.summary.is_some() || collaboration_mode.is_some() || params.personality.is_some(); if params.sandbox_policy.is_some() && params.permissions.is_some() { return Err(invalid_request( "`permissions` cannot be combined with `sandboxPolicy`", )); } let cwd = params.cwd; let runtime_workspace_roots = if let Some(workspace_roots) = runtime_workspace_roots_request.clone() { let Some(snapshot) = snapshot.as_ref() else { return Err(internal_error( "turn/start runtime workspace roots missing thread snapshot", )); }; let base_cwd = cwd .as_ref() .map(|cwd| AbsolutePathBuf::resolve_path_against_base(cwd, snapshot.cwd.as_path())) .unwrap_or_else(|| snapshot.cwd.clone()); Some(resolve_runtime_workspace_roots(workspace_roots, &base_cwd)) } else { None }; let approval_policy = params.approval_policy.map(AskForApproval::to_core); let approvals_reviewer = params .approvals_reviewer .map(codex_app_server_protocol::ApprovalsReviewer::to_core); let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); let (permission_profile, active_permission_profile, profile_workspace_roots) = if let Some(permissions) = params.permissions { let Some(snapshot) = snapshot.as_ref() else { return Err(internal_error( "turn/start permission selection missing thread snapshot", )); }; let mut overrides = ConfigOverrides { cwd: cwd.clone(), workspace_roots: Some(runtime_workspace_roots_request.clone().unwrap_or_else( || { snapshot .workspace_roots .iter() .map(AbsolutePathBuf::to_path_buf) .collect() }, )), codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(), main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(), ..Default::default() }; apply_permission_profile_selection_to_config_overrides( &mut overrides, Some(permissions), ); let config = self .config_manager .load_for_cwd( /*request_overrides*/ None, overrides, Some(snapshot.cwd.to_path_buf()), ) .await .map_err(|err| config_load_error(&err))?; // Startup config is allowed to fall back when requirements // disallow a configured profile. An explicit turn request // is different: reject it before accepting user input. if let Some(warning) = config.startup_warnings.iter().find(|warning| { warning.contains("Configured value for `permission_profile` is disallowed") }) { return Err(invalid_request(format!( "invalid turn context override: {warning}" ))); } ( Some(config.permissions.permission_profile().clone()), config.permissions.active_permission_profile(), Some(config.permissions.profile_workspace_roots().to_vec()), ) } else { (None, None, None) }; let model = params.model; let effort = params.effort.map(Some); let summary = params.summary; let service_tier = params.service_tier; let personality = params.personality; // If any overrides are provided, validate them synchronously so the // request can fail before accepting user input. The actual update is // still queued together with the input below to preserve submission order. if has_any_overrides { thread .validate_turn_context_overrides(CodexThreadTurnContextOverrides { cwd: cwd.clone(), workspace_roots: runtime_workspace_roots.clone(), approval_policy, approvals_reviewer, sandbox_policy: sandbox_policy.clone(), permission_profile: permission_profile.clone(), active_permission_profile: active_permission_profile.clone(), profile_workspace_roots: profile_workspace_roots.clone(), windows_sandbox_level: None, model: model.clone(), effort, summary, service_tier: service_tier.clone(), collaboration_mode: collaboration_mode.clone(), personality, }) .await .map_err(|err| invalid_request(format!("invalid turn context override: {err}")))?; } // Start the turn by submitting the user input. Return its submission id as turn_id. let turn_op = if has_any_overrides { Op::UserInputWithTurnContext { items: mapped_items, environments: environment_selections, final_output_json_schema: params.output_schema, responsesapi_client_metadata: params.responsesapi_client_metadata, cwd, workspace_roots: runtime_workspace_roots, profile_workspace_roots, approval_policy, approvals_reviewer, sandbox_policy, permission_profile, active_permission_profile, windows_sandbox_level: None, model, effort, summary, service_tier, collaboration_mode, personality, } } else { Op::UserInput { items: mapped_items, environments: environment_selections, final_output_json_schema: params.output_schema, responsesapi_client_metadata: params.responsesapi_client_metadata, } }; let turn_id = self .submit_core_op(&request_id, thread.as_ref(), turn_op) .await .map_err(|err| { let error = internal_error(format!("failed to start turn: {err}")); self.track_error_response(&request_id, &error, /*error_type*/ None); error })?; if turn_has_input { let config_snapshot = thread.config_snapshot().await; codex_memories_write::start_memories_startup_task( Arc::clone(&self.thread_manager), Arc::clone(&self.auth_manager), thread_id, Arc::clone(&thread), thread.config().await, &config_snapshot.session_source, ); } self.outgoing .record_request_turn_id(&request_id, &turn_id) .await; let turn = Turn { id: turn_id, items: vec![], items_view: TurnItemsView::NotLoaded, error: None, status: TurnStatus::InProgress, started_at: None, completed_at: None, duration_ms: None, }; Ok(TurnStartResponse { turn }) } async fn thread_inject_items_response_inner( &self, params: ThreadInjectItemsParams, ) -> Result { let (_, thread) = self.load_thread(¶ms.thread_id).await?; let items = params .items .into_iter() .enumerate() .map(|(index, value)| { serde_json::from_value::(value) .map_err(|err| format!("items[{index}] is not a valid response item: {err}")) }) .collect::, _>>() .map_err(invalid_request)?; thread .inject_response_items(items) .await .map_err(|err| match err { CodexErr::InvalidRequest(message) => invalid_request(message), err => internal_error(format!("failed to inject response items: {err}")), })?; Ok(ThreadInjectItemsResponse {}) } async fn set_app_server_client_info( thread: &CodexThread, app_server_client_name: Option, app_server_client_version: Option, ) -> Result<(), JSONRPCErrorError> { let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny( app_server_client_name.as_deref(), app_server_client_version.as_deref(), ); thread .set_app_server_client_info( app_server_client_name, app_server_client_version, mcp_elicitations_auto_deny, ) .await .map_err(|err| internal_error(format!("failed to set app server client info: {err}"))) } async fn turn_steer_inner( &self, request_id: &ConnectionRequestId, params: TurnSteerParams, ) -> Result { let (_, thread) = self .load_thread(¶ms.thread_id) .await .inspect_err(|error| { self.track_error_response(request_id, error, /*error_type*/ None); })?; if params.expected_turn_id.is_empty() { return Err(invalid_request("expectedTurnId must not be empty")); } self.outgoing .record_request_turn_id(request_id, ¶ms.expected_turn_id) .await; if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { self.track_error_response( request_id, &error, Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), ); return Err(error); } let mapped_items: Vec = params .input .into_iter() .map(V2UserInput::into_core) .collect(); let turn_id = thread .steer_input( mapped_items, Some(¶ms.expected_turn_id), params.responsesapi_client_metadata, ) .await .map_err(|err| { let (message, data, error_type) = match err { SteerInputError::NoActiveTurn(_) => ( "no active turn to steer".to_string(), None, Some(AnalyticsJsonRpcError::TurnSteer( TurnSteerRequestError::NoActiveTurn, )), ), SteerInputError::ExpectedTurnMismatch { expected, actual } => ( format!("expected active turn id `{expected}` but found `{actual}`"), None, Some(AnalyticsJsonRpcError::TurnSteer( TurnSteerRequestError::ExpectedTurnMismatch, )), ), SteerInputError::ActiveTurnNotSteerable { turn_kind } => { let (message, turn_steer_error) = match turn_kind { codex_protocol::protocol::NonSteerableTurnKind::Review => ( "cannot steer a review turn".to_string(), TurnSteerRequestError::NonSteerableReview, ), codex_protocol::protocol::NonSteerableTurnKind::Compact => ( "cannot steer a compact turn".to_string(), TurnSteerRequestError::NonSteerableCompact, ), }; let error = TurnError { message: message.clone(), codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { turn_kind: turn_kind.into(), }), additional_details: None, }; let data = match serde_json::to_value(error) { Ok(data) => Some(data), Err(error) => { tracing::error!( ?error, "failed to serialize active-turn-not-steerable turn error" ); None } }; ( message, data, Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)), ) } SteerInputError::EmptyInput => ( "input must not be empty".to_string(), None, Some(AnalyticsJsonRpcError::Input(InputError::Empty)), ), }; let mut error = invalid_request(message); error.data = data; self.track_error_response(request_id, &error, error_type); error })?; Ok(TurnSteerResponse { turn_id }) } async fn prepare_realtime_conversation_thread( &self, request_id: &ConnectionRequestId, thread_id: &str, ) -> Result)>, JSONRPCErrorError> { let (thread_id, thread) = self.load_thread(thread_id).await?; match self .ensure_conversation_listener( thread_id, request_id.connection_id, /*raw_events_enabled*/ false, ) .await { Ok(EnsureConversationListenerResult::Attached) => {} Ok(EnsureConversationListenerResult::ConnectionClosed) => { return Ok(None); } Err(error) => return Err(error), } if !thread.enabled(Feature::RealtimeConversation) { return Err(invalid_request(format!( "thread {thread_id} does not support realtime conversation" ))); } Ok(Some((thread_id, thread))) } async fn thread_realtime_start_inner( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeStartParams, ) -> Result, JSONRPCErrorError> { let Some((_, thread)) = self .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) .await? else { return Ok(None); }; self.submit_core_op( request_id, thread.as_ref(), Op::RealtimeConversationStart(ConversationStartParams { output_modality: params.output_modality, prompt: params.prompt, realtime_session_id: params.realtime_session_id, transport: params.transport.map(|transport| match transport { ThreadRealtimeStartTransport::Websocket => { ConversationStartTransport::Websocket } ThreadRealtimeStartTransport::Webrtc { sdp } => { ConversationStartTransport::Webrtc { sdp } } }), voice: params.voice, }), ) .await .map_err(|err| internal_error(format!("failed to start realtime conversation: {err}")))?; Ok(Some(ThreadRealtimeStartResponse::default())) } async fn thread_realtime_append_audio_inner( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeAppendAudioParams, ) -> Result, JSONRPCErrorError> { let Some((_, thread)) = self .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) .await? else { return Ok(None); }; self.submit_core_op( request_id, thread.as_ref(), Op::RealtimeConversationAudio(ConversationAudioParams { frame: params.audio.into(), }), ) .await .map_err(|err| { internal_error(format!( "failed to append realtime conversation audio: {err}" )) })?; Ok(Some(ThreadRealtimeAppendAudioResponse::default())) } async fn thread_realtime_append_text_inner( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeAppendTextParams, ) -> Result, JSONRPCErrorError> { let Some((_, thread)) = self .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) .await? else { return Ok(None); }; self.submit_core_op( request_id, thread.as_ref(), Op::RealtimeConversationText(ConversationTextParams { text: params.text }), ) .await .map_err(|err| { internal_error(format!( "failed to append realtime conversation text: {err}" )) })?; Ok(Some(ThreadRealtimeAppendTextResponse::default())) } async fn thread_realtime_stop_inner( &self, request_id: &ConnectionRequestId, params: ThreadRealtimeStopParams, ) -> Result, JSONRPCErrorError> { let Some((_, thread)) = self .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) .await? else { return Ok(None); }; self.submit_core_op(request_id, thread.as_ref(), Op::RealtimeConversationClose) .await .map_err(|err| { internal_error(format!("failed to stop realtime conversation: {err}")) })?; Ok(Some(ThreadRealtimeStopResponse::default())) } fn build_review_turn(turn_id: String, display_text: &str) -> Turn { let items = if display_text.is_empty() { Vec::new() } else { vec![ThreadItem::UserMessage { id: turn_id.clone(), content: vec![V2UserInput::Text { text: display_text.to_string(), // Review prompt display text is synthesized; no UI element ranges to preserve. text_elements: Vec::new(), }], }] }; Turn { id: turn_id, items, items_view: TurnItemsView::NotLoaded, error: None, status: TurnStatus::InProgress, started_at: None, completed_at: None, duration_ms: None, } } async fn emit_review_started( &self, request_id: &ConnectionRequestId, turn: Turn, review_thread_id: String, ) { let response = ReviewStartResponse { turn, review_thread_id, }; self.outgoing .send_response(request_id.clone(), response) .await; } async fn start_inline_review( &self, request_id: &ConnectionRequestId, parent_thread: Arc, review_request: ReviewRequest, display_text: &str, parent_thread_id: String, ) -> std::result::Result<(), JSONRPCErrorError> { let turn_id = self .submit_core_op( request_id, parent_thread.as_ref(), Op::Review { review_request }, ) .await .map_err(|err| internal_error(format!("failed to start review: {err}")))?; let turn = Self::build_review_turn(turn_id, display_text); self.emit_review_started(request_id, turn, parent_thread_id) .await; Ok(()) } async fn start_detached_review( &self, request_id: &ConnectionRequestId, parent_thread_id: ThreadId, parent_thread: Arc, review_request: ReviewRequest, display_text: &str, ) -> std::result::Result<(), JSONRPCErrorError> { parent_thread.ensure_rollout_materialized().await; parent_thread.flush_rollout().await.map_err(|err| { internal_error(format!( "failed to flush parent thread {parent_thread_id}: {err}" )) })?; let parent_history = parent_thread .load_history(/*include_archived*/ true) .await .map_err(|err| { internal_error(format!( "failed to load parent thread {parent_thread_id}: {err}" )) })?; let mut config = self.config.as_ref().clone(); if let Some(review_model) = &config.review_model { config.model = Some(review_model.clone()); } let NewThread { thread_id, thread: review_thread, .. } = self .thread_manager .fork_thread_from_history( ForkSnapshot::Interrupted, config.clone(), InitialHistory::Resumed(ResumedHistory { conversation_id: parent_thread_id, history: parent_history.items, rollout_path: parent_thread.rollout_path(), }), /*thread_source*/ None, /*persist_extended_history*/ false, self.request_trace_context(request_id).await, ) .await .map_err(|err| { internal_error(format!("error creating detached review thread: {err}")) })?; log_listener_attach_result( self.ensure_conversation_listener( thread_id, request_id.connection_id, /*raw_events_enabled*/ false, ) .await, thread_id, request_id.connection_id, "review thread", ); let fallback_provider = self.config.model_provider_id.as_str(); match review_thread .read_thread( /*include_archived*/ true, /*include_history*/ false, ) .await { Ok(stored_thread) => { let (mut thread, _) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); thread.session_id = review_thread.session_configured().session_id.to_string(); self.thread_watch_manager .upsert_thread_silently(thread.clone()) .await; thread.status = resolve_thread_status( self.thread_watch_manager .loaded_status_for_thread(&thread.id) .await, /*has_in_progress_turn*/ false, ); let notif = thread_started_notification(thread); self.outgoing .send_server_notification(ServerNotification::ThreadStarted(notif)) .await; } Err(err) => { tracing::warn!("failed to load summary for review thread {thread_id}: {err}"); } } let turn_id = self .submit_core_op( request_id, review_thread.as_ref(), Op::Review { review_request }, ) .await .map_err(|err| { internal_error(format!("failed to start detached review turn: {err}")) })?; let turn = Self::build_review_turn(turn_id, display_text); let review_thread_id = thread_id.to_string(); self.emit_review_started(request_id, turn, review_thread_id) .await; Ok(()) } async fn review_start_inner( &self, request_id: &ConnectionRequestId, params: ReviewStartParams, ) -> Result<(), JSONRPCErrorError> { let ReviewStartParams { thread_id, target, delivery, } = params; let (parent_thread_id, parent_thread) = self.load_thread(&thread_id).await?; let (review_request, display_text) = Self::review_request_from_target(target)?; match delivery.unwrap_or(ApiReviewDelivery::Inline).to_core() { CoreReviewDelivery::Inline => { self.start_inline_review( request_id, parent_thread, review_request, &display_text, thread_id, ) .await?; } CoreReviewDelivery::Detached => { self.start_detached_review( request_id, parent_thread_id, parent_thread, review_request, &display_text, ) .await?; } } Ok(()) } async fn turn_interrupt_inner( &self, request_id: &ConnectionRequestId, params: TurnInterruptParams, ) -> Result, JSONRPCErrorError> { let TurnInterruptParams { thread_id, turn_id } = params; let is_startup_interrupt = turn_id.is_empty(); let (thread_uuid, thread) = self.load_thread(&thread_id).await?; // Record turn interrupts so we can reply when TurnAborted arrives. Startup // interrupts do not have a turn and are acknowledged after submission. if !is_startup_interrupt { let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; let is_running = matches!(thread.agent_status().await, AgentStatus::Running); { let mut thread_state = thread_state.lock().await; if let Some(active_turn) = thread_state.active_turn_snapshot() { if active_turn.id != turn_id { return Err(invalid_request(format!( "expected active turn id {turn_id} but found {}", active_turn.id ))); } } else if thread_state.last_terminal_turn_id.as_deref() == Some(turn_id.as_str()) || !is_running { return Err(invalid_request("no active turn to interrupt")); } thread_state.pending_interrupts.push(request_id.clone()); } self.outgoing .record_request_turn_id(request_id, &turn_id) .await; } // Submit the interrupt. Turn interrupts respond upon TurnAborted; startup // interrupts respond here because startup cancellation has no turn event. match self .submit_core_op(request_id, thread.as_ref(), Op::Interrupt) .await { Ok(_) if is_startup_interrupt => Ok(Some(TurnInterruptResponse {})), Ok(_) => Ok(None), Err(err) => { if !is_startup_interrupt { let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; let mut thread_state = thread_state.lock().await; thread_state .pending_interrupts .retain(|pending_request_id| pending_request_id != request_id); } let interrupt_target = if is_startup_interrupt { "startup" } else { "turn" }; Err(internal_error(format!( "failed to interrupt {interrupt_target}: {err}" ))) } } } fn listener_task_context(&self) -> ListenerTaskContext { ListenerTaskContext { thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), pending_thread_unloads: Arc::clone(&self.pending_thread_unloads), thread_watch_manager: self.thread_watch_manager.clone(), thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), skills_watcher: Arc::clone(&self.skills_watcher), } } async fn ensure_conversation_listener( &self, conversation_id: ThreadId, connection_id: ConnectionId, raw_events_enabled: bool, ) -> Result { super::thread_lifecycle::ensure_conversation_listener( self.listener_task_context(), conversation_id, connection_id, raw_events_enabled, ) .await } } fn xcode_26_4_mcp_elicitations_auto_deny( client_name: Option<&str>, client_version: Option<&str>, ) -> bool { // Xcode 26.4 shipped before app-server MCP elicitation requests were // client-visible. Keep elicitations auto-denied for that client line. // TODO: Remove this compatibility hack once Xcode 26.4 ages out. client_name == Some("Xcode") && client_version.is_some_and(|version| version.starts_with("26.4")) }