use super::legacy_sandbox_compat::*;
use super::*;
use crate::error_code::method_not_found;

const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;
// Text of the deprecation notice sent when a client still sets `persistExtendedHistory`.
const PERSIST_EXTENDED_HISTORY_DEPRECATION_SUMMARY: &str =
    "persistExtendedHistory is deprecated and ignored";
const PERSIST_EXTENDED_HISTORY_DEPRECATION_DETAILS: &str =
    "Remove this parameter. App-server always uses limited history persistence.";

// NOTE(review): several generic arguments in this file (for example `Option>` and
// `Vec,` below) look truncated by an extraction step — confirm against the upstream file.
struct ThreadListFilters {
    model_providers: Option>,
    source_kinds: Option>,
    archived: bool,
    cwd_filters: Option>,
    search_term: Option,
    use_state_db_only: bool,
}

/// Permission-related state recovered from a thread's persisted history by
/// `persisted_thread_permission_state`.
#[derive(Clone)]
struct PersistedThreadPermissionState {
    permission_profile: PermissionProfile,
    active_permission_profile: Option,
    workspace_roots: Vec,
}

/// Converts a path read from history into an `AbsolutePathBuf`.
///
/// Resolution order:
/// 1. direct conversion through `AbsolutePathBuf::try_from`;
/// 2. if that fails and `base` is given, resolution of `path` against `base`;
/// 3. otherwise `AbsolutePathBuf::relative_to_current_dir`, yielding `None` when it errors.
fn absolute_path_from_history_path(
    path: &Path,
    base: Option<&AbsolutePathBuf>,
) -> Option {
    if let Ok(path) = AbsolutePathBuf::try_from(path) {
        Some(path)
    } else if let Some(base) = base {
        Some(AbsolutePathBuf::resolve_path_against_base(
            path,
            base.as_path(),
        ))
    } else {
        AbsolutePathBuf::relative_to_current_dir(path).ok()
    }
}

/// Picks the working directory to use for legacy sandbox handling.
///
/// Precedence: the request cwd (resolved against the history cwd when one is available),
/// then the history cwd itself, then the first workspace root of the persisted
/// permission state. Returns `None` when none of these produce a path.
fn effective_cwd_for_legacy_sandbox(
    request_cwd: Option<&str>,
    history_cwd: Option<&Path>,
    persisted_permission_state: Option<&PersistedThreadPermissionState>,
) -> Option {
    let history_cwd =
        history_cwd.and_then(|cwd| absolute_path_from_history_path(cwd, /*base*/ None));
    request_cwd
        .and_then(|cwd| absolute_path_from_history_path(Path::new(cwd), history_cwd.as_ref()))
        .or_else(|| history_cwd.clone())
        .or_else(|| {
            persisted_permission_state
                .and_then(|state| state.workspace_roots.first())
                .cloned()
        })
}

/// Reconstructs the permission state of a thread from its rollout history.
///
/// Rollout items are visited in order and later items overwrite values captured from
/// earlier ones. When no item supplied a permission profile, one is derived from
/// `fallback_sandbox_policy` and the tracked cwd; `None` is returned if either of those
/// is missing. Workspace roots default to the tracked cwd (or an empty collection when
/// no cwd is known).
fn persisted_thread_permission_state(
    history: &InitialHistory,
    fallback_cwd: Option<&Path>,
    fallback_sandbox_policy: Option<&codex_protocol::protocol::SandboxPolicy>,
) -> Option {
    let mut cwd = fallback_cwd.and_then(|cwd| absolute_path_from_history_path(cwd, /*base*/ None));
    let mut workspace_roots = None;
    let mut permission_profile = None;
    let mut active_permission_profile = None;
    for item in history.get_rollout_items() {
        match item {
            RolloutItem::SessionMeta(meta_line) => {
                // Resolve against the cwd tracked so far; keep the old value if resolution fails.
                cwd = absolute_path_from_history_path(meta_line.meta.cwd.as_path(), cwd.as_ref())
                    .or(cwd);
                workspace_roots = Some(meta_line.meta.workspace_roots);
            }
            RolloutItem::TurnContext(context) => {
                cwd = absolute_path_from_history_path(context.cwd.as_path(), cwd.as_ref()).or(cwd);
                workspace_roots = Some(context.workspace_roots);
                let context_cwd = cwd
                    .as_ref()
                    .map(AbsolutePathBuf::as_path)
                    .unwrap_or(context.cwd.as_path());
                // Turn contexts without an explicit profile are converted from their
                // legacy sandbox policy, evaluated for the cwd computed above.
                permission_profile = Some(context.permission_profile.unwrap_or_else(|| {
                    PermissionProfile::from_legacy_sandbox_policy_for_cwd(
                        &context.sandbox_policy,
                        context_cwd,
                    )
                }));
                active_permission_profile = context.active_permission_profile;
            }
            RolloutItem::EventMsg(EventMsg::SessionConfigured(event)) => {
                // A SessionConfigured event replaces all four tracked values at once.
                cwd = Some(event.cwd.clone());
                workspace_roots = Some(event.workspace_roots);
                permission_profile = Some(event.permission_profile);
                active_permission_profile = event.active_permission_profile;
            }
            // These item kinds do not contribute to the permission state.
            RolloutItem::ResponseItem(_)
            | RolloutItem::Compacted(_)
            | RolloutItem::EventMsg(_) => {}
        }
    }
    if permission_profile.is_none() {
        // Nothing in history carried a profile: fall back to the caller-supplied policy.
        let cwd = cwd.as_ref()?;
        let fallback_sandbox_policy = fallback_sandbox_policy?;
        permission_profile = Some(PermissionProfile::from_legacy_sandbox_policy_for_cwd(
            fallback_sandbox_policy,
            cwd.as_path(),
        ));
    }
    Some(PersistedThreadPermissionState {
        permission_profile: permission_profile?,
        active_permission_profile,
        workspace_roots: workspace_roots.unwrap_or_else(|| cwd.into_iter().collect()),
    })
}

/// Lists the overrides in a resume request that differ from the configuration of the
/// already-running thread.
///
/// Each entry is a human-readable `field requested=… active=…` description. `config`,
/// `base_instructions` and `developer_instructions` are reported whenever they are
/// present, without comparing their values.
fn collect_resume_override_mismatches(
    request: &ThreadResumeParams,
    config_snapshot: &ThreadConfigSnapshot,
) -> Vec {
    let mut mismatch_details = Vec::new();
    if let Some(requested_model) = request.model.as_deref()
        && requested_model != config_snapshot.model
    {
        mismatch_details.push(format!(
            "model requested={requested_model} active={}",
            config_snapshot.model
        ));
    }
    if let Some(requested_provider) = request.model_provider.as_deref()
        && requested_provider !=
config_snapshot.model_provider_id
    {
        mismatch_details.push(format!(
            "model_provider requested={requested_provider} active={}",
            config_snapshot.model_provider_id
        ));
    }
    if let Some(requested_service_tier) = request.service_tier.as_ref()
        && requested_service_tier != &config_snapshot.service_tier
    {
        mismatch_details.push(format!(
            "service_tier requested={requested_service_tier:?} active={:?}",
            config_snapshot.service_tier
        ));
    }
    if let Some(requested_cwd) = request.cwd.as_deref() {
        // Compared as paths, without canonicalizing either side.
        let requested_cwd_path = std::path::PathBuf::from(requested_cwd);
        if requested_cwd_path != config_snapshot.cwd.as_path() {
            mismatch_details.push(format!(
                "cwd requested={} active={}",
                requested_cwd_path.display(),
                config_snapshot.cwd.display()
            ));
        }
    }
    if let Some(requested_approval) = request.approval_policy.as_ref() {
        // Convert the snapshot value into the request's type before comparing.
        let active_approval: AskForApproval = config_snapshot.approval_policy.into();
        if requested_approval != &active_approval {
            mismatch_details.push(format!(
                "approval_policy requested={requested_approval:?} active={active_approval:?}"
            ));
        }
    }
    if let Some(requested_review_policy) = request.approvals_reviewer.as_ref() {
        let active_review_policy: codex_app_server_protocol::ApprovalsReviewer =
            config_snapshot.approvals_reviewer.into();
        if requested_review_policy != &active_review_policy {
            mismatch_details.push(format!(
                "approvals_reviewer requested={requested_review_policy:?} active={active_review_policy:?}"
            ));
        }
    }
    if let Some(requested_sandbox) = request.sandbox.as_ref() {
        let active_sandbox = config_snapshot.sandbox_policy();
        // Only the policy variant is compared; variant fields are ignored. A requested
        // `DangerFullAccess` matches an active `DangerFullAccess` or `ExternalSandbox`.
        let sandbox_matches = matches!(
            (requested_sandbox, &active_sandbox),
            (
                SandboxMode::ReadOnly,
                codex_protocol::protocol::SandboxPolicy::ReadOnly { .. }
            ) | (
                SandboxMode::WorkspaceWrite,
                codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { .. }
            ) | (
                SandboxMode::DangerFullAccess,
                codex_protocol::protocol::SandboxPolicy::DangerFullAccess
            ) | (
                SandboxMode::DangerFullAccess,
                codex_protocol::protocol::SandboxPolicy::ExternalSandbox { .. }
            )
        );
        if !sandbox_matches {
            mismatch_details.push(format!(
                "sandbox requested={requested_sandbox:?} active={active_sandbox:?}"
            ));
        }
    }
    if let Some(requested_personality) = request.personality.as_ref()
        && config_snapshot.personality.as_ref() != Some(requested_personality)
    {
        mismatch_details.push(format!(
            "personality requested={requested_personality:?} active={:?}",
            config_snapshot.personality
        ));
    }
    // The remaining overrides are reported whenever present; their values are not compared.
    if request.config.is_some() {
        mismatch_details
            .push("config overrides were provided and ignored while running".to_string());
    }
    if request.base_instructions.is_some() {
        mismatch_details
            .push("baseInstructions override was provided and ignored while running".to_string());
    }
    if request.developer_instructions.is_some() {
        mismatch_details.push(
            "developerInstructions override was provided and ignored while running".to_string(),
        );
    }
    mismatch_details
}

/// Seeds resume overrides from persisted thread metadata.
///
/// Does nothing when `has_model_resume_override` reports that the request already
/// carries a model-related override. Otherwise the persisted model and model provider
/// are copied into `typesafe_overrides`, and a persisted reasoning effort (if any) is
/// inserted into `request_overrides` under the `model_reasoning_effort` key, creating
/// the map when it is absent.
// NOTE(review): generic arguments in this signature (`Option>`) look truncated by
// extraction — confirm against the upstream file.
fn merge_persisted_resume_metadata(
    request_overrides: &mut Option>,
    typesafe_overrides: &mut ConfigOverrides,
    persisted_metadata: &ThreadMetadata,
) {
    if has_model_resume_override(request_overrides.as_ref(), typesafe_overrides) {
        return;
    }
    typesafe_overrides.model = persisted_metadata.model.clone();
    typesafe_overrides.model_provider = Some(persisted_metadata.model_provider.clone());
    if let Some(reasoning_effort) = persisted_metadata.reasoning_effort {
        request_overrides.get_or_insert_with(HashMap::new).insert(
            "model_reasoning_effort".to_string(),
            serde_json::Value::String(reasoning_effort.to_string()),
        );
    }
}

/// Normalizes the optional cwd filter of a thread list request into path buffers.
///
/// Returns `Ok(None)` when no filter was supplied. A single filter and a list of
/// filters are handled uniformly; each entry is converted with
/// `AbsolutePathBuf::relative_to_current_dir`, and the first failing entry produces an
/// `invalid_params` error naming that entry.
fn normalize_thread_list_cwd_filters(
    cwd: Option,
) -> Result>, JSONRPCErrorError> {
    let Some(cwd) = cwd else {
        return Ok(None);
    };
    let cwds = match cwd {
        ThreadListCwdFilter::One(cwd) => vec![cwd],
        ThreadListCwdFilter::Many(cwds) => cwds,
    };
    let mut normalized_cwds = Vec::with_capacity(cwds.len());
    for cwd in cwds {
        let cwd = AbsolutePathBuf::relative_to_current_dir(cwd.as_str())
            .map(AbsolutePathBuf::into_path_buf)
            .map_err(|err| {
                invalid_params(format!("invalid thread/list cwd filter `{cwd}`: {err}"))
            })?;
normalized_cwds.push(cwd);
    }
    Ok(Some(normalized_cwds))
}

/// Reports whether a resume request already carries a model-related override.
///
/// True when the typed overrides set `model` or `model_provider`, or when the raw
/// request overrides contain a `model` or `model_reasoning_effort` key.
// NOTE(review): `Option<&HashMap>` looks truncated by extraction — confirm the map's
// type arguments against the upstream file.
fn has_model_resume_override(
    request_overrides: Option<&HashMap>,
    typesafe_overrides: &ConfigOverrides,
) -> bool {
    typesafe_overrides.model.is_some()
        || typesafe_overrides.model_provider.is_some()
        || request_overrides.is_some_and(|overrides| overrides.contains_key("model"))
        || request_overrides
            .is_some_and(|overrides| overrides.contains_key("model_reasoning_effort"))
}

/// Validates client-supplied dynamic tool specifications.
///
/// Tools are checked in order and the first violation is returned as an error message.
/// For each tool:
/// - the name must be non-empty after trimming, carry no surrounding whitespace, match
///   the identifier pattern, stay within the name length limit, and not be `mcp` or
///   start with `mcp__`;
/// - a namespace, when present, must satisfy the same rules (with its own length limit)
///   and must not be one of the reserved Responses API namespaces;
/// - the `(namespace, name)` pair must be unique within `tools`;
/// - a tool with `defer_loading` set must have a namespace;
/// - the input schema must be accepted by `codex_tools::parse_tool_input_schema`.
fn validate_dynamic_tools(tools: &[ApiDynamicToolSpec]) -> Result<(), String> {
    const DYNAMIC_TOOL_NAME_MAX_LEN: usize = 128;
    const DYNAMIC_TOOL_NAMESPACE_MAX_LEN: usize = 64;
    // Only used in error text; the check itself is done byte-wise below.
    const DYNAMIC_TOOL_IDENTIFIER_PATTERN: &str = "^[a-zA-Z0-9_-]+$";
    const RESERVED_RESPONSES_NAMESPACES: &[&str] = &[
        "api_tool",
        "browser",
        "computer",
        "container",
        "file_search",
        "functions",
        "image_gen",
        "multi_tool_use",
        "python",
        "python_user_visible",
        "submodel_delegator",
        "terminal",
        "tool_search",
        "web",
    ];

    /// Escapes a rejected identifier so control or non-ASCII characters are visible
    /// in the error message.
    fn escape_identifier_for_error(value: &str) -> String {
        value.escape_default().to_string()
    }

    /// Checks `value` against the allowed character set, then against `max_len`.
    /// `label` prefixes the error message.
    fn validate_dynamic_tool_identifier(
        value: &str,
        label: &str,
        max_len: usize,
    ) -> Result<(), String> {
        if !value
            .bytes()
            .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'_' | b'-'))
        {
            return Err(format!(
                "{label} must match {DYNAMIC_TOOL_IDENTIFIER_PATTERN} to match Responses API: {}",
                escape_identifier_for_error(value),
            ));
        }
        if value.chars().count() > max_len {
            return Err(format!(
                "{label} must be at most {max_len} characters to match Responses API: {}",
                escape_identifier_for_error(value),
            ));
        }
        Ok(())
    }

    // Tracks (namespace, name) pairs already seen, to detect duplicates.
    let mut seen = HashSet::new();
    for tool in tools {
        let name = tool.name.trim();
        // A whitespace-only name trims to empty and is reported as empty.
        if name.is_empty() {
            return Err("dynamic tool name must not be empty".to_string());
        }
        if name != tool.name {
            return Err(format!(
                "dynamic tool name has leading/trailing whitespace: {}",
                escape_identifier_for_error(&tool.name),
            ));
        }
        validate_dynamic_tool_identifier(name, "dynamic tool name", DYNAMIC_TOOL_NAME_MAX_LEN)?;
        if name == "mcp" || name.starts_with("mcp__") {
            return Err(format!("dynamic tool name is reserved: {name}"));
        }
        let namespace = tool.namespace.as_deref().map(str::trim);
        if let Some(namespace) = namespace {
            if namespace.is_empty() {
                return Err(format!(
                    "dynamic tool namespace must not be empty for {name}"
                ));
            }
            if Some(namespace) != tool.namespace.as_deref() {
                return Err(format!(
                    "dynamic tool namespace has leading/trailing whitespace for {name}: {namespace}",
                    name = escape_identifier_for_error(name),
                    namespace = escape_identifier_for_error(namespace),
                ));
            }
            validate_dynamic_tool_identifier(
                namespace,
                "dynamic tool namespace",
                DYNAMIC_TOOL_NAMESPACE_MAX_LEN,
            )?;
            if namespace == "mcp" || namespace.starts_with("mcp__") {
                return Err(format!(
                    "dynamic tool namespace is reserved for {name}: {namespace}"
                ));
            }
            if RESERVED_RESPONSES_NAMESPACES.contains(&namespace) {
                return Err(format!(
                    "dynamic tool namespace collides with a reserved Responses API namespace for {name}: {namespace}",
                ));
            }
        }
        // `insert` returns false when the pair is already present.
        if !seen.insert((namespace, name)) {
            if let Some(namespace) = namespace {
                return Err(format!(
                    "duplicate dynamic tool name in namespace {namespace}: {name}"
                ));
            }
            return Err(format!("duplicate dynamic tool name: {name}"));
        }
        if tool.defer_loading && namespace.is_none() {
            return Err(format!(
                "deferred dynamic tool must include a namespace: {name}"
            ));
        }
        if let Err(err) = codex_tools::parse_tool_input_schema(&tool.input_schema) {
            return Err(format!(
                "dynamic tool input schema is not supported for {name}: {err}"
            ));
        }
    }
    Ok(())
}

/// Shared handles used by the thread request handlers implemented on this type.
// NOTE(review): the field types below (`Arc,`, `Arc>>`) look truncated by extraction —
// confirm the type arguments against the upstream file.
#[derive(Clone)]
pub(crate) struct ThreadRequestProcessor {
    pub(super) auth_manager: Arc,
    pub(super) thread_manager: Arc,
    pub(super) outgoing: Arc,
    pub(super) arg0_paths: Arg0DispatchPaths,
    pub(super) config: Arc,
    pub(super) config_manager: ConfigManager,
    pub(super) thread_store: Arc,
    pub(super) pending_thread_unloads: Arc>>,
    pub(super) thread_state_manager: ThreadStateManager,
    pub(super) thread_watch_manager: ThreadWatchManager,
    pub(super) thread_list_state_permit: Arc,
    pub(super) thread_goal_processor:
ThreadGoalRequestProcessor,
    pub(super) state_db: Option,
    pub(super) background_tasks: TaskTracker,
    pub(super) skills_watcher: Arc,
}

// NOTE(review): return types such as `Result, JSONRPCErrorError>` and parameter types
// such as `Option,` look truncated by extraction — confirm against the upstream file.
impl ThreadRequestProcessor {
    /// Builds a processor from its collaborators. `background_tasks` is not passed in;
    /// it always starts as a fresh `TaskTracker`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        auth_manager: Arc,
        thread_manager: Arc,
        outgoing: Arc,
        arg0_paths: Arg0DispatchPaths,
        config: Arc,
        config_manager: ConfigManager,
        thread_store: Arc,
        pending_thread_unloads: Arc>>,
        thread_state_manager: ThreadStateManager,
        thread_watch_manager: ThreadWatchManager,
        thread_list_state_permit: Arc,
        thread_goal_processor: ThreadGoalRequestProcessor,
        state_db: Option,
        skills_watcher: Arc,
    ) -> Self {
        Self {
            auth_manager,
            thread_manager,
            outgoing,
            arg0_paths,
            config,
            config_manager,
            thread_store,
            pending_thread_unloads,
            thread_state_manager,
            thread_watch_manager,
            thread_list_state_permit,
            thread_goal_processor,
            state_db,
            background_tasks: TaskTracker::new(),
            skills_watcher,
        }
    }

    /// Entry point for thread start requests. Delegates to `thread_start_inner` and
    /// returns `Ok(None)` on success, i.e. no response payload is produced here.
    pub(crate) async fn thread_start(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadStartParams,
        app_server_client_name: Option,
        app_server_client_version: Option,
        request_context: RequestContext,
    ) -> Result, JSONRPCErrorError> {
        self.thread_start_inner(
            request_id,
            params,
            app_server_client_name,
            app_server_client_version,
            request_context,
        )
        .await
        .map(|()| None)
    }

    /// Entry point for thread unsubscribe requests; the unsubscribe is performed for
    /// the connection that issued the request and the result is returned as the payload.
    pub(crate) async fn thread_unsubscribe(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadUnsubscribeParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_unsubscribe_response_inner(params, request_id.connection_id)
            .await
            .map(|response| Some(response.into()))
    }

    /// Entry point for thread resume requests. Delegates to `thread_resume_inner` and
    /// returns `Ok(None)` on success.
    pub(crate) async fn thread_resume(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadResumeParams,
        app_server_client_name: Option,
        app_server_client_version: Option,
    ) -> Result, JSONRPCErrorError> {
        self.thread_resume_inner(
            request_id,
            params,
            app_server_client_name,
            app_server_client_version,
        )
        .await
        .map(|()| None)
    }

    /// Entry point for thread fork requests. Delegates to `thread_fork_inner` and
    /// returns `Ok(None)` on success.
    pub(crate) async fn thread_fork(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadForkParams,
        app_server_client_name: Option,
        app_server_client_version: Option,
    ) -> Result, JSONRPCErrorError> {
        self.thread_fork_inner(
            request_id,
            params,
            app_server_client_name,
            app_server_client_version,
        )
        .await
        .map(|()| None)
    }

    /// Archives a thread, then sends the response followed by one `ThreadArchived`
    /// notification per archived thread id.
    ///
    /// The response is sent from here so that it goes out before the notifications;
    /// `Ok(None)` is returned because nothing is left to return as a payload. Errors
    /// from `thread_archive_inner` are propagated unchanged and nothing is sent.
    pub(crate) async fn thread_archive(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadArchiveParams,
    ) -> Result, JSONRPCErrorError> {
        let (response, archived_thread_ids) = self.thread_archive_inner(params).await?;
        // `request_id` is owned and not needed afterwards, so it is moved, not cloned.
        self.outgoing.send_response(request_id, response).await;
        for thread_id in archived_thread_ids {
            self.outgoing
                .send_server_notification(ServerNotification::ThreadArchived(
                    ThreadArchivedNotification { thread_id },
                ))
                .await;
        }
        Ok(None)
    }

    /// Delegates to `thread_increment_elicitation_inner` and returns its response as
    /// the payload.
    pub(crate) async fn thread_increment_elicitation(
        &self,
        params: ThreadIncrementElicitationParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_increment_elicitation_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_decrement_elicitation_inner` and returns its response as
    /// the payload.
    pub(crate) async fn thread_decrement_elicitation(
        &self,
        params: ThreadDecrementElicitationParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_decrement_elicitation_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Renames a thread, then sends the response followed by a `ThreadNameUpdated`
    /// notification when the inner call produced one.
    ///
    /// Returns `Ok(None)` because the response is sent from here. Errors from
    /// `thread_set_name_response_inner` are propagated unchanged and nothing is sent.
    pub(crate) async fn thread_set_name(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadSetNameParams,
    ) -> Result, JSONRPCErrorError> {
        let (response, notification) = self.thread_set_name_response_inner(params).await?;
        // `request_id` is owned and not needed afterwards, so it is moved, not cloned.
        self.outgoing.send_response(request_id, response).await;
        if let Some(notification) = notification {
            self.outgoing
                .send_server_notification(ServerNotification::ThreadNameUpdated(
                    notification,
                ))
                .await;
        }
        Ok(None)
    }

    /// Delegates to `thread_metadata_update_response_inner` and returns its response
    /// as the payload.
    pub(crate) async fn thread_metadata_update(
        &self,
        params: ThreadMetadataUpdateParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_metadata_update_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_memory_mode_set_response_inner` and returns its response
    /// as the payload.
    pub(crate) async fn thread_memory_mode_set(
        &self,
        params: ThreadMemoryModeSetParams,
    ) -> Result, JSONRPCErrorError> {
self.thread_memory_mode_set_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `memory_reset_response_inner` and returns its response as the payload.
    // NOTE(review): return types such as `Result, JSONRPCErrorError>` look truncated by
    // extraction here and below — confirm against the upstream file.
    pub(crate) async fn memory_reset(
        &self,
    ) -> Result, JSONRPCErrorError> {
        self.memory_reset_response_inner()
            .await
            .map(|response: MemoryResetResponse| Some(response.into()))
    }

    /// Unarchives a thread, then sends the response followed by a `ThreadUnarchived`
    /// notification.
    ///
    /// Returns `Ok(None)` because the response is sent from here. Errors from
    /// `thread_unarchive_inner` are propagated unchanged and nothing is sent.
    pub(crate) async fn thread_unarchive(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadUnarchiveParams,
    ) -> Result, JSONRPCErrorError> {
        let (response, notification) = self.thread_unarchive_inner(params).await?;
        // `request_id` is owned and not needed afterwards, so it is moved, not cloned.
        self.outgoing.send_response(request_id, response).await;
        self.outgoing
            .send_server_notification(ServerNotification::ThreadUnarchived(notification))
            .await;
        Ok(None)
    }

    /// Delegates to `thread_compact_start_inner` and returns its response as the payload.
    pub(crate) async fn thread_compact_start(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadCompactStartParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_compact_start_inner(request_id, params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_background_terminals_clean_inner` and returns its response
    /// as the payload.
    pub(crate) async fn thread_background_terminals_clean(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadBackgroundTerminalsCleanParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_background_terminals_clean_inner(request_id, params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_rollback_inner` and returns `Ok(None)` on success.
    pub(crate) async fn thread_rollback(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadRollbackParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_rollback_inner(request_id, params)
            .await
            .map(|()| None)
    }

    /// Delegates to `thread_list_response_inner` and returns its response as the payload.
    pub(crate) async fn thread_list(
        &self,
        params: ThreadListParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_list_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_loaded_list_response_inner` and returns its response as
    /// the payload.
    pub(crate) async fn thread_loaded_list(
        &self,
        params: ThreadLoadedListParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_loaded_list_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_read_response_inner` and returns its response as the payload.
    pub(crate) async fn thread_read(
        &self,
        params: ThreadReadParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_read_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_turns_list_response_inner` and returns its response as
    /// the payload.
    pub(crate) async fn thread_turns_list(
        &self,
        params: ThreadTurnsListParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_turns_list_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Not implemented: always fails with a `method_not_found` error and ignores
    /// its parameters.
    pub(crate) async fn thread_turns_items_list(
        &self,
        _params: ThreadTurnsItemsListParams,
    ) -> Result, JSONRPCErrorError> {
        Err(method_not_found(
            "thread/turns/items/list is not supported yet",
        ))
    }

    /// Delegates to `thread_shell_command_inner` and returns its response as the payload.
    pub(crate) async fn thread_shell_command(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadShellCommandParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_shell_command_inner(request_id, params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `thread_approve_guardian_denied_action_inner` and returns its
    /// response as the payload.
    pub(crate) async fn thread_approve_guardian_denied_action(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadApproveGuardianDeniedActionParams,
    ) -> Result, JSONRPCErrorError> {
        self.thread_approve_guardian_denied_action_inner(request_id, params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Delegates to `get_thread_summary_response_inner` and returns its response as
    /// the payload.
    pub(crate) async fn conversation_summary(
        &self,
        params: GetConversationSummaryParams,
    ) -> Result, JSONRPCErrorError> {
        self.get_thread_summary_response_inner(params)
            .await
            .map(|response| Some(response.into()))
    }

    /// Collects instruction sources for `config` by querying an `AgentsMdManager`
    /// against `LOCAL_FS`.
    async fn instruction_sources_from_config(config: &Config) -> Vec {
        codex_core::AgentsMdManager::new(config)
            .instruction_sources(LOCAL_FS.as_ref())
            .await
    }

    /// Parses a thread id string and looks up the matching thread in the thread manager.
    ///
    /// Fails with an `invalid_request` error when the id does not parse or when the
    /// thread manager cannot return the thread.
    async fn load_thread(
        &self,
        thread_id: &str,
    ) -> Result<(ThreadId, Arc), JSONRPCErrorError> {
        // Resolve the core conversation handle from a v2 thread id string.
let thread_id = ThreadId::from_string(thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
        let thread = self
            .thread_manager
            .get_thread(thread_id)
            .await
            // The lookup error is dropped; callers only see "thread not found".
            .map_err(|_| invalid_request(format!("thread not found: {thread_id}")))?;
        Ok((thread_id, thread))
    }

    /// Acquires `thread_list_state_permit`, mapping an acquisition failure to an
    /// `internal_error`.
    // NOTE(review): return types such as `Result, JSONRPCErrorError>` and `Result {` look
    // truncated by extraction here and below — confirm against the upstream file.
    async fn acquire_thread_list_state_permit(
        &self,
    ) -> Result, JSONRPCErrorError> {
        self.thread_list_state_permit
            .acquire()
            .await
            .map_err(|err| {
                internal_error(format!("failed to acquire thread list state permit: {err}"))
            })
    }

    /// Records the app-server client name and version on `thread`, together with the
    /// auto-deny flag that `xcode_26_4_mcp_elicitations_auto_deny` derives from them.
    /// A failure is mapped to an `internal_error`.
    async fn set_app_server_client_info(
        thread: &CodexThread,
        app_server_client_name: Option,
        app_server_client_version: Option,
    ) -> Result<(), JSONRPCErrorError> {
        let mcp_elicitations_auto_deny = xcode_26_4_mcp_elicitations_auto_deny(
            app_server_client_name.as_deref(),
            app_server_client_version.as_deref(),
        );
        thread
            .set_app_server_client_info(
                app_server_client_name,
                app_server_client_version,
                mcp_elicitations_auto_deny,
            )
            .await
            .map_err(|err| internal_error(format!("failed to set app server client info: {err}")))
    }

    /// Drops all per-thread bookkeeping for `thread_id`: the pending-unload entry,
    /// outstanding outgoing requests, the thread state, and the watch entry.
    async fn finalize_thread_teardown(&self, thread_id: ThreadId) {
        self.pending_thread_unloads.lock().await.remove(&thread_id);
        self.outgoing
            .cancel_requests_for_thread(thread_id, /*error*/ None)
            .await;
        self.thread_state_manager
            .remove_thread_state(thread_id)
            .await;
        self.thread_watch_manager
            .remove_thread(&thread_id.to_string())
            .await;
    }

    /// Unsubscribes `connection_id` from the thread named in `params`.
    ///
    /// Returns `NotLoaded` when the thread manager cannot return the thread (any
    /// leftover bookkeeping for it is torn down first), `Unsubscribed` when the
    /// connection had been subscribed, and `NotSubscribed` otherwise. An unparsable
    /// thread id produces an `invalid_request` error.
    async fn thread_unsubscribe_response_inner(
        &self,
        params: ThreadUnsubscribeParams,
        connection_id: ConnectionId,
    ) -> Result {
        let thread_id = ThreadId::from_string(&params.thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
        if self.thread_manager.get_thread(thread_id).await.is_err() {
            self.finalize_thread_teardown(thread_id).await;
            return Ok(ThreadUnsubscribeResponse {
                status: ThreadUnsubscribeStatus::NotLoaded,
            });
        };
        let was_subscribed = self
            .thread_state_manager
            .unsubscribe_connection_from_thread(thread_id, connection_id)
            .await;
        let status = if was_subscribed {
            ThreadUnsubscribeStatus::Unsubscribed
        } else {
            ThreadUnsubscribeStatus::NotSubscribed
        };
        Ok(ThreadUnsubscribeResponse { status })
    }

    /// Removes the thread from the thread manager before archiving. If it was loaded,
    /// waits for it to shut down; shutdown failures and timeouts are logged and do not
    /// block the archive. Per-thread bookkeeping is torn down in every case.
    async fn prepare_thread_for_archive(&self, thread_id: ThreadId) {
        let removed_conversation = self.thread_manager.remove_thread(&thread_id).await;
        if let Some(conversation) = removed_conversation {
            info!("thread {thread_id} was active; shutting down");
            match wait_for_thread_shutdown(&conversation).await {
                ThreadShutdownResult::Complete => {}
                ThreadShutdownResult::SubmitFailed => {
                    error!(
                        "failed to submit Shutdown to thread {thread_id}; proceeding with archive"
                    );
                }
                ThreadShutdownResult::TimedOut => {
                    warn!("thread {thread_id} shutdown timed out; proceeding with archive");
                }
            }
        }
        self.finalize_thread_teardown(thread_id).await;
    }

    /// Builds a `ListenerTaskContext` from clones of this processor's shared handles
    /// and the model provider id and codex home of its config.
    fn listener_task_context(&self) -> ListenerTaskContext {
        ListenerTaskContext {
            thread_manager: Arc::clone(&self.thread_manager),
            thread_state_manager: self.thread_state_manager.clone(),
            outgoing: Arc::clone(&self.outgoing),
            pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
            thread_watch_manager: self.thread_watch_manager.clone(),
            thread_list_state_permit: self.thread_list_state_permit.clone(),
            fallback_model_provider: self.config.model_provider_id.clone(),
            codex_home: self.config.codex_home.to_path_buf(),
            skills_watcher: Arc::clone(&self.skills_watcher),
        }
    }

    /// Forwards to `thread_lifecycle::ensure_conversation_listener` with a fresh
    /// listener task context.
    async fn ensure_conversation_listener(
        &self,
        conversation_id: ThreadId,
        connection_id: ConnectionId,
        raw_events_enabled: bool,
    ) -> Result {
        super::thread_lifecycle::ensure_conversation_listener(
            self.listener_task_context(),
            conversation_id,
            connection_id,
            raw_events_enabled,
        )
        .await
    }

    /// Forwards to `thread_lifecycle::ensure_listener_task_running` with a fresh
    /// listener task context.
    async fn ensure_listener_task_running(
        &self,
        conversation_id: ThreadId,
        conversation: Arc,
        thread_state: Arc>,
    ) -> Result<(), JSONRPCErrorError> {
        super::thread_lifecycle::ensure_listener_task_running(
            self.listener_task_context(),
            conversation_id,
            conversation,
            thread_state,
        )
        .await
    }

    /// Validates a thread start request and schedules the thread creation as a
    /// background task.
    ///
    /// Rejects requests that set both `sandbox` and `permissions`, and sends a
    /// deprecation notice to the requesting connection when `persist_extended_history`
    /// is set. The thread itself is created by `thread_start_task` on
    /// `background_tasks`; an error from that task is sent to the client through
    /// `outgoing.send_error`. Returns `Ok(())` once the task is spawned.
    async fn thread_start_inner(
        &self,
        request_id: ConnectionRequestId,
params: ThreadStartParams,
        app_server_client_name: Option,
        app_server_client_version: Option,
        request_context: RequestContext,
    ) -> Result<(), JSONRPCErrorError> {
        let ThreadStartParams {
            model,
            model_provider,
            service_tier,
            cwd,
            workspace_roots,
            approval_policy,
            approvals_reviewer,
            sandbox,
            permissions,
            config,
            service_name,
            base_instructions,
            developer_instructions,
            dynamic_tools,
            mock_experimental_field: _mock_experimental_field,
            experimental_raw_events,
            personality,
            ephemeral,
            session_start_source,
            thread_source,
            environments,
            persist_extended_history,
        } = params;
        if sandbox.is_some() && permissions.is_some() {
            return Err(invalid_request(
                "`permissions` cannot be combined with `sandbox`",
            ));
        }
        // The flag is accepted for compatibility but only triggers a deprecation notice.
        if persist_extended_history {
            self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
                .await;
        }
        let environment_selections = self.parse_environment_selections(environments)?;
        let mut typesafe_overrides = self.build_thread_config_overrides(
            model,
            model_provider,
            service_tier,
            cwd,
            workspace_roots,
            approval_policy,
            approvals_reviewer,
            sandbox,
            permissions,
            base_instructions,
            developer_instructions,
            personality,
        );
        typesafe_overrides.ephemeral = ephemeral;
        // Use the shared helper rather than a hand-written copy of the same fields, so
        // this path cannot drift from the other listener entry points.
        let listener_task_context = self.listener_task_context();
        let request_trace = request_context.request_trace();
        let config_manager = self.config_manager.clone();
        // Separate handles for the error path: `listener_task_context` and `request_id`
        // are moved into `thread_start_task` below.
        let outgoing = Arc::clone(&listener_task_context.outgoing);
        let error_request_id = request_id.clone();
        let thread_start_task = async move {
            if let Err(error) = Self::thread_start_task(
                listener_task_context,
                config_manager,
                request_id,
                app_server_client_name,
                app_server_client_version,
                config,
                typesafe_overrides,
                dynamic_tools,
                session_start_source,
                thread_source.map(Into::into),
                environment_selections,
                service_name,
                experimental_raw_events,
                request_trace,
            )
            .await
            {
                outgoing.send_error(error_request_id, error).await;
            }
        };
        self.background_tasks
            .spawn(thread_start_task.instrument(request_context.span()));
        Ok(())
    }

    /// Closes the background task tracker and waits up to 10 seconds for the tracked
    /// tasks to finish. A timeout is logged and otherwise ignored.
    pub(crate) async fn drain_background_tasks(&self) {
        self.background_tasks.close();
        if tokio::time::timeout(Duration::from_secs(10), self.background_tasks.wait())
            .await
            .is_err()
        {
            warn!("timed out waiting for background tasks to shut down; proceeding");
        }
    }

    /// Clears every listener registered in the thread state manager.
    pub(crate) async fn clear_all_thread_listeners(&self) {
        self.thread_state_manager.clear_all_listeners().await;
    }

    /// Asks the thread manager to shut down all threads within a 10 second bound and
    /// logs each thread that could not be asked to shut down or that timed out.
    pub(crate) async fn shutdown_threads(&self) {
        let report = self
            .thread_manager
            .shutdown_all_threads_bounded(Duration::from_secs(10))
            .await;
        for thread_id in report.submit_failed {
            warn!("failed to submit Shutdown to thread {thread_id}");
        }
        for thread_id in report.timed_out {
            warn!("timed out waiting for thread {thread_id} to shut down");
        }
    }

    /// Looks up the trace context that `outgoing` holds for `request_id`, if any.
    // NOTE(review): `-> Option {` and `-> CodexResult {` below look truncated by
    // extraction — confirm the type arguments against the upstream file.
    async fn request_trace_context(
        &self,
        request_id: &ConnectionRequestId,
    ) -> Option {
        self.outgoing.request_trace_context(request_id).await
    }

    /// Sends the `persistExtendedHistory` deprecation notice to `connection_id` only.
    async fn send_persist_extended_history_deprecation_notice(&self, connection_id: ConnectionId) {
        self.outgoing
            .send_server_notification_to_connections(
                &[connection_id],
                ServerNotification::DeprecationNotice(DeprecationNoticeNotification {
                    summary: PERSIST_EXTENDED_HISTORY_DEPRECATION_SUMMARY.to_string(),
                    details: Some(PERSIST_EXTENDED_HISTORY_DEPRECATION_DETAILS.to_string()),
                }),
            )
            .await;
    }

    /// Submits `op` to `thread`, attaching the trace context of the originating request.
    async fn submit_core_op(
        &self,
        request_id: &ConnectionRequestId,
        thread: &CodexThread,
        op: Op,
    ) -> CodexResult {
        thread
            .submit_with_trace(op, self.request_trace_context(request_id).await)
            .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn thread_start_task(
listener_task_context: ListenerTaskContext,
        config_manager: ConfigManager,
        request_id: ConnectionRequestId,
        app_server_client_name: Option,
        app_server_client_version: Option,
        config_overrides: Option>,
        typesafe_overrides: ConfigOverrides,
        dynamic_tools: Option>,
        session_start_source: Option,
        thread_source: Option,
        environments: Option>,
        service_name: Option,
        experimental_raw_events: bool,
        request_trace: Option,
    ) -> Result<(), JSONRPCErrorError> {
        // Start of the interval reported as the `thread_start_total` startup phase.
        let thread_start_started_at = std::time::Instant::now();
        // Remember whether the request carried an explicit cwd; auto-trust below
        // only applies in that case.
        let requested_cwd = typesafe_overrides.cwd.clone();
        // First config load. The overrides are cloned because they are needed
        // again if the config has to be reloaded after the trust decision.
        let mut config = config_manager
            .load_with_overrides(config_overrides.clone(), typesafe_overrides.clone())
            .await
            .map_err(|err| config_load_error(&err))?;

        // The user may have requested WorkspaceWrite or DangerFullAccess via
        // the command line, though in the process of deriving the Config, it
        // could be downgraded to ReadOnly (perhaps there is no sandbox
        // available on Windows or the enterprise config disallows it). The cwd
        // should still be considered "trusted" in this case.
        let requested_permissions_trust_project =
            requested_permissions_trust_project(&typesafe_overrides, config.cwd.as_path());
        let effective_permissions_trust_project = permission_profile_trusts_project(
            config.permissions.permission_profile_ref(),
            config.cwd.as_path(),
        );

        // Auto-trust only when all three hold: an explicit cwd was requested, the
        // project has no trust level yet, and the requested or effective
        // permissions trust the project.
        if requested_cwd.is_some()
            && config.active_project.trust_level.is_none()
            && (requested_permissions_trust_project || effective_permissions_trust_project)
        {
            // Trust the resolved git root when there is one, otherwise the cwd itself.
            let trust_target = resolve_root_git_project_for_trust(LOCAL_FS.as_ref(), &config.cwd)
                .await
                .unwrap_or_else(|| config.cwd.clone());
            let current_cli_overrides = config_manager.current_cli_overrides();
            // Declared outside the `if let` so the slice borrowed from it below
            // stays valid for the reload call.
            let cli_overrides_with_trust;
            let cli_overrides_for_reload = if let Err(err) =
                codex_core::config::set_project_trust_level(
                    &listener_task_context.codex_home,
                    trust_target.as_path(),
                    TrustLevel::Trusted,
                ) {
                warn!(
                    "failed to persist trusted project state for {}; continuing with in-memory trust for this thread: {err}",
                    trust_target.display()
                );
                // Persisting failed: append a `projects` table override carrying
                // `trust_level = "trusted"` for the trust target to the current
                // CLI overrides used for the reload.
                let mut project = toml::map::Map::new();
                project.insert(
                    "trust_level".to_string(),
                    TomlValue::String("trusted".to_string()),
                );
                let mut projects = toml::map::Map::new();
                projects.insert(
                    project_trust_key(trust_target.as_path()),
                    TomlValue::Table(project),
                );
                cli_overrides_with_trust = current_cli_overrides
                    .iter()
                    .cloned()
                    .chain(std::iter::once((
                        "projects".to_string(),
                        TomlValue::Table(projects),
                    )))
                    .collect::>();
                cli_overrides_with_trust.as_slice()
            } else {
                // Persisting succeeded: reload with the unmodified CLI overrides.
                current_cli_overrides.as_slice()
            };

            // Reload the config after the trust decision; this consumes the
            // request overrides.
            config = config_manager
                .load_with_cli_overrides(
                    cli_overrides_for_reload,
                    config_overrides,
                    typesafe_overrides,
                    /*fallback_cwd*/ None,
                )
                .await
                .map_err(|err| config_load_error(&err))?;
        }

        let instruction_sources = Self::instruction_sources_from_config(&config).await;
        // Without explicit environment selections, use the thread manager's
        // defaults for the resolved cwd.
        let environments = environments.unwrap_or_else(|| {
            listener_task_context
                .thread_manager
                .default_environment_selections(&config.cwd)
        });
        let dynamic_tools = dynamic_tools.unwrap_or_default();
        // Validation runs only when tools were supplied; each API spec is then
        // converted field-by-field into its core counterpart.
        let core_dynamic_tools = if dynamic_tools.is_empty() {
            Vec::new()
        } else {
            validate_dynamic_tools(&dynamic_tools).map_err(invalid_request)?;
            dynamic_tools
                .into_iter()
                .map(|tool| CoreDynamicToolSpec {
                    namespace: tool.namespace,
                    name: tool.name,
                    description: tool.description,
                    input_schema: tool.input_schema,
                    defer_loading: tool.defer_loading,
                })
                .collect()
        };
        let core_dynamic_tool_count = core_dynamic_tools.len();

        // Start of the interval reported as the `thread_start_create_thread` phase.
        let create_thread_started_at = std::time::Instant::now();
        let NewThread {
            thread_id,
            thread,
            session_configured,
            ..
        } = listener_task_context
            .thread_manager
            .start_thread_with_options(StartThreadOptions {
                config,
                // A missing start source is treated as `Startup` (new history);
                // `Clear` starts from cleared history.
                initial_history: match session_start_source
                    .unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup)
                {
                    codex_app_server_protocol::ThreadStartSource::Startup => InitialHistory::New,
                    codex_app_server_protocol::ThreadStartSource::Clear => InitialHistory::Cleared,
                },
                session_source: None,
                thread_source,
                dynamic_tools: core_dynamic_tools,
                // Hard-coded: extended history persistence is not requested here.
                persist_extended_history: false,
                metrics_service_name: service_name,
                parent_trace: request_trace,
                environments,
            })
            .instrument(tracing::info_span!(
                "app_server.thread_start.create_thread",
                otel.name = "app_server.thread_start.create_thread",
                thread_start.dynamic_tool_count = core_dynamic_tool_count,
                thread_start.persist_extended_history = false,
            ))
            .await
            // Invalid-request errors keep their message; every other error is
            // reported as an internal error.
            .map_err(|err| match err {
                CodexErr::InvalidRequest(message) => invalid_request(message),
                err => internal_error(format!("error creating thread: {err}")),
            })?;

        let session_telemetry = thread.session_telemetry();
        session_telemetry.record_startup_phase(
            "thread_start_create_thread",
            create_thread_started_at.elapsed(),
            Some("ready"),
        );

        Self::set_app_server_client_info(
            thread.as_ref(),
            app_server_client_name,
            app_server_client_version,
        )
        .await?;

        let config_snapshot = thread
            .config_snapshot()
            .instrument(tracing::info_span!(
                "app_server.thread_start.config_snapshot",
                otel.name = "app_server.thread_start.config_snapshot",
            ))
            .await;
        // From here on `thread` is the API view built from the snapshot; it
        // shadows the core thread handle returned by the thread manager.
        let mut thread = build_thread_from_snapshot(
            thread_id,
            session_configured.session_id.to_string(),
            &config_snapshot,
            session_configured.rollout_path.clone(),
        );

        // Auto-attach a thread listener when starting a thread.
        // The attach result is handed to `log_listener_attach_result`; nothing
        // is propagated with `?`, so the function continues either way.
        log_listener_attach_result(
            super::thread_lifecycle::ensure_conversation_listener(
                listener_task_context.clone(),
                thread_id,
                request_id.connection_id,
                experimental_raw_events,
            )
            .instrument(tracing::info_span!(
                "app_server.thread_start.attach_listener",
                otel.name = "app_server.thread_start.attach_listener",
                thread_start.experimental_raw_events = experimental_raw_events,
            ))
            .await,
            thread_id,
            request_id.connection_id,
            "thread",
        );

        // Register the thread with the watch manager, then read its loaded
        // status back to fill in the response.
        listener_task_context
            .thread_watch_manager
            .upsert_thread_silently(thread.clone())
            .instrument(tracing::info_span!(
                "app_server.thread_start.upsert_thread",
                otel.name = "app_server.thread_start.upsert_thread",
            ))
            .await;

        thread.status = resolve_thread_status(
            listener_task_context
                .thread_watch_manager
                .loaded_status_for_thread(&thread.id)
                .instrument(tracing::info_span!(
                    "app_server.thread_start.resolve_status",
                    otel.name = "app_server.thread_start.resolve_status",
                ))
                .await,
            /*has_in_progress_turn*/ false,
        );

        // These two are computed before the struct literal below moves fields
        // out of `config_snapshot`.
        let sandbox = thread_response_sandbox_policy(
            &config_snapshot.permission_profile,
            &config_snapshot.workspace_roots,
            config_snapshot.cwd.as_path(),
        );
        let active_permission_profile =
            thread_response_active_permission_profile(config_snapshot.active_permission_profile);

        let response = ThreadStartResponse {
            // Cloned because `thread` itself is moved into the notification below.
            thread: thread.clone(),
            model: config_snapshot.model,
            model_provider: config_snapshot.model_provider_id,
            service_tier: config_snapshot.service_tier,
            cwd: config_snapshot.cwd,
            instruction_sources,
            approval_policy: config_snapshot.approval_policy.into(),
            approvals_reviewer: config_snapshot.approvals_reviewer.into(),
            sandbox,
            workspace_roots: config_snapshot.workspace_roots,
            active_permission_profile,
            reasoning_effort: config_snapshot.reasoning_effort,
        };
        let notif = thread_started_notification(thread);

        // The response to the request is sent first; the `ThreadStarted`
        // notification follows.
        listener_task_context
            .outgoing
            .send_response(request_id, response)
.instrument(tracing::info_span!(
                "app_server.thread_start.send_response",
                otel.name = "app_server.thread_start.send_response",
            ))
            .await;

        listener_task_context
            .outgoing
            .send_server_notification(ServerNotification::ThreadStarted(notif))
            .instrument(tracing::info_span!(
                "app_server.thread_start.notify_started",
                otel.name = "app_server.thread_start.notify_started",
            ))
            .await;

        session_telemetry.record_startup_phase(
            "thread_start_total",
            thread_start_started_at.elapsed(),
            Some("ready"),
        );
        Ok(())
    }

    /// Translates per-request thread settings into `ConfigOverrides`.
    ///
    /// API-level values are converted to their core equivalents, the sandbox helper
    /// executable paths are copied from `arg0_paths`, and the permission profile
    /// selection is applied last on top of the assembled overrides.
    #[allow(clippy::too_many_arguments)]
    fn build_thread_config_overrides(
        &self,
        model: Option,
        model_provider: Option,
        service_tier: Option>,
        cwd: Option,
        workspace_roots: Option>,
        approval_policy: Option,
        approvals_reviewer: Option,
        sandbox: Option,
        permissions: Option,
        base_instructions: Option,
        developer_instructions: Option,
        personality: Option,
    ) -> ConfigOverrides {
        // Convert the API-facing values first so the struct literal stays flat.
        let cwd = cwd.map(PathBuf::from);
        let workspace_roots = workspace_roots
            .map(|roots| roots.into_iter().map(|root| root.to_path_buf()).collect());
        let approval_policy =
            approval_policy.map(codex_app_server_protocol::AskForApproval::to_core);
        let approvals_reviewer =
            approvals_reviewer.map(codex_app_server_protocol::ApprovalsReviewer::to_core);
        let sandbox_mode = sandbox.map(SandboxMode::to_core);

        let mut overrides = ConfigOverrides {
            model,
            model_provider,
            service_tier,
            cwd,
            workspace_roots,
            approval_policy,
            approvals_reviewer,
            sandbox_mode,
            codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(),
            main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(),
            base_instructions,
            developer_instructions,
            personality,
            ..Default::default()
        };
        apply_permission_profile_selection_to_config_overrides(&mut overrides, permissions);
        overrides
    }

    /// Resolves a permission profile selection by loading a config with it applied.
    ///
    /// Returns an invalid-request error when the loaded config carries a startup
    /// warning saying the configured `permission_profile` is disallowed, or when the
    /// selection does not resolve to a named (active) profile.
    async fn validate_active_permission_profile_selection(
        &self,
        permissions: String,
        request_overrides: Option>,
        cwd: Option,
        fallback_cwd: Option,
    ) -> Result {
        let mut overrides = ConfigOverrides {
            cwd,
            codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(),
            main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(),
            ..Default::default()
        };
        apply_permission_profile_selection_to_config_overrides(&mut overrides, Some(permissions));

        let config = self
            .config_manager
            .load_for_cwd(request_overrides, overrides, fallback_cwd)
            .await
            .map_err(|err| config_load_error(&err))?;

        // A disallowed profile is only reported through a startup warning, so the
        // warning text is what gets matched here.
        let disallowed_warning = config.startup_warnings.iter().find(|warning| {
            warning.contains("Configured value for `permission_profile` is disallowed")
        });
        if let Some(warning) = disallowed_warning {
            return Err(invalid_request(format!(
                "invalid permission profile selection: {warning}"
            )));
        }

        let Some(active_permission_profile) = config.permissions.active_permission_profile()
        else {
            return Err(invalid_request(
                "permission profile selection did not resolve to a named profile",
            ));
        };
        Ok(ResolvedPermissionProfileSelection {
            permission_profile: config.permissions.permission_profile(),
            active_permission_profile,
        })
    }

    /// Converts API environment selections into `TurnEnvironmentSelection`s and
    /// validates them with the thread manager.
    ///
    /// `None` is passed through unchanged and skips validation.
    fn parse_environment_selections(
        &self,
        environments: Option>,
    ) -> Result>, JSONRPCErrorError> {
        let environment_selections = environments.map(|environments| {
            environments
                .into_iter()
                .map(|environment| TurnEnvironmentSelection {
                    environment_id: environment.environment_id,
                    cwd: environment.cwd,
                })
                .collect::>()
        });
        if let Some(selections) = environment_selections.as_ref() {
            self.thread_manager
                .validate_environment_selections(selections)
                .map_err(|err| invalid_request(environment_selection_error_message(err)))?;
        }
        Ok(environment_selections)
    }

    /// Archives a thread while holding the thread-list state permit.
    async fn thread_archive_inner(
        &self,
        params: ThreadArchiveParams,
    ) -> Result<(ThreadArchiveResponse, Vec), JSONRPCErrorError> {
        // Held (not used) until the archive call has finished.
        let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
        self.thread_archive_response(params).await
    }

    async fn thread_archive_response(
        &self,
        params: ThreadArchiveParams,
    ) -> Result<(ThreadArchiveResponse, Vec), JSONRPCErrorError> {
        let thread_id = ThreadId::from_string(&params.thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

        let mut thread_ids = vec![thread_id];
if let Some(state_db_ctx) = self.state_db.as_ref() {
            // With a state db available, also collect the threads spawned
            // (directly or transitively) from the requested one.
            let descendants = state_db_ctx
                .list_thread_spawn_descendants(thread_id)
                .await
                .map_err(|err| {
                    internal_error(format!(
                        "failed to list spawned descendants for thread id {thread_id}: {err}"
                    ))
                })?;
            // `seen` is seeded with the parent so neither it nor a repeated
            // descendant is added twice.
            let mut seen = HashSet::from([thread_id]);
            for descendant_id in descendants {
                if seen.insert(descendant_id) {
                    thread_ids.push(descendant_id);
                }
            }
        }

        // Threads that still need archiving: the parent first (if not already
        // archived), then the descendants.
        let mut archive_thread_ids = Vec::new();
        // A failed read of the requested thread aborts the whole request.
        match self
            .thread_store
            .read_thread(StoreReadThreadParams {
                thread_id,
                include_archived: false,
                include_history: false,
            })
            .await
        {
            Ok(thread) => {
                if thread.archived_at.is_none() {
                    archive_thread_ids.push(thread_id);
                }
            }
            Err(err) => return Err(thread_store_archive_error("archive", err)),
        }
        // Descendants are best-effort: a failed read is logged and skipped.
        // `skip(1)` drops the parent, which is always the first entry.
        for descendant_thread_id in thread_ids.into_iter().skip(1) {
            match self
                .thread_store
                .read_thread(StoreReadThreadParams {
                    thread_id: descendant_thread_id,
                    include_archived: true,
                    include_history: false,
                })
                .await
            {
                Ok(thread) => {
                    if thread.archived_at.is_none() {
                        archive_thread_ids.push(descendant_thread_id);
                    }
                }
                Err(err) => {
                    warn!(
                        "failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
                    );
                }
            }
        }

        let mut archived_thread_ids = Vec::new();
        // Nothing left to archive: respond with an empty list.
        // NOTE(review): the first entry is treated as the parent. If the parent
        // read above can succeed with `archived_at` set, the first unarchived
        // descendant would take that role and its archive failure would become
        // a hard error — confirm `include_archived: false` rules this out.
        let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
        else {
            return Ok((ThreadArchiveResponse {}, archived_thread_ids));
        };

        // Failing to archive the parent fails the request.
        self.prepare_thread_for_archive(*parent_thread_id).await;
        match self
            .thread_store
            .archive_thread(StoreArchiveThreadParams {
                thread_id: *parent_thread_id,
            })
            .await
        {
            Ok(()) => {
                archived_thread_ids.push(parent_thread_id.to_string());
            }
            Err(err) => return Err(thread_store_archive_error("archive", err)),
        }

        // Descendants are archived in reverse of the order they were collected.
        for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
            self.prepare_thread_for_archive(descendant_thread_id).await;
            match self
                .thread_store
                .archive_thread(StoreArchiveThreadParams {
                    thread_id: descendant_thread_id,
                })
                .await
            {
                Ok(()) => {
archived_thread_ids.push(descendant_thread_id.to_string());
                }
                Err(err) => {
                    warn!(
                        "failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
                    );
                }
            }
        }

        Ok((ThreadArchiveResponse {}, archived_thread_ids))
    }

    /// Increments the thread's out-of-band elicitation counter.
    ///
    /// The response reports the new count and `paused = count > 0`.
    async fn thread_increment_elicitation_inner(
        &self,
        params: ThreadIncrementElicitationParams,
    ) -> Result {
        let (_, thread) = self.load_thread(&params.thread_id).await?;
        let count = thread
            .increment_out_of_band_elicitation_count()
            .await
            .map_err(|err| {
                internal_error(format!(
                    "failed to increment out-of-band elicitation counter: {err}"
                ))
            })?;
        let paused = count > 0;
        Ok(ThreadIncrementElicitationResponse { count, paused })
    }

    /// Decrements the thread's out-of-band elicitation counter.
    ///
    /// An invalid-request error from the core thread is passed through as such;
    /// any other failure is reported as an internal error.
    async fn thread_decrement_elicitation_inner(
        &self,
        params: ThreadDecrementElicitationParams,
    ) -> Result {
        let (_, thread) = self.load_thread(&params.thread_id).await?;
        let count = thread
            .decrement_out_of_band_elicitation_count()
            .await
            .map_err(|err| match err {
                CodexErr::InvalidRequest(message) => invalid_request(message),
                err => internal_error(format!(
                    "failed to decrement out-of-band elicitation counter: {err}"
                )),
            })?;
        let paused = count > 0;
        Ok(ThreadDecrementElicitationResponse { count, paused })
    }

    /// Renames a non-archived thread and returns the notification to broadcast.
    ///
    /// The id and name are validated before the thread-list state permit is taken.
    async fn thread_set_name_response_inner(
        &self,
        params: ThreadSetNameParams,
    ) -> Result<(ThreadSetNameResponse, Option), JSONRPCErrorError> {
        let ThreadSetNameParams { thread_id, name } = params;
        let thread_id = ThreadId::from_string(&thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
        let name = codex_core::util::normalize_thread_name(&name)
            .ok_or_else(|| invalid_request("thread name must not be empty"))?;

        let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
        let patch = StoreThreadMetadataPatch {
            name: Some(Some(name.clone())),
            ..Default::default()
        };
        self.thread_manager
            .update_thread_metadata(thread_id, patch, /*include_archived*/ false)
            .await
            .map_err(|err| core_thread_write_error("set thread name", err))?;

        let notification = ThreadNameUpdatedNotification {
            thread_id: thread_id.to_string(),
            thread_name: Some(name),
        };
        Ok((ThreadSetNameResponse {}, Some(notification)))
    }

    /// Stores a new memory mode in the metadata of a non-archived thread.
    async fn thread_memory_mode_set_response_inner(
        &self,
        params: ThreadMemoryModeSetParams,
    ) -> Result {
        let ThreadMemoryModeSetParams { thread_id, mode } = params;
        let thread_id = ThreadId::from_string(&thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

        let patch = StoreThreadMetadataPatch {
            memory_mode: Some(mode.to_core()),
            ..Default::default()
        };
        self.thread_manager
            .update_thread_metadata(thread_id, patch, /*include_archived*/ false)
            .await
            .map_err(|err| core_thread_write_error("set thread memory mode", err))?;

        Ok(ThreadMemoryModeSetResponse {})
    }

    /// Clears memory data from the state db, then the memory directories under
    /// `codex_home`. Fails with an internal error when no state db is configured.
    async fn memory_reset_response_inner(&self) -> Result {
        let state_db = self
            .state_db
            .clone()
            .ok_or_else(|| internal_error("sqlite state db unavailable for memory reset"))?;

        // Database rows go first; the directories are only touched if that succeeded.
        state_db.clear_memory_data().await.map_err(|err| {
            internal_error(format!("failed to clear memory rows in state db: {err}"))
        })?;

        let codex_home = &self.config.codex_home;
        clear_memory_roots_contents(codex_home)
            .await
            .map_err(|err| {
                internal_error(format!(
                    "failed to clear memory directories under {}: {err}",
                    codex_home.display()
                ))
            })?;

        Ok(MemoryResetResponse {})
    }

    async fn thread_metadata_update_response_inner(
        &self,
        params: ThreadMetadataUpdateParams,
    ) -> Result {
        let ThreadMetadataUpdateParams {
            thread_id,
            git_info,
        } = params;

        let thread_uuid = ThreadId::from_string(&thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

        let Some(ThreadMetadataGitInfoUpdateParams {
            sha,
            branch,
            origin_url,
        }) = git_info
        else {
            return Err(invalid_request("gitInfo must include at least one field"));
        };

        if sha.is_none() && branch.is_none() && origin_url.is_none() {
            return Err(invalid_request("gitInfo must include at least one field"));
        }

        let git_sha = Self::normalize_thread_metadata_git_field(sha, "gitInfo.sha")?;
        let git_branch = Self::normalize_thread_metadata_git_field(branch, "gitInfo.branch")?;
        let git_origin_url =
Self::normalize_thread_metadata_git_field(origin_url, "gitInfo.originUrl")?;

        let patch = StoreThreadMetadataPatch {
            git_info: Some(StoreGitInfoPatch {
                sha: git_sha,
                branch: git_branch,
                origin_url: git_origin_url,
            }),
            ..Default::default()
        };
        let updated_thread = {
            let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
            self.thread_manager
                .update_thread_metadata(thread_uuid, patch, /*include_archived*/ true)
                .await
                .map_err(|err| core_thread_write_error("update thread metadata", err))?
        };
        let (mut thread, _) = thread_from_stored_thread(
            updated_thread,
            self.config.model_provider_id.as_str(),
            &self.config.cwd,
        );
        if let Ok(loaded_thread) = self.thread_manager.get_thread(thread_uuid).await {
            thread.session_id = loaded_thread.session_configured().session_id.to_string();
        }
        self.attach_thread_name(thread_uuid, &mut thread).await;
        thread.status = resolve_thread_status(
            self.thread_watch_manager
                .loaded_status_for_thread(&thread.id)
                .await,
            /*has_in_progress_turn*/ false,
        );

        Ok(ThreadMetadataUpdateResponse { thread })
    }

    /// Normalizes one optional git metadata field of a metadata update.
    ///
    /// * `None` is returned as `None`.
    /// * `Some(None)` is returned as `Some(None)`.
    /// * `Some(Some(text))` is trimmed; an empty result is rejected with
    ///   "`{name}` must not be empty", otherwise the trimmed text is returned.
    fn normalize_thread_metadata_git_field(
        value: Option>,
        name: &str,
    ) -> Result>, JSONRPCErrorError> {
        let Some(requested) = value else {
            return Ok(None);
        };
        let Some(raw) = requested else {
            return Ok(Some(None));
        };
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return Err(invalid_request(format!("{name} must not be empty")));
        }
        Ok(Some(Some(trimmed.to_string())))
    }

    /// Unarchives a thread while holding the thread-list state permit and pairs the
    /// response with the notification to emit.
    async fn thread_unarchive_inner(
        &self,
        params: ThreadUnarchiveParams,
    ) -> Result<(ThreadUnarchiveResponse, ThreadUnarchivedNotification), JSONRPCErrorError> {
        let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
        let (response, thread_id) = self.thread_unarchive_response(params).await?;
        let notification = ThreadUnarchivedNotification { thread_id };
        Ok((response, notification))
    }

    /// Unarchives the thread in the thread store and builds its API view.
    ///
    /// Returns the response together with the thread id string taken from the
    /// resulting view.
    async fn thread_unarchive_response(
        &self,
        params: ThreadUnarchiveParams,
    ) -> Result<(ThreadUnarchiveResponse, String), JSONRPCErrorError> {
        let thread_id = ThreadId::from_string(&params.thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

        let stored_thread = self
            .thread_store
            .unarchive_thread(StoreArchiveThreadParams { thread_id })
            .await
            .map_err(|err| thread_store_archive_error("unarchive", err))?;

        // The fallback provider is only read, so borrow it from the config
        // rather than cloning it.
        let (mut thread, _) = thread_from_stored_thread(
            stored_thread,
            self.config.model_provider_id.as_str(),
            &self.config.cwd,
        );
        thread.status = resolve_thread_status(
            self.thread_watch_manager
                .loaded_status_for_thread(&thread.id)
                .await,
            /*has_in_progress_turn*/ false,
        );
        self.attach_thread_name(thread_id, &mut thread).await;

        let thread_id = thread.id.clone();
        Ok((ThreadUnarchiveResponse { thread }, thread_id))
    }

    /// Entry point for thread rollback; delegates to `thread_rollback_start`.
    async fn thread_rollback_inner(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadRollbackParams,
    ) -> Result<(), JSONRPCErrorError> {
        self.thread_rollback_start(request_id, params).await
    }

    /// Starts a rollback of `num_turns` turns on a loaded thread.
    ///
    /// Only one rollback may be pending per thread: the request id is recorded in
    /// the thread state before the op is submitted, and a second request while one
    /// is recorded is rejected. Returning `Ok(())` only means the op was submitted.
    async fn thread_rollback_start(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadRollbackParams,
    ) -> Result<(), JSONRPCErrorError> {
        let ThreadRollbackParams {
            thread_id,
            num_turns,
        } = params;

        if num_turns == 0 {
            return Err(invalid_request("numTurns must be >= 1"));
        }

        let (thread_id, thread) = self.load_thread(&thread_id).await?;

        // Check-and-set under the thread state lock; the lock is released at the
        // end of this block, before the op is submitted.
        let rollback_already_in_progress = {
            let thread_state = self.thread_state_manager.thread_state(thread_id).await;
            let mut thread_state = thread_state.lock().await;
            if thread_state.pending_rollbacks.is_some() {
                true
            } else {
                thread_state.pending_rollbacks = Some(request_id.clone());
                false
            }
        };
        if rollback_already_in_progress {
            return Err(invalid_request(
                "rollback already in progress for this thread",
            ));
        }

        if let Err(err) = self
            .submit_core_op(
                request_id,
                thread.as_ref(),
                Op::ThreadRollback { num_turns },
            )
            .await
        {
            // No ThreadRollback event will arrive if an error occurs.
            // Clean up and reply immediately.
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
            thread_state.lock().await.pending_rollbacks = None;

            return Err(internal_error(format!("failed to start rollback: {err}")));
        }
        Ok(())
    }

    /// Submits a compaction op to a loaded thread.
    async fn thread_compact_start_inner(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadCompactStartParams,
    ) -> Result {
        let ThreadCompactStartParams { thread_id } = params;

        let (_, thread) = self.load_thread(&thread_id).await?;
        let submitted = self
            .submit_core_op(request_id, thread.as_ref(), Op::Compact)
            .await;
        submitted.map_err(|err| internal_error(format!("failed to start compaction: {err}")))?;
        Ok(ThreadCompactStartResponse {})
    }

    /// Submits a background-terminal cleanup op to a loaded thread.
    async fn thread_background_terminals_clean_inner(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadBackgroundTerminalsCleanParams,
    ) -> Result {
        let ThreadBackgroundTerminalsCleanParams { thread_id } = params;

        let (_, thread) = self.load_thread(&thread_id).await?;
        let submitted = self
            .submit_core_op(request_id, thread.as_ref(), Op::CleanBackgroundTerminals)
            .await;
        submitted.map_err(|err| {
            internal_error(format!("failed to clean background terminals: {err}"))
        })?;
        Ok(ThreadBackgroundTerminalsCleanResponse {})
    }

    /// Submits a user shell command to a loaded thread.
    ///
    /// The command is trimmed first; a command that is empty after trimming is
    /// rejected before the thread is looked up.
    async fn thread_shell_command_inner(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadShellCommandParams,
    ) -> Result {
        let ThreadShellCommandParams { thread_id, command } = params;
        let command = command.trim();
        if command.is_empty() {
            return Err(invalid_request("command must not be empty"));
        }

        let (_, thread) = self.load_thread(&thread_id).await?;
        let op = Op::RunUserShellCommand {
            command: command.to_string(),
        };
        self.submit_core_op(request_id, thread.as_ref(), op)
            .await
            .map_err(|err| internal_error(format!("failed to start shell command: {err}")))?;
        Ok(ThreadShellCommandResponse {})
    }

    /// Deserializes a Guardian denial event from the request and submits its
    /// approval to a loaded thread.
    ///
    /// The event is decoded before the thread is looked up, so a malformed event is
    /// reported as an invalid request even for an unknown thread id.
    async fn thread_approve_guardian_denied_action_inner(
        &self,
        request_id: &ConnectionRequestId,
        params: ThreadApproveGuardianDeniedActionParams,
    ) -> Result {
        let ThreadApproveGuardianDeniedActionParams { thread_id, event } = params;
        let event = serde_json::from_value(event)
            .map_err(|err| invalid_request(format!("invalid Guardian denial event: {err}")))?;
        let (_, thread) = self.load_thread(&thread_id).await?;

        let op = Op::ApproveGuardianDeniedAction { event };
        self.submit_core_op(request_id, thread.as_ref(), op)
            .await
            .map_err(|err| internal_error(format!("failed to approve Guardian denial: {err}")))?;
        Ok(ThreadApproveGuardianDeniedActionResponse {})
    }

    async fn thread_list_response_inner(
        &self,
        params: ThreadListParams,
    ) -> Result {
        let ThreadListParams {
            cursor,
            limit,
            sort_key,
            sort_direction,
            model_providers,
            source_kinds,
            archived,
            cwd,
            use_state_db_only,
            search_term,
        } = params;
        let cwd_filters = normalize_thread_list_cwd_filters(cwd)?;

        let requested_page_size = limit
            .map(|value| value as usize)
            .unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
            .clamp(1, THREAD_LIST_MAX_LIMIT);
        let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) {
            ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt,
            ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt,
        };
        let sort_direction = sort_direction.unwrap_or(SortDirection::Desc);
        let (stored_threads, next_cursor) = self
            .list_threads_common(
                requested_page_size,
                cursor,
                store_sort_key,
                sort_direction,
                ThreadListFilters {
                    model_providers,
                    source_kinds,
                    archived: archived.unwrap_or(false),
                    cwd_filters,
                    search_term,
                    use_state_db_only,
                },
            )
            .await?;
        let backwards_cursor = stored_threads.first().and_then(|thread| {
            thread_backwards_cursor_for_sort_key(thread, store_sort_key, sort_direction)
        });
        let mut threads = Vec::with_capacity(stored_threads.len());
        let mut status_ids = Vec::with_capacity(stored_threads.len());
        let fallback_provider = self.config.model_provider_id.clone();

        for stored_thread in stored_threads {
            let (thread, _) = thread_from_stored_thread(
                stored_thread,
                fallback_provider.as_str(),
                &self.config.cwd,
            );
            status_ids.push(thread.id.clone());
            threads.push(thread);
        }

        let statuses = self
            .thread_watch_manager
            .loaded_statuses_for_threads(status_ids)
            .await;

        let data: Vec<_> = threads
            .into_iter()
.map(|mut thread| {
                if let Some(status) = statuses.get(&thread.id) {
                    thread.status = status.clone();
                }
                thread
            })
            .collect();
        Ok(ThreadListResponse {
            data,
            next_cursor,
            backwards_cursor,
        })
    }

    /// Lists the ids of currently loaded threads, sorted as strings, one page at a time.
    ///
    /// * `cursor` must parse as a thread id. The page starts just after that id, or
    ///   at the position where it would sort if it is not in the list.
    /// * `limit` defaults to the whole list and is raised to at least 1.
    /// * `next_cursor` is the last id of the page when more ids follow, else `None`.
    ///
    /// When no threads are loaded, an empty page is returned before the cursor is
    /// validated.
    async fn thread_loaded_list_response_inner(
        &self,
        params: ThreadLoadedListParams,
    ) -> Result {
        let ThreadLoadedListParams { cursor, limit } = params;
        let mut data: Vec<String> = self
            .thread_manager
            .list_thread_ids()
            .await
            .into_iter()
            .map(|thread_id| thread_id.to_string())
            .collect();

        if data.is_empty() {
            return Ok(ThreadLoadedListResponse {
                data,
                next_cursor: None,
            });
        }

        // The binary search below relies on this ordering.
        data.sort_unstable();
        let total = data.len();
        let start = match cursor {
            Some(cursor) => {
                // Round-trip through `ThreadId` so the cursor has the same textual
                // form as the listed ids.
                let cursor = ThreadId::from_string(&cursor)
                    .map_err(|_| invalid_request(format!("invalid cursor: {cursor}")))?
                    .to_string();
                match data.binary_search(&cursor) {
                    Ok(idx) => idx + 1,
                    Err(idx) => idx,
                }
            }
            None => 0,
        };

        // Convert the limit towards `usize` instead of narrowing `total` to `u32`,
        // which would truncate the default page size for very large lists.
        let effective_limit = limit
            .map_or(total, |limit| usize::try_from(limit).unwrap_or(usize::MAX))
            .max(1);
        let end = start.saturating_add(effective_limit).min(total);
        // `start <= end <= total` holds here, so the range is always valid; draining
        // moves the ids out of `data` without cloning them.
        let page: Vec<String> = data.drain(start..end).collect();
        let next_cursor = page.last().filter(|_| end < total).cloned();

        Ok(ThreadLoadedListResponse {
            data: page,
            next_cursor,
        })
    }

    /// Handles `thread/read`: parses the thread id and returns the view built by
    /// `read_thread_view`.
    async fn thread_read_response_inner(
        &self,
        params: ThreadReadParams,
    ) -> Result {
        let ThreadReadParams {
            thread_id,
            include_turns,
        } = params;

        let thread_uuid = ThreadId::from_string(&thread_id)
            .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
        let view = self.read_thread_view(thread_uuid, include_turns).await;
        let thread = view.map_err(thread_read_view_error)?;

        Ok(ThreadReadResponse { thread })
    }

    /// Builds the API view for `thread/read` from persisted metadata plus optional live state.
async fn read_thread_view( &self, thread_id: ThreadId, include_turns: bool, ) -> Result { let loaded_thread = self.thread_manager.get_thread(thread_id).await.ok(); let mut thread = if include_turns { if let Some(loaded_thread) = loaded_thread.as_ref() { // Loaded thread with turns: use persisted metadata when it exists, // but reconstruct turns from the live ThreadStore history. let persisted_thread = self .load_persisted_thread_for_read(thread_id, /*include_turns*/ false) .await?; self.load_live_thread_view( thread_id, include_turns, loaded_thread, persisted_thread, ) .await? } else if let Some(thread) = self .load_persisted_thread_for_read(thread_id, include_turns) .await? { // Unloaded thread with turns: load metadata and history together // from the ThreadStore. thread } else { return Err(ThreadReadViewError::InvalidRequest(format!( "thread not loaded: {thread_id}" ))); } } else if let Some(thread) = self .load_persisted_thread_for_read(thread_id, include_turns) .await? { // Persisted metadata-only read: no live thread state is needed. thread } else if let Some(loaded_thread) = loaded_thread.as_ref() { // Loaded metadata-only read before persistence is materialized: build // the response from the live thread snapshot. self.load_live_thread_view( thread_id, include_turns, loaded_thread, /*persisted_thread*/ None, ) .await? 
} else { return Err(ThreadReadViewError::InvalidRequest(format!( "thread not loaded: {thread_id}" ))); }; let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() { matches!(loaded_thread.agent_status().await, AgentStatus::Running) } else { false }; let thread_status = self .thread_watch_manager .loaded_status_for_thread(&thread.id) .await; set_thread_status_and_interrupt_stale_turns( &mut thread, thread_status, has_live_in_progress_turn, ); Ok(thread) } async fn load_persisted_thread_for_read( &self, thread_id: ThreadId, include_turns: bool, ) -> Result, ThreadReadViewError> { let fallback_provider = self.config.model_provider_id.as_str(); match self .thread_store .read_thread(StoreReadThreadParams { thread_id, include_archived: true, include_history: include_turns, }) .await { Ok(stored_thread) => { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { thread.turns = build_api_turns_from_rollout_items(&history.items); } Ok(Some(thread)) } Err(ThreadStoreError::InvalidRequest { message }) if message == format!("no rollout found for thread id {thread_id}") => { Ok(None) } Err(ThreadStoreError::ThreadNotFound { thread_id: missing_thread_id, }) if missing_thread_id == thread_id => Ok(None), Err(ThreadStoreError::InvalidRequest { message }) => { Err(ThreadReadViewError::InvalidRequest(message)) } Err(err) => Err(ThreadReadViewError::Internal(format!( "failed to read thread: {err}" ))), } } /// Builds a `thread/read` view from a loaded thread plus optional persisted metadata. 
///
/// When `persisted_thread` is present it is used as the base: a missing `path`
/// is filled in from the live snapshot, and `session_id` and `ephemeral` are
/// always overwritten with the live values. Otherwise the snapshot-built thread
/// is used as-is. The thread name and, if requested, the turns are then applied
/// from the store and the loaded thread.
///
/// # Errors
///
/// Returns `ThreadReadViewError::InvalidRequest` when turns are requested for
/// an ephemeral thread, and propagates history-load failures.
// NOTE(review): `persisted_thread: Option` and the bare `Result` return type
// appear to have lost their type arguments in this copy — confirm against the
// original signature.
async fn load_live_thread_view(
    &self,
    thread_id: ThreadId,
    include_turns: bool,
    loaded_thread: &CodexThread,
    persisted_thread: Option,
) -> Result {
    let config_snapshot = loaded_thread.config_snapshot().await;
    if include_turns && config_snapshot.ephemeral {
        return Err(ThreadReadViewError::InvalidRequest(
            "ephemeral threads do not support includeTurns".to_string(),
        ));
    }
    let fallback_thread =
        build_thread_from_loaded_snapshot(thread_id, &config_snapshot, loaded_thread);
    let mut thread = if let Some(mut thread) = persisted_thread {
        // Persisted metadata wins, except for fields only the live session knows.
        if thread.path.is_none() {
            thread.path = fallback_thread.path.clone();
        }
        thread.session_id.clone_from(&fallback_thread.session_id);
        thread.ephemeral = fallback_thread.ephemeral;
        thread
    } else {
        fallback_thread
    };
    self.apply_thread_read_store_fields(thread_id, &mut thread, include_turns, loaded_thread)
        .await?;
    Ok(thread)
}

/// Applies the thread name and, when `include_turns` is set, replaces
/// `thread.turns` with turns rebuilt from the loaded thread's history
/// (archived history included).
///
/// # Errors
///
/// Returns the error produced by `thread_read_history_load_error` when the
/// history cannot be loaded.
async fn apply_thread_read_store_fields(
    &self,
    thread_id: ThreadId,
    thread: &mut Thread,
    include_turns: bool,
    loaded_thread: &CodexThread,
) -> Result<(), ThreadReadViewError> {
    self.attach_thread_name(thread_id, thread).await;
    if include_turns {
        let history = loaded_thread
            .load_history(/*include_archived*/ true)
            .await
            .map_err(|err| thread_read_history_load_error(thread_id, err))?;
        thread.turns = build_api_turns_from_rollout_items(&history.items);
    }
    Ok(())
}

/// Produces one page of turns for the thread named in `params`.
///
/// The full turn list is rebuilt from the rollout history, merged with the
/// in-memory active-turn snapshot when the thread is loaded, trimmed to the
/// requested `items_view` (default `Summary`), and then paginated. The sort
/// direction defaults to `SortDirection::Desc`.
///
/// # Errors
///
/// Fails with an invalid-request error when `thread_id` does not parse, and
/// propagates history-load and pagination errors.
// NOTE(review): the return type reads as a bare `Result`; its type arguments
// appear to have been stripped from this copy — confirm against the original.
async fn thread_turns_list_response_inner(
    &self,
    params: ThreadTurnsListParams,
) -> Result {
    let ThreadTurnsListParams {
        thread_id,
        cursor,
        limit,
        sort_direction,
        items_view,
    } = params;
    let items_view = items_view.unwrap_or(TurnItemsView::Summary);
    let thread_uuid = ThreadId::from_string(&thread_id)
        .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
    let items = self
        .load_thread_turns_list_history(thread_uuid)
        .await
        .map_err(thread_read_view_error)?;
    // This API optimizes network transfer by letting clients page through a
    // thread's turns incrementally, but it still replays the entire rollout on
    // every request. Rollback and compaction events can change earlier turns, so
    // the server has to rebuild the full turn list until turn metadata is indexed
    // separately.
    let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
    let has_live_running_thread = match loaded_thread.as_ref() {
        Some(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
        None => false,
    };
    let active_turn = if loaded_thread.is_some() {
        // Persisted history may not yet include the currently running turn. The
        // app-server listener has already projected live turn events into ThreadState,
        // so merge that in-memory snapshot before paginating.
        let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
        let state = thread_state.lock().await;
        state.active_turn_snapshot()
    } else {
        None
    };
    let mut turns = reconstruct_thread_turns_for_turns_list(
        &items,
        self.thread_watch_manager
            .loaded_status_for_thread(&thread_uuid.to_string())
            .await,
        has_live_running_thread,
        active_turn,
    );
    // Trim each turn's items down to what the caller asked for.
    for turn in &mut turns {
        match items_view {
            TurnItemsView::NotLoaded => {
                turn.items.clear();
                turn.items_view = TurnItemsView::NotLoaded;
            }
            TurnItemsView::Summary => {
                // Summary keeps at most two items: the first user message and
                // the last agent message of the turn.
                let first_user_message = turn
                    .items
                    .iter()
                    .find(|item| matches!(item, ThreadItem::UserMessage { .. }))
                    .cloned();
                let final_agent_message = turn
                    .items
                    .iter()
                    .rev()
                    .find(|item| matches!(item, ThreadItem::AgentMessage { .. }))
                    .cloned();
                turn.items = match (first_user_message, final_agent_message) {
                    (Some(user_message), Some(agent_message))
                        if user_message.id() != agent_message.id() =>
                    {
                        vec![user_message, agent_message]
                    }
                    // No agent message, or both lookups produced the same id.
                    (Some(user_message), _) => vec![user_message],
                    (None, Some(agent_message)) => vec![agent_message],
                    (None, None) => Vec::new(),
                };
                turn.items_view = TurnItemsView::Summary;
            }
            TurnItemsView::Full => {
                turn.items_view = TurnItemsView::Full;
            }
        }
    }
    let page = paginate_thread_turns(
        turns,
        cursor.as_deref(),
        limit,
        sort_direction.unwrap_or(SortDirection::Desc),
    )?;
    Ok(ThreadTurnsListResponse {
        data: page.turns,
        next_cursor: page.next_cursor,
        backwards_cursor: page.backwards_cursor,
    })
}

/// Loads the rollout items used to rebuild a thread's turn list.
///
/// The thread store is tried first (archived threads included). If the store
/// reports the thread as missing, the history is loaded from the in-memory
/// thread instead.
///
/// # Errors
///
/// - `Internal` when the store returns a thread without history, or fails for
///   any reason other than "missing" or `InvalidRequest`.
/// - `InvalidRequest` when the store rejects the request, when the thread is
///   neither persisted nor loaded, or when the loaded thread is ephemeral.
/// - The error produced by `thread_turns_list_history_load_error` when the
///   live history load fails.
// NOTE(review): `Result, ThreadReadViewError>` is not valid Rust; the `Ok`
// type argument appears to have been stripped from this copy — confirm against
// the original signature.
async fn load_thread_turns_list_history(
    &self,
    thread_id: ThreadId,
) -> Result, ThreadReadViewError> {
    match self
        .thread_store
        .read_thread(StoreReadThreadParams {
            thread_id,
            include_archived: true,
            include_history: true,
        })
        .await
    {
        Ok(stored_thread) => {
            let history = stored_thread.history.ok_or_else(|| {
                ThreadReadViewError::Internal(format!(
                    "thread store did not return history for thread {thread_id}"
                ))
            })?;
            return Ok(history.items);
        }
        // The two "missing" outcomes fall through to the live-thread path below.
        Err(ThreadStoreError::InvalidRequest { message })
            if message == format!("no rollout found for thread id {thread_id}") => {}
        Err(ThreadStoreError::ThreadNotFound {
            thread_id: missing_thread_id,
        }) if missing_thread_id == thread_id => {}
        Err(ThreadStoreError::InvalidRequest { message }) => {
            return Err(ThreadReadViewError::InvalidRequest(message));
        }
        Err(err) => {
            return Err(ThreadReadViewError::Internal(format!(
                "failed to read thread: {err}"
            )));
        }
    }
    let thread = self
        .thread_manager
        .get_thread(thread_id)
        .await
        .map_err(|_| {
            ThreadReadViewError::InvalidRequest(format!("thread not loaded: {thread_id}"))
        })?;
    let config_snapshot = thread.config_snapshot().await;
    if config_snapshot.ephemeral {
        return Err(ThreadReadViewError::InvalidRequest(
            "ephemeral threads do not support thread/turns/list".to_string(),
        ));
    }
    thread
        .load_history(/*include_archived*/ true)
        .await
        .map(|history| history.items)
        .map_err(|err| thread_turns_list_history_load_error(thread_id, err))
}

/// Returns a new receiver from the thread manager's thread-created
/// subscription.
// NOTE(review): `broadcast::Receiver` has no type argument in this copy; the
// element type is not visible here — confirm against the original signature.
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver {
    self.thread_manager.subscribe_thread_created()
}

/// Forwards a newly initialized connection and its capabilities to the thread
/// state manager.
pub(crate) async fn connection_initialized(
    &self,
    connection_id: ConnectionId,
    capabilities: ConnectionCapabilities,
) {
    self.thread_state_manager
        .connection_initialized(connection_id, capabilities)
        .await;
}

/// Removes a closed connection from the thread state manager and finalizes
/// teardown for every returned thread that the thread manager no longer knows.
pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) {
    let thread_ids = self
        .thread_state_manager
        .remove_connection(connection_id)
        .await;
    for thread_id in thread_ids {
        if self.thread_manager.get_thread(thread_id).await.is_err() {
            // Reconcile stale app-server bookkeeping when the thread has already been
            // removed from the core manager.
            self.finalize_thread_teardown(thread_id).await;
        }
    }
}

/// Returns a watch receiver from the watch manager's running-turn-count
/// subscription.
// NOTE(review): `watch::Receiver` has no type argument in this copy; the value
// type is not visible here — confirm against the original signature.
pub(crate) fn subscribe_running_assistant_turn_count(&self) -> watch::Receiver {
    self.thread_watch_manager.subscribe_running_turn_count()
}

/// Best-effort: ensure initialized connections are subscribed to this thread.
pub(crate) async fn try_attach_thread_listener( &self, thread_id: ThreadId, connection_ids: Vec, ) { if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { let config_snapshot = thread.config_snapshot().await; let loaded_thread = build_thread_from_snapshot( thread_id, thread.session_configured().session_id.to_string(), &config_snapshot, thread.rollout_path(), ); self.thread_watch_manager.upsert_thread(loaded_thread).await; } for connection_id in connection_ids { log_listener_attach_result( self.ensure_conversation_listener( thread_id, connection_id, /*raw_events_enabled*/ false, ) .await, thread_id, connection_id, "thread", ); } } async fn thread_resume_inner( &self, request_id: ConnectionRequestId, params: ThreadResumeParams, app_server_client_name: Option, app_server_client_version: Option, ) -> Result<(), JSONRPCErrorError> { if let Ok(thread_id) = ThreadId::from_string(¶ms.thread_id) && self .pending_thread_unloads .lock() .await .contains(&thread_id) { self.outgoing .send_error( request_id, invalid_request(format!( "thread {thread_id} is closing; retry thread/resume after the thread is closed" )), ) .await; return Ok(()); } if params.sandbox.is_some() && params.permissions.is_some() { self.outgoing .send_error( request_id, invalid_request("`permissions` cannot be combined with `sandbox`"), ) .await; return Ok(()); } if params.persist_extended_history { self.send_persist_extended_history_deprecation_notice(request_id.connection_id) .await; } let redact_resume_payloads = should_redact_thread_resume_payloads(app_server_client_name.as_deref()); let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await { Ok(permit) => permit, Err(error) => { self.outgoing.send_error(request_id, error).await; return Ok(()); } }; match self .resume_running_thread( &request_id, ¶ms, app_server_client_name.clone(), app_server_client_version.clone(), ) .await { Ok(true) => return Ok(()), Ok(false) => {} Err(error) => { 
self.outgoing.send_error(request_id, error).await; return Ok(()); } } let ThreadResumeParams { thread_id, history, path, model, model_provider, service_tier, cwd, workspace_roots, approval_policy, approvals_reviewer, sandbox, permissions, config: mut request_overrides, base_instructions, developer_instructions, personality, exclude_turns, persist_extended_history: _persist_extended_history, } = params; let include_turns = !exclude_turns; let mut workspace_roots = workspace_roots; let (thread_history, resume_source_thread) = match if let Some(history) = history { self.resume_thread_from_history(history.as_slice()) .await .map(|thread_history| (thread_history, None)) } else { self.resume_thread_from_rollout(&thread_id, path.as_ref()) .await .map(|(thread_history, stored_thread)| (thread_history, Some(stored_thread))) } { Ok(value) => value, Err(error) => { self.outgoing.send_error(request_id, error).await; return Ok(()); } }; let history_cwd = thread_history.session_cwd(); let persisted_permission_state = persisted_thread_permission_state( &thread_history, history_cwd.as_deref(), resume_source_thread .as_ref() .map(|thread| &thread.sandbox_policy), ); let permission_profile_selection = if let Some(permissions) = permissions { match self .validate_active_permission_profile_selection( permissions, request_overrides.clone(), cwd.clone().map(PathBuf::from), history_cwd.clone(), ) .await { Ok(selection) => Some(selection), Err(error) => { self.outgoing.send_error(request_id, error).await; return Ok(()); } } } else if let Some(sandbox_mode) = sandbox { let sandbox_policy = sandbox_policy_from_legacy_mode(sandbox_mode); let Some(effective_cwd) = effective_cwd_for_legacy_sandbox( cwd.as_deref(), history_cwd.as_deref(), persisted_permission_state.as_ref(), ) else { self.outgoing .send_error( request_id, invalid_request("`sandbox` requires a cwd to resolve legacy permissions"), ) .await; return Ok(()); }; match resolve_legacy_sandbox_profile_selection( &sandbox_policy, 
persisted_permission_state .as_ref() .map(|state| CurrentPermissionProfile { permission_profile: &state.permission_profile, workspace_roots: &state.workspace_roots, }), &effective_cwd, workspace_roots.as_deref(), "sandbox", ) { Ok(LegacySandboxResolution::Noop { workspace_roots: legacy_workspace_roots, }) => { if workspace_roots.is_none() { workspace_roots = legacy_workspace_roots; } None } Ok(LegacySandboxResolution::Selection(legacy_selection)) => { if workspace_roots.is_none() { workspace_roots = legacy_selection.workspace_roots.clone(); } match self .validate_active_permission_profile_selection( legacy_selection.permissions.clone(), request_overrides.clone(), cwd.clone().map(PathBuf::from), history_cwd.clone(), ) .await .and_then(|selection| { validate_legacy_sandbox_profile_selection( &legacy_selection, &selection, &effective_cwd, workspace_roots.as_deref(), "sandbox", ) .map(|()| selection) }) { Ok(selection) => Some(selection), Err(error) => { self.outgoing.send_error(request_id, error).await; return Ok(()); } } } Err(error) => { self.outgoing.send_error(request_id, error).await; return Ok(()); } } } else { None }; let active_permission_profile = permission_profile_selection .as_ref() .map(|selection| selection.active_permission_profile.clone()) .or_else(|| { persisted_permission_state .as_ref() .and_then(|state| state.active_permission_profile.clone()) }); let expected_active_permission_profile_source = permission_profile_selection .as_ref() .map(|selection| selection.permission_profile.clone()) .or_else(|| { persisted_permission_state .as_ref() .map(|state| state.permission_profile.clone()) }); let workspace_roots_were_explicit = workspace_roots.is_some(); let mut typesafe_overrides = self.build_thread_config_overrides( model, model_provider, service_tier, cwd, workspace_roots, approval_policy, approvals_reviewer, /*sandbox*/ None, /*permissions*/ None, base_instructions, developer_instructions, personality, ); typesafe_overrides.permission_profile = 
expected_active_permission_profile_source.clone(); if !workspace_roots_were_explicit { if let Some(persisted_permission_state) = persisted_permission_state.as_ref() { typesafe_overrides.workspace_roots = Some( persisted_permission_state .workspace_roots .iter() .map(codex_utils_absolute_path::AbsolutePathBuf::to_path_buf) .collect(), ); } else if let Some(root) = history_cwd .as_deref() .and_then(|cwd| absolute_path_from_history_path(cwd, /*base*/ None)) { typesafe_overrides.workspace_roots = Some(vec![root.to_path_buf()]); } } self.load_and_apply_persisted_resume_metadata( &thread_history, &mut request_overrides, &mut typesafe_overrides, ) .await; // Derive a Config using the same logic as new conversation, honoring overrides if provided. let mut config = match self .config_manager .load_for_cwd(request_overrides, typesafe_overrides, history_cwd) .await { Ok(config) => config, Err(err) => { let error = config_load_error(&err); self.outgoing.send_error(request_id, error).await; return Ok(()); } }; config .permissions .set_active_permission_profile_for_current_profile( active_permission_profile, expected_active_permission_profile_source.as_ref(), ); let instruction_sources = Self::instruction_sources_from_config(&config).await; let response_history = thread_history.clone(); match self .thread_manager .resume_thread_with_history( config.clone(), thread_history, self.auth_manager.clone(), /*persist_extended_history*/ false, self.request_trace_context(&request_id).await, ) .await { Ok(NewThread { thread_id, thread: codex_thread, session_configured, .. }) => { if let Err(err) = Self::set_app_server_client_info( codex_thread.as_ref(), app_server_client_name, app_server_client_version, ) .await { self.outgoing.send_error(request_id, err).await; return Ok(()); } let SessionConfiguredEvent { rollout_path, .. 
} = session_configured; let Some(rollout_path) = rollout_path else { let error = internal_error(format!("rollout path missing for thread {thread_id}")); self.outgoing.send_error(request_id, error).await; return Ok(()); }; // Auto-attach a thread listener when resuming a thread. log_listener_attach_result( self.ensure_conversation_listener( thread_id, request_id.connection_id, /*raw_events_enabled*/ false, ) .await, thread_id, request_id.connection_id, "thread", ); let mut thread = match self .load_thread_from_resume_source_or_send_internal( thread_id, codex_thread.as_ref(), &response_history, rollout_path.as_path(), resume_source_thread, include_turns, ) .await { Ok(thread) => thread, Err(message) => { self.outgoing .send_error(request_id, internal_error(message)) .await; return Ok(()); } }; thread.thread_source = codex_thread .config_snapshot() .await .thread_source .map(Into::into); self.thread_watch_manager .upsert_thread(thread.clone()) .await; let thread_status = self .thread_watch_manager .loaded_status_for_thread(&thread.id) .await; set_thread_status_and_interrupt_stale_turns( &mut thread, thread_status, /*has_live_in_progress_turn*/ false, ); let config_snapshot = codex_thread.config_snapshot().await; let sandbox = thread_response_sandbox_policy( &config_snapshot.permission_profile, &config_snapshot.workspace_roots, config_snapshot.cwd.as_path(), ); let active_permission_profile = thread_response_active_permission_profile( config_snapshot.active_permission_profile, ); let token_usage_thread = include_turns.then(|| thread.clone()); if redact_resume_payloads { redact_thread_resume_payloads(&mut thread); } let response = ThreadResumeResponse { thread, model: session_configured.model, model_provider: session_configured.model_provider_id, service_tier: session_configured.service_tier, cwd: session_configured.cwd, instruction_sources, approval_policy: session_configured.approval_policy.into(), approvals_reviewer: session_configured.approvals_reviewer.into(), 
sandbox, workspace_roots: config_snapshot.workspace_roots, active_permission_profile, reasoning_effort: session_configured.reasoning_effort, }; let connection_id = request_id.connection_id; self.outgoing.send_response(request_id, response).await; // `excludeTurns` is explicitly the cheap resume path, so avoid // rebuilding history only to attribute a replayed usage update. if let Some(token_usage_thread) = token_usage_thread { let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( &response_history.get_rollout_items(), token_usage_thread.turns.as_slice(), ); // The client needs restored usage before it starts another turn. // Sending after the response preserves JSON-RPC request ordering while // still filling the status line before the next turn lifecycle begins. send_thread_token_usage_update_to_connection( &self.outgoing, connection_id, thread_id, &token_usage_thread, codex_thread.as_ref(), token_usage_turn_id, ) .await; } self.thread_goal_processor .emit_resume_goal_snapshot_and_continue(thread_id, codex_thread.as_ref()) .await; } Err(err) => { let error = internal_error(format!("error resuming thread: {err}")); self.outgoing.send_error(request_id, error).await; } } Ok(()) } async fn load_and_apply_persisted_resume_metadata( &self, thread_history: &InitialHistory, request_overrides: &mut Option>, typesafe_overrides: &mut ConfigOverrides, ) -> Option { let InitialHistory::Resumed(resumed_history) = thread_history else { return None; }; let state_db_ctx = self.state_db.clone()?; let persisted_metadata = state_db_ctx .get_thread(resumed_history.conversation_id) .await .ok() .flatten()?; merge_persisted_resume_metadata(request_overrides, typesafe_overrides, &persisted_metadata); Some(persisted_metadata) } async fn resume_running_thread( &self, request_id: &ConnectionRequestId, params: &ThreadResumeParams, app_server_client_name: Option, app_server_client_version: Option, ) -> Result { let running_thread = if params.history.is_some() { if let 
Ok(existing_thread_id) = ThreadId::from_string(¶ms.thread_id) && self .thread_manager .get_thread(existing_thread_id) .await .is_ok() { return Err(invalid_request(format!( "cannot resume thread {existing_thread_id} with history while it is already running" ))); } None } else if params.path.is_some() { let source_thread = self .read_stored_thread_for_resume( ¶ms.thread_id, params.path.as_ref(), /*include_history*/ true, ) .await?; let existing_thread_id = source_thread.thread_id; if let Ok(existing_thread) = self.thread_manager.get_thread(existing_thread_id).await { if let (Some(requested_path), Some(active_path)) = ( params.path.as_ref(), existing_thread.rollout_path().as_ref(), ) && requested_path != active_path { return Err(invalid_request(format!( "cannot resume running thread {existing_thread_id} with stale path: requested `{}`, active `{}`", requested_path.display(), active_path.display() ))); } Some((existing_thread_id, existing_thread, source_thread)) } else { None } } else if let Ok(existing_thread_id) = ThreadId::from_string(¶ms.thread_id) && let Ok(existing_thread) = self.thread_manager.get_thread(existing_thread_id).await { let source_thread = self .read_stored_thread_for_resume( ¶ms.thread_id, /*path*/ None, /*include_history*/ true, ) .await?; if source_thread.thread_id != existing_thread_id { return Err(invalid_request(format!( "cannot resume running thread {existing_thread_id} from source thread {}", source_thread.thread_id ))); } Some((existing_thread_id, existing_thread, source_thread)) } else { None }; if let Some((existing_thread_id, existing_thread, source_thread)) = running_thread { let redact_resume_payloads = should_redact_thread_resume_payloads(app_server_client_name.as_deref()); let history_items = source_thread .history .as_ref() .map(|history| history.items.clone()) .ok_or_else(|| { internal_error(format!( "thread {existing_thread_id} did not include persisted history" )) })?; let thread_state = self .thread_state_manager 
.thread_state(existing_thread_id) .await; self.ensure_listener_task_running( existing_thread_id, existing_thread.clone(), thread_state.clone(), ) .await?; Self::set_app_server_client_info( existing_thread.as_ref(), app_server_client_name, app_server_client_version, ) .await?; let config_snapshot = existing_thread.config_snapshot().await; let mut workspace_roots = params.workspace_roots.clone(); let permission_profile_selection = if let Some(permissions) = params.permissions.clone() { Some( self.validate_active_permission_profile_selection( permissions, /*request_overrides*/ None, /*cwd*/ None, Some(config_snapshot.cwd.to_path_buf()), ) .await?, ) } else if let Some(sandbox_mode) = params.sandbox { let sandbox_policy = sandbox_policy_from_legacy_mode(sandbox_mode); match resolve_legacy_sandbox_profile_selection( &sandbox_policy, Some(CurrentPermissionProfile { permission_profile: &config_snapshot.permission_profile, workspace_roots: &config_snapshot.workspace_roots, }), &config_snapshot.cwd, workspace_roots.as_deref(), "sandbox", )? 
{ LegacySandboxResolution::Noop { workspace_roots: legacy_workspace_roots, } => { if workspace_roots.is_none() { workspace_roots = legacy_workspace_roots; } None } LegacySandboxResolution::Selection(legacy_selection) => { if workspace_roots.is_none() { workspace_roots = legacy_selection.workspace_roots.clone(); } let selection = self .validate_active_permission_profile_selection( legacy_selection.permissions.clone(), /*request_overrides*/ None, /*cwd*/ None, Some(config_snapshot.cwd.to_path_buf()), ) .await?; validate_legacy_sandbox_profile_selection( &legacy_selection, &selection, &config_snapshot.cwd, workspace_roots.as_deref(), "sandbox", )?; Some(selection) } } } else { None }; if workspace_roots.is_some() || permission_profile_selection.is_some() { existing_thread .update_turn_context_overrides(CodexThreadTurnContextOverrides { cwd: None, workspace_roots: workspace_roots.map(|roots| { roots .into_iter() .map(AbsolutePathBuf::into_path_buf) .collect() }), approval_policy: None, approvals_reviewer: None, sandbox_policy: None, permission_profile: permission_profile_selection .as_ref() .map(|selection| selection.permission_profile.clone()), active_permission_profile: permission_profile_selection .map(|selection| selection.active_permission_profile), windows_sandbox_level: None, model: None, effort: None, summary: None, service_tier: None, collaboration_mode: None, personality: None, }) .await .map_err(|err| { invalid_request(format!("invalid thread resume override: {err}")) })?; } let config_snapshot = existing_thread.config_snapshot().await; let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot); if !mismatch_details.is_empty() { tracing::warn!( "thread/resume overrides ignored for running thread {}: {}", existing_thread_id, mismatch_details.join("; ") ); } let mut summary_source_thread = source_thread; summary_source_thread.history = None; let mut thread_summary = self.stored_thread_to_api_thread( summary_source_thread, 
config_snapshot.model_provider_id.as_str(), /*include_turns*/ false, ); thread_summary.session_id = existing_thread.session_configured().session_id.to_string(); let mut config_for_instruction_sources = self.config.as_ref().clone(); config_for_instruction_sources.cwd = config_snapshot.cwd.clone(); let instruction_sources = Self::instruction_sources_from_config(&config_for_instruction_sources).await; let listener_command_tx = { let thread_state = thread_state.lock().await; thread_state.listener_command_tx() }; let Some(listener_command_tx) = listener_command_tx else { return Err(internal_error(format!( "failed to enqueue running thread resume for thread {existing_thread_id}: thread listener is not running" ))); }; let (emit_thread_goal_update, thread_goal_state_db) = self .thread_goal_processor .pending_resume_goal_state(existing_thread.as_ref()) .await; let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( Box::new(crate::thread_state::PendingThreadResumeRequest { request_id: request_id.clone(), history_items, config_snapshot, instruction_sources, thread_summary, emit_thread_goal_update, thread_goal_state_db, include_turns: !params.exclude_turns, redact_resume_payloads, }), ); if listener_command_tx.send(command).is_err() { return Err(internal_error(format!( "failed to enqueue running thread resume for thread {existing_thread_id}: thread listener command channel is closed" ))); } return Ok(true); } Ok(false) } async fn resume_thread_from_history( &self, history: &[ResponseItem], ) -> Result { if history.is_empty() { return Err(invalid_request("history must not be empty")); } Ok(InitialHistory::Forked( history .iter() .cloned() .map(RolloutItem::ResponseItem) .collect(), )) } async fn resume_thread_from_rollout( &self, thread_id: &str, path: Option<&PathBuf>, ) -> Result<(InitialHistory, StoredThread), JSONRPCErrorError> { let stored_thread = self .read_stored_thread_for_resume(thread_id, path, /*include_history*/ true) .await?; let 
history = self .stored_thread_to_initial_history(&stored_thread) .await?; Ok((history, stored_thread)) } async fn read_stored_thread_for_resume( &self, thread_id: &str, path: Option<&PathBuf>, include_history: bool, ) -> Result { let result = if let Some(path) = path { self.thread_store .read_thread_by_rollout_path(StoreReadThreadByRolloutPathParams { rollout_path: path.clone(), include_archived: true, include_history, }) .await } else { let existing_thread_id = match ThreadId::from_string(thread_id) { Ok(id) => id, Err(err) => { return Err(invalid_request(format!("invalid thread id: {err}"))); } }; let params = StoreReadThreadParams { thread_id: existing_thread_id, include_archived: true, include_history, }; self.thread_store.read_thread(params).await }; result.map_err(thread_store_resume_read_error) } async fn stored_thread_to_initial_history( &self, stored_thread: &StoredThread, ) -> Result { let thread_id = stored_thread.thread_id; let history = stored_thread .history .as_ref() .map(|history| history.items.clone()) .ok_or_else(|| { internal_error(format!( "thread {thread_id} did not include persisted history" )) })?; Ok(InitialHistory::Resumed(ResumedHistory { conversation_id: thread_id, history, rollout_path: stored_thread.rollout_path.clone(), })) } fn stored_thread_to_api_thread( &self, stored_thread: StoredThread, fallback_provider: &str, include_turns: bool, ) -> Thread { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { populate_thread_turns_from_history( &mut thread, &history.items, /*active_turn*/ None, ); } thread } async fn read_stored_thread_for_new_fork( &self, thread_id: ThreadId, include_history: bool, ) -> Result { self.thread_store .read_thread(StoreReadThreadParams { thread_id, include_archived: true, include_history, }) .await .map_err(thread_store_resume_read_error) } async fn load_thread_from_resume_source_or_send_internal( &self, 
thread_id: ThreadId, thread: &CodexThread, thread_history: &InitialHistory, rollout_path: &Path, resume_source_thread: Option, include_turns: bool, ) -> std::result::Result { let config_snapshot = thread.config_snapshot().await; let session_id = thread.session_configured().session_id.to_string(); let thread = match thread_history { InitialHistory::Resumed(resumed) => { let fallback_provider = config_snapshot.model_provider_id.as_str(); if let Some(stored_thread) = resume_source_thread { let stored_thread = if let Some(rollout_path) = stored_thread.rollout_path.clone() { self.thread_store .read_thread_by_rollout_path(StoreReadThreadByRolloutPathParams { rollout_path, include_archived: true, include_history: false, }) .await .unwrap_or(StoredThread { history: None, ..stored_thread }) } else { self.thread_store .read_thread(StoreReadThreadParams { thread_id: stored_thread.thread_id, include_archived: true, include_history: false, }) .await .unwrap_or(StoredThread { history: None, ..stored_thread }) }; Ok(thread_from_stored_thread( stored_thread, fallback_provider, &self.config.cwd, ) .0) } else { match self .thread_store .read_thread(StoreReadThreadParams { thread_id: resumed.conversation_id, include_archived: true, include_history: false, }) .await { Ok(stored_thread) => Ok(thread_from_stored_thread( stored_thread, fallback_provider, &self.config.cwd, ) .0), Err(read_err) => { Err(format!("failed to read thread from store: {read_err}")) } } } } InitialHistory::Forked(items) => { let mut thread = build_thread_from_snapshot( thread_id, session_id.clone(), &config_snapshot, Some(rollout_path.into()), ); thread.preview = preview_from_rollout_items(items); Ok(thread) } InitialHistory::New | InitialHistory::Cleared => Err(format!( "failed to build resume response for thread {thread_id}: initial history missing" )), }; let mut thread = thread?; thread.id = thread_id.to_string(); thread.session_id = session_id; thread.path = Some(rollout_path.to_path_buf()); if include_turns 
{
            let history_items = thread_history.get_rollout_items();
            populate_thread_turns_from_history(
                &mut thread,
                &history_items,
                /*active_turn*/ None,
            );
        }
        self.attach_thread_name(thread_id, &mut thread).await;
        Ok(thread)
    }

    /// Applies the stored thread title to `thread` when one is available.
    ///
    /// Reads the stored thread for `thread_id` (archived threads included, history excluded).
    /// The title is applied only when the trimmed stored name is non-empty and differs from the
    /// trimmed stored preview. A failed store read is ignored and leaves `thread` untouched.
    async fn attach_thread_name(&self, thread_id: ThreadId, thread: &mut Thread) {
        if let Ok(stored_thread) = self
            .thread_store
            .read_thread(StoreReadThreadParams {
                thread_id,
                include_archived: true,
                include_history: false,
            })
            .await
            && let Some(title) = stored_thread.name.as_deref().map(str::trim)
            && !title.is_empty()
            && stored_thread.preview.trim() != title
        {
            set_thread_name_from_title(thread, title.to_string());
        }
    }

    /// Handles a thread fork request end to end.
    ///
    /// Steps, in order:
    /// 1. Rejects requests that set both `sandbox` and `permissions`, and sends the
    ///    deprecation notice when `persist_extended_history` is set.
    /// 2. Loads the source thread with its persisted history.
    /// 3. Resolves the permission profile selection from `permissions`, from the legacy
    ///    `sandbox` mode, or from the state persisted in the source history.
    /// 4. Loads a config for the fork and forks the thread through the thread manager.
    /// 5. Attaches a conversation listener for the requesting connection.
    /// 6. Builds the API thread, sends the fork response, then (only when turns are included)
    ///    the token usage update, and finally the `ThreadStarted` notification.
    ///
    /// # Errors
    ///
    /// Returns a JSON-RPC error when the request is invalid, the source thread cannot be read
    /// or lacks persisted history/permission state, the config fails to load, or the fork
    /// itself fails.
    async fn thread_fork_inner(
        &self,
        request_id: ConnectionRequestId,
        params: ThreadForkParams,
        app_server_client_name: Option,
        app_server_client_version: Option,
    ) -> Result<(), JSONRPCErrorError> {
        let ThreadForkParams {
            thread_id,
            path,
            model,
            model_provider,
            service_tier,
            cwd,
            workspace_roots,
            approval_policy,
            approvals_reviewer,
            sandbox,
            permissions,
            config: cli_overrides,
            base_instructions,
            developer_instructions,
            ephemeral,
            thread_source,
            exclude_turns,
            persist_extended_history,
        } = params;
        let include_turns = !exclude_turns;
        // The legacy `sandbox` mode and `permissions` are mutually exclusive inputs.
        if sandbox.is_some() && permissions.is_some() {
            return Err(invalid_request(
                "`permissions` cannot be combined with `sandbox`",
            ));
        }
        if persist_extended_history {
            self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
                .await;
        }
        let source_thread = self
            .read_stored_thread_for_resume(&thread_id, path.as_ref(), /*include_history*/ true)
            .await?;
        let source_thread_id = source_thread.thread_id;
        // History was requested above, so a stored thread without it is an internal error.
        let history_items = source_thread
            .history
            .as_ref()
            .map(|history| history.items.clone())
            .ok_or_else(|| {
                internal_error(format!(
                    "thread {source_thread_id} did not include persisted history"
                ))
            })?;
        let history_cwd = Some(source_thread.cwd.clone());
        // Persist Windows sandbox mode.
        let mut cli_overrides = cli_overrides.unwrap_or_default();
        if cfg!(windows) {
            match WindowsSandboxLevel::from_config(&self.config) {
                WindowsSandboxLevel::Elevated => {
                    cli_overrides
                        .insert("windows.sandbox".to_string(), serde_json::json!("elevated"));
                }
                WindowsSandboxLevel::RestrictedToken => {
                    cli_overrides.insert(
                        "windows.sandbox".to_string(),
                        serde_json::json!("unelevated"),
                    );
                }
                WindowsSandboxLevel::Disabled => {}
            }
        }
        let request_overrides = if cli_overrides.is_empty() {
            None
        } else {
            Some(cli_overrides)
        };
        let fork_history = InitialHistory::Forked(history_items.clone());
        let persisted_permission_state = persisted_thread_permission_state(
            &fork_history,
            history_cwd.as_deref(),
            Some(&source_thread.sandbox_policy),
        )
        .ok_or_else(|| {
            invalid_request("thread history is missing persisted permission configuration")
        })?;
        let mut workspace_roots = workspace_roots;
        // `Some(..)` when the request selects a profile (directly or through the legacy sandbox
        // mode); `None` means the persisted state from the source history is used below.
        let permission_profile_selection = if let Some(permissions) = permissions {
            Some(
                self.validate_active_permission_profile_selection(
                    permissions,
                    request_overrides.clone(),
                    cwd.clone().map(PathBuf::from),
                    history_cwd.clone(),
                )
                .await?,
            )
        } else if let Some(sandbox_mode) = sandbox {
            let sandbox_policy = sandbox_policy_from_legacy_mode(sandbox_mode);
            let effective_cwd = effective_cwd_for_legacy_sandbox(
                cwd.as_deref(),
                history_cwd.as_deref(),
                Some(&persisted_permission_state),
            )
            .ok_or_else(|| {
                invalid_request("`sandbox` requires a cwd to resolve legacy permissions")
            })?;
            match resolve_legacy_sandbox_profile_selection(
                &sandbox_policy,
                Some(CurrentPermissionProfile {
                    permission_profile: &persisted_permission_state.permission_profile,
                    workspace_roots: &persisted_permission_state.workspace_roots,
                }),
                &effective_cwd,
                workspace_roots.as_deref(),
                "sandbox",
            )? {
                LegacySandboxResolution::Noop {
                    workspace_roots: legacy_workspace_roots,
                } => {
                    // Roots given explicitly in the request win over the legacy-derived ones.
                    if workspace_roots.is_none() {
                        workspace_roots = legacy_workspace_roots;
                    }
                    None
                }
                LegacySandboxResolution::Selection(legacy_selection) => {
                    if workspace_roots.is_none() {
                        workspace_roots = legacy_selection.workspace_roots.clone();
                    }
                    let selection = self
                        .validate_active_permission_profile_selection(
                            legacy_selection.permissions.clone(),
                            request_overrides.clone(),
                            cwd.clone().map(PathBuf::from),
                            history_cwd.clone(),
                        )
                        .await?;
                    validate_legacy_sandbox_profile_selection(
                        &legacy_selection,
                        &selection,
                        &effective_cwd,
                        workspace_roots.as_deref(),
                        "sandbox",
                    )?;
                    Some(selection)
                }
            }
        } else {
            None
        };
        let active_permission_profile = permission_profile_selection
            .as_ref()
            .map(|selection| selection.active_permission_profile.clone())
            .or_else(|| persisted_permission_state.active_permission_profile.clone());
        let expected_active_permission_profile_source = permission_profile_selection
            .as_ref()
            .map(|selection| selection.permission_profile.clone())
            .or_else(|| Some(persisted_permission_state.permission_profile.clone()));
        // Captured before `workspace_roots` is moved into the overrides builder.
        let workspace_roots_were_explicit = workspace_roots.is_some();
        let mut typesafe_overrides = self.build_thread_config_overrides(
            model,
            model_provider,
            service_tier,
            cwd,
            workspace_roots,
            approval_policy,
            approvals_reviewer,
            /*sandbox*/ None,
            /*permissions*/ None,
            base_instructions,
            developer_instructions,
            /*personality*/ None,
        );
        typesafe_overrides.permission_profile = expected_active_permission_profile_source.clone();
        // Without explicit roots, fall back to the roots persisted in the source history.
        if !workspace_roots_were_explicit {
            typesafe_overrides.workspace_roots = Some(
                persisted_permission_state
                    .workspace_roots
                    .iter()
                    .map(codex_utils_absolute_path::AbsolutePathBuf::to_path_buf)
                    .collect(),
            );
        }
        typesafe_overrides.ephemeral = ephemeral.then_some(true);
        // Derive a Config using the same logic as new conversation, honoring overrides if provided.
        let mut config = self
            .config_manager
            .load_for_cwd(request_overrides, typesafe_overrides, history_cwd)
            .await
            .map_err(|err| config_load_error(&err))?;
        config
            .permissions
            .set_active_permission_profile_for_current_profile(
                active_permission_profile,
                expected_active_permission_profile_source.as_ref(),
            );
        let fallback_model_provider = config.model_provider_id.clone();
        let instruction_sources = Self::instruction_sources_from_config(&config).await;
        let NewThread {
            thread_id,
            thread: forked_thread,
            session_configured,
            ..
        } = self
            .thread_manager
            .fork_thread_from_history(
                ForkSnapshot::Interrupted,
                config,
                InitialHistory::Resumed(ResumedHistory {
                    conversation_id: source_thread_id,
                    history: history_items.clone(),
                    rollout_path: source_thread.rollout_path.clone(),
                }),
                thread_source.map(Into::into),
                /*persist_extended_history*/ false,
                self.request_trace_context(&request_id).await,
            )
            .await
            .map_err(|err| match err {
                CodexErr::Io(_) | CodexErr::Json(_) => {
                    invalid_request(format!("failed to load thread {source_thread_id}: {err}"))
                }
                CodexErr::InvalidRequest(message) => invalid_request(message),
                err => internal_error(format!("error forking thread: {err}")),
            })?;
        Self::set_app_server_client_info(
            forked_thread.as_ref(),
            app_server_client_name,
            app_server_client_version,
        )
        .await?;
        // Auto-attach a conversation listener when forking a thread.
        log_listener_attach_result(
            self.ensure_conversation_listener(
                thread_id,
                request_id.connection_id,
                /*raw_events_enabled*/ false,
            )
            .await,
            thread_id,
            request_id.connection_id,
            "thread",
        );
        // Persistent forks materialize their own rollout immediately. Ephemeral forks stay
        // pathless, so they rebuild their visible history from the copied source history instead.
        let mut thread = if session_configured.rollout_path.is_some() {
            let stored_thread = self
                .read_stored_thread_for_new_fork(thread_id, include_turns)
                .await?;
            self.stored_thread_to_api_thread(
                stored_thread,
                fallback_model_provider.as_str(),
                include_turns,
            )
        } else {
            let config_snapshot = forked_thread.config_snapshot().await;
            // forked thread names do not inherit the source thread name
            let mut thread = build_thread_from_snapshot(
                thread_id,
                session_configured.session_id.to_string(),
                &config_snapshot,
                /*path*/ None,
            );
            thread.preview = preview_from_rollout_items(&history_items);
            thread.forked_from_id = Some(source_thread_id.to_string());
            if include_turns {
                populate_thread_turns_from_history(
                    &mut thread,
                    &history_items,
                    /*active_turn*/ None,
                );
            }
            thread
        };
        thread.session_id = session_configured.session_id.to_string();
        thread.thread_source = forked_thread
            .config_snapshot()
            .await
            .thread_source
            .map(Into::into);
        self.thread_watch_manager
            .upsert_thread_silently(thread.clone())
            .await;
        thread.status = resolve_thread_status(
            self.thread_watch_manager
                .loaded_status_for_thread(&thread.id)
                .await,
            /*has_in_progress_turn*/ false,
        );
        let config_snapshot = forked_thread.config_snapshot().await;
        let sandbox = thread_response_sandbox_policy(
            &config_snapshot.permission_profile,
            &config_snapshot.workspace_roots,
            config_snapshot.cwd.as_path(),
        );
        let active_permission_profile =
            thread_response_active_permission_profile(config_snapshot.active_permission_profile);
        let response = ThreadForkResponse {
            thread: thread.clone(),
            model: session_configured.model,
            model_provider: session_configured.model_provider_id,
            service_tier: session_configured.service_tier,
            cwd: session_configured.cwd,
            instruction_sources,
            approval_policy: session_configured.approval_policy.into(),
            approvals_reviewer: session_configured.approvals_reviewer.into(),
            sandbox,
            workspace_roots: config_snapshot.workspace_roots,
            active_permission_profile,
            reasoning_effort: session_configured.reasoning_effort,
        };
        let notif = thread_started_notification(thread);
        // `request_id` is moved into `send_response`, so keep the connection id for later.
        let connection_id = request_id.connection_id;
        let token_usage_thread = include_turns.then(|| response.thread.clone());
        self.outgoing.send_response(request_id, response).await;
        // `excludeTurns` is the cheap fork path, so skip restored usage replay
        // instead of rebuilding history only to attribute a historical update.
        if let Some(token_usage_thread) = token_usage_thread {
            let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
                &history_items,
                token_usage_thread.turns.as_slice(),
            );
            // Mirror the resume contract for forks: the new thread is usable as soon
            // as the response arrives, so restored usage must follow immediately.
            send_thread_token_usage_update_to_connection(
                &self.outgoing,
                connection_id,
                thread_id,
                &token_usage_thread,
                forked_thread.as_ref(),
                token_usage_turn_id,
            )
            .await;
        }
        self.outgoing
            .send_server_notification(ServerNotification::ThreadStarted(notif))
            .await;
        Ok(())
    }

    /// Builds a conversation summary response from the thread store.
    ///
    /// Looks the thread up by id or by rollout path depending on `params`; rollout path
    /// lookups are rejected unless the configured store is the local thread store.
    async fn get_thread_summary_response_inner(
        &self,
        params: GetConversationSummaryParams,
    ) -> Result {
        let fallback_provider = self.config.model_provider_id.as_str();
        let read_result = match params {
            GetConversationSummaryParams::ThreadId { conversation_id } => self
                .thread_store
                .read_thread(StoreReadThreadParams {
                    thread_id: conversation_id,
                    include_archived: true,
                    include_history: false,
                })
                .await
                .map_err(|err| conversation_summary_thread_id_read_error(conversation_id, err)),
            GetConversationSummaryParams::RolloutPath { rollout_path } => {
                let Some(local_thread_store) = self
                    .thread_store
                    .as_any()
                    .downcast_ref::()
                else {
                    return Err(invalid_request(
                        "rollout path queries are only supported with the local thread store",
                    ));
                };
                local_thread_store
                    .read_thread_by_rollout_path(
                        rollout_path.clone(),
                        /*include_archived*/ true,
                        /*include_history*/ false,
                    )
                    .await
                    .map_err(|err| conversation_summary_rollout_path_read_error(&rollout_path, err))
            }
        };
        let stored_thread = read_result?;
        let summary =
summary_from_stored_thread(stored_thread, fallback_provider);
        Ok(GetConversationSummaryResponse { summary })
    }

    /// Collects up to `requested_page_size` threads from the thread store.
    ///
    /// The store is queried page by page (at most `THREAD_LIST_MAX_LIMIT` items per call) and
    /// each page is filtered again locally by source kind and cwd. Paging stops once enough
    /// items are collected, the store reports no further cursor, or the store hands back the
    /// cursor that was just used.
    ///
    /// When `filters.model_providers` is `None` the configured model provider is used as the
    /// filter; an explicitly empty list disables provider filtering.
    ///
    /// Returns the collected items and the cursor for the next page, if any.
    async fn list_threads_common(
        &self,
        requested_page_size: usize,
        cursor: Option,
        sort_key: StoreThreadSortKey,
        sort_direction: SortDirection,
        filters: ThreadListFilters,
    ) -> Result<(Vec, Option), JSONRPCErrorError> {
        let ThreadListFilters {
            model_providers,
            source_kinds,
            archived,
            cwd_filters,
            search_term,
            use_state_db_only,
        } = filters;
        let mut cursor_obj = cursor;
        let mut last_cursor = cursor_obj.clone();
        let mut remaining = requested_page_size;
        let mut items = Vec::with_capacity(requested_page_size);
        let mut next_cursor: Option = None;
        let model_provider_filter = match model_providers {
            Some(providers) => {
                if providers.is_empty() {
                    None
                } else {
                    Some(providers)
                }
            }
            None => Some(vec![self.config.model_provider_id.clone()]),
        };
        let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
        let allowed_sources = allowed_sources_vec.as_slice();
        let store_sort_direction = match sort_direction {
            SortDirection::Asc => StoreSortDirection::Asc,
            SortDirection::Desc => StoreSortDirection::Desc,
        };
        while remaining > 0 {
            let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
            let page = self
                .thread_store
                .list_threads(StoreListThreadsParams {
                    page_size,
                    cursor: cursor_obj.clone(),
                    sort_key,
                    sort_direction: store_sort_direction,
                    allowed_sources: allowed_sources.to_vec(),
                    model_providers: model_provider_filter.clone(),
                    cwd_filters: cwd_filters.clone(),
                    archived,
                    search_term: search_term.clone(),
                    use_state_db_only,
                })
                .await
                .map_err(thread_store_list_error)?;
            let mut filtered = Vec::with_capacity(page.items.len());
            for it in page.items {
                let source = with_thread_spawn_agent_metadata(
                    it.source.clone(),
                    it.agent_nickname.clone(),
                    it.agent_role.clone(),
                );
                if source_kind_filter
                    .as_ref()
                    .is_none_or(|filter| source_kind_matches(&source, filter))
                    && cwd_filters.as_ref().is_none_or(|expected_cwds| {
                        expected_cwds.iter().any(|expected_cwd| {
                            path_utils::paths_match_after_normalization(&it.cwd, expected_cwd)
                        })
                    })
                {
                    filtered.push(it);
                    if filtered.len() >= remaining {
                        break;
                    }
                }
            }
            items.extend(filtered);
            remaining = requested_page_size.saturating_sub(items.len());
            next_cursor = page.next_cursor;
            if remaining == 0 {
                break;
            }
            let Some(cursor_val) = next_cursor.clone() else {
                break;
            };
            // Break if our pagination would reuse the same cursor again; this avoids
            // an infinite loop when filtering drops everything on the page.
            if last_cursor.as_ref() == Some(&cursor_val) {
                next_cursor = None;
                break;
            }
            last_cursor = Some(cursor_val.clone());
            cursor_obj = Some(cursor_val);
        }
        Ok((items, next_cursor))
    }
}

/// Reports whether MCP elicitations must be auto-denied for the connected client.
///
/// Returns `true` only for a client named exactly `"Xcode"` whose version belongs to the 26.4
/// line: `"26.4"` itself or `"26.4"` followed by a non-digit (for example `"26.4.1"` or
/// `"26.4 beta"`). Versions such as `"26.40"` merely share the textual prefix and are not
/// treated as 26.4.
fn xcode_26_4_mcp_elicitations_auto_deny(
    client_name: Option<&str>,
    client_version: Option<&str>,
) -> bool {
    // Xcode 26.4 shipped before app-server MCP elicitation requests were
    // client-visible. Keep elicitations auto-denied for that client line.
    // TODO: Remove this compatibility hack once Xcode 26.4 ages out.
    client_name == Some("Xcode")
        && client_version.is_some_and(|version| {
            version
                .strip_prefix("26.4")
                // A digit right after the prefix means a different minor version (e.g. 26.40).
                .is_some_and(|rest| !rest.starts_with(|c: char| c.is_ascii_digit()))
        })
}

/// Page size used by turn pagination when the caller supplies no limit.
const THREAD_TURNS_DEFAULT_LIMIT: usize = 25;
/// Upper bound applied to the caller-supplied turn page size.
const THREAD_TURNS_MAX_LIMIT: usize = 100;

/// Builds the cursor used to page in the opposite direction from `thread`.
///
/// The cursor is the thread's timestamp for `sort_key`, shifted by one millisecond and
/// rendered as RFC 3339 with millisecond precision. Returns `None` when shifting the
/// timestamp overflows.
fn thread_backwards_cursor_for_sort_key(
    thread: &StoredThread,
    sort_key: StoreThreadSortKey,
    sort_direction: SortDirection,
) -> Option {
    let timestamp = match sort_key {
        StoreThreadSortKey::CreatedAt => thread.created_at,
        StoreThreadSortKey::UpdatedAt => thread.updated_at,
    };
    // The state DB stores unique millisecond timestamps. Offset the reverse cursor by one
    // millisecond so the opposite-direction query includes the page anchor.
let timestamp = match sort_direction {
        SortDirection::Asc => timestamp.checked_add_signed(ChronoDuration::milliseconds(1))?,
        SortDirection::Desc => timestamp.checked_sub_signed(ChronoDuration::milliseconds(1))?,
    };
    Some(timestamp.to_rfc3339_opts(SecondsFormat::Millis, true))
}

/// One page of turns together with the cursors for paging onwards and backwards.
struct ThreadTurnsPage {
    pub(super) turns: Vec,
    pub(super) next_cursor: Option,
    pub(super) backwards_cursor: Option,
}

/// JSON payload of a turn pagination cursor (serialized with camelCase keys).
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ThreadTurnsCursor {
    turn_id: String,
    include_anchor: bool,
}

/// Slices `turns` into a single page.
///
/// `cursor` is a serialized [`ThreadTurnsCursor`] naming the anchor turn and whether the
/// anchor itself belongs to the page. `limit` falls back to `THREAD_TURNS_DEFAULT_LIMIT` and
/// is clamped to `1..=THREAD_TURNS_MAX_LIMIT`. With `SortDirection::Desc` the turns are
/// walked from last to first.
///
/// # Errors
///
/// Fails with an invalid-request error when the cursor cannot be parsed or its anchor turn is
/// not present in `turns`, and with an internal error when a cursor cannot be serialized.
fn paginate_thread_turns(
    turns: Vec,
    cursor: Option<&str>,
    limit: Option,
    sort_direction: SortDirection,
) -> Result {
    if turns.is_empty() {
        return Ok(ThreadTurnsPage {
            turns: Vec::new(),
            next_cursor: None,
            backwards_cursor: None,
        });
    }

    let anchor = match cursor {
        Some(raw_cursor) => Some(parse_thread_turns_cursor(raw_cursor)?),
        None => None,
    };
    let page_size = limit
        .map_or(THREAD_TURNS_DEFAULT_LIMIT, |value| value as usize)
        .clamp(1, THREAD_TURNS_MAX_LIMIT);

    // Resolve the anchor to (position, include_anchor); a cursor whose turn vanished is invalid.
    let resolved_anchor = match anchor.as_ref() {
        Some(anchor) => {
            let Some(position) = turns.iter().position(|turn| turn.id == anchor.turn_id) else {
                return Err(invalid_request(
                    "invalid cursor: anchor turn is no longer present",
                ));
            };
            Some((position, anchor.include_anchor))
        }
        None => None,
    };

    let mut keyed_turns: Vec<_> = turns.into_iter().enumerate().collect();
    match sort_direction {
        SortDirection::Asc => {
            if let Some((anchor_index, include_anchor)) = resolved_anchor {
                keyed_turns.retain(|(index, _)| {
                    *index > anchor_index || (include_anchor && *index == anchor_index)
                });
            }
        }
        SortDirection::Desc => {
            keyed_turns.reverse();
            if let Some((anchor_index, include_anchor)) = resolved_anchor {
                keyed_turns.retain(|(index, _)| {
                    *index < anchor_index || (include_anchor && *index == anchor_index)
                });
            }
        }
    }

    let more_turns_available = keyed_turns.len() > page_size;
    keyed_turns.truncate(page_size);

    let backwards_cursor = match keyed_turns.first() {
        Some((_, turn)) => Some(serialize_thread_turns_cursor(
            &turn.id,
            /*include_anchor*/ true,
        )?),
        None => None,
    };
    let next_cursor = match keyed_turns.last() {
        Some((_, turn)) if more_turns_available => Some(serialize_thread_turns_cursor(
            &turn.id,
            /*include_anchor*/ false,
        )?),
        _ => None,
    };

    let turns = keyed_turns.into_iter().map(|(_, turn)| turn).collect();
    Ok(ThreadTurnsPage {
        turns,
        next_cursor,
        backwards_cursor,
    })
}

/// Encodes a turn cursor as JSON; a serialization failure becomes an internal error.
fn serialize_thread_turns_cursor(
    turn_id: &str,
    include_anchor: bool,
) -> Result {
    let payload = ThreadTurnsCursor {
        turn_id: turn_id.to_string(),
        include_anchor,
    };
    match serde_json::to_string(&payload) {
        Ok(encoded) => Ok(encoded),
        Err(err) => Err(internal_error(format!("failed to serialize cursor: {err}"))),
    }
}

/// Decodes a JSON turn cursor; malformed input becomes an invalid-request error.
fn parse_thread_turns_cursor(cursor: &str) -> Result {
    match serde_json::from_str(cursor) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Err(invalid_request(format!("invalid cursor: {cursor}"))),
    }
}

/// Rebuilds the turn list from rollout items for the turns listing.
///
/// Turn statuses are normalized against the loaded thread status, and the live `active_turn`
/// (when given) is merged into the rebuilt history afterwards.
fn reconstruct_thread_turns_for_turns_list(
    items: &[RolloutItem],
    loaded_status: ThreadStatus,
    has_live_running_thread: bool,
    active_turn: Option,
) -> Vec {
    let active_turn_in_progress = || {
        active_turn
            .as_ref()
            .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress))
    };
    let has_live_in_progress_turn = has_live_running_thread || active_turn_in_progress();

    let mut turns = build_api_turns_from_rollout_items(items);
    normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn);
    match active_turn {
        Some(active_turn) => merge_turn_history_with_active_turn(&mut turns, active_turn),
        None => {}
    }
    turns
}

/// Marks in-progress turns as interrupted unless the thread resolves to an active status.
fn normalize_thread_turns_status(
    turns: &mut [Turn],
    loaded_status: ThreadStatus,
    has_live_in_progress_turn: bool,
) {
    let status = resolve_thread_status(loaded_status, has_live_in_progress_turn);
    if matches!(status, ThreadStatus::Active { ..
}) {
        return;
    }
    for turn in turns {
        if matches!(turn.status, TurnStatus::InProgress) {
            turn.status = TurnStatus::Interrupted;
        }
    }
}

/// Failure categories for thread read views, mapped to JSON-RPC errors by
/// `thread_read_view_error`.
enum ThreadReadViewError {
    InvalidRequest(String),
    Unsupported(&'static str),
    Internal(String),
}

/// Converts a read-view failure into the matching JSON-RPC error.
fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
    use ThreadReadViewError::{Internal, InvalidRequest, Unsupported};

    match err {
        Unsupported(operation) => unsupported_thread_store_operation(operation),
        InvalidRequest(message) => invalid_request(message),
        Internal(message) => internal_error(message),
    }
}

/// Builds the method-not-found error reported for an unsupported store operation.
fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError {
    let message = format!("{operation} is not supported yet");
    method_not_found(message)
}

/// Maps a thread store failure raised while listing threads.
fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError {
    match err {
        ThreadStoreError::Unsupported { operation } => {
            unsupported_thread_store_operation(operation)
        }
        ThreadStoreError::InvalidRequest { message } => invalid_request(message),
        err => internal_error(format!("failed to list threads: {err}")),
    }
}

/// Maps a thread store failure raised while reading a thread for resume.
///
/// A missing thread is reported as an invalid request.
fn thread_store_resume_read_error(err: ThreadStoreError) -> JSONRPCErrorError {
    match err {
        ThreadStoreError::ThreadNotFound { thread_id } => {
            invalid_request(format!("no rollout found for thread id {thread_id}"))
        }
        ThreadStoreError::Unsupported { operation } => {
            unsupported_thread_store_operation(operation)
        }
        ThreadStoreError::InvalidRequest { message } => invalid_request(message),
        err => internal_error(format!("failed to read thread: {err}")),
    }
}

/// Maps a history load failure for `thread/turns/list`.
///
/// An invalid-request message that starts with the rollout path resolution prefix is
/// rewritten into a "not materialized yet" message; other invalid requests pass through.
fn thread_turns_list_history_load_error(
    thread_id: ThreadId,
    err: ThreadStoreError,
) -> ThreadReadViewError {
    match err {
        ThreadStoreError::InvalidRequest { message } => {
            if message.starts_with("failed to resolve rollout path `") {
                ThreadReadViewError::InvalidRequest(format!(
                    "thread {thread_id} is not materialized yet; thread/turns/list is unavailable before first user message"
                ))
            } else {
                ThreadReadViewError::InvalidRequest(message)
            }
        }
        ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation),
        err => ThreadReadViewError::Internal(format!(
            "failed to load thread history for thread {thread_id}: {err}"
        )),
    }
}

/// Maps a history load failure for thread reads that include turns.
///
/// Both a rollout path resolution failure and a not-found error for this same thread are
/// reported as "not materialized yet".
fn thread_read_history_load_error(
    thread_id: ThreadId,
    err: ThreadStoreError,
) -> ThreadReadViewError {
    let not_materialized = || {
        ThreadReadViewError::InvalidRequest(format!(
            "thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
        ))
    };

    match err {
        ThreadStoreError::InvalidRequest { message } => {
            if message.starts_with("failed to resolve rollout path `") {
                not_materialized()
            } else {
                ThreadReadViewError::InvalidRequest(message)
            }
        }
        ThreadStoreError::ThreadNotFound {
            thread_id: missing_thread_id,
        } if missing_thread_id == thread_id => not_materialized(),
        ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation),
        err => ThreadReadViewError::Internal(format!(
            "failed to load thread history for thread {thread_id}: {err}"
        )),
    }
}

/// Maps a store failure raised while loading a conversation summary by thread id.
///
/// A not-found error for this id, or an invalid request carrying the store's "no rollout
/// found" message for it, becomes the conversation-level not-found error.
fn conversation_summary_thread_id_read_error(
    conversation_id: ThreadId,
    err: ThreadStoreError,
) -> JSONRPCErrorError {
    let no_rollout_message = format!("no rollout found for thread id {conversation_id}");
    match err {
        ThreadStoreError::InvalidRequest { message } => {
            if message == no_rollout_message {
                conversation_summary_not_found_error(conversation_id)
            } else {
                invalid_request(message)
            }
        }
        ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => {
            conversation_summary_not_found_error(conversation_id)
        }
        ThreadStoreError::Unsupported { operation } => {
            unsupported_thread_store_operation(operation)
        }
        err => internal_error(format!(
            "failed to load conversation summary for {conversation_id}: {err}"
        )),
    }
}

/// Builds the invalid-request error for a conversation that has no rollout.
fn conversation_summary_not_found_error(conversation_id: ThreadId) ->
JSONRPCErrorError {
    invalid_request(format!(
        "no rollout found for conversation id {conversation_id}"
    ))
}

/// Maps a store failure raised while loading a conversation summary by rollout path.
fn conversation_summary_rollout_path_read_error(
    path: &Path,
    err: ThreadStoreError,
) -> JSONRPCErrorError {
    match err {
        ThreadStoreError::Unsupported { operation } => {
            unsupported_thread_store_operation(operation)
        }
        ThreadStoreError::InvalidRequest { message } => invalid_request(message),
        err => {
            let message = format!(
                "failed to load conversation summary from {}: {}",
                path.display(),
                err
            );
            internal_error(message)
        }
    }
}

/// Maps a core error raised by a thread write `operation` to a JSON-RPC error.
fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
    match err {
        CodexErr::InvalidRequest(message) => invalid_request(message),
        CodexErr::UnsupportedOperation(message) => method_not_found(message),
        CodexErr::ThreadNotFound(thread_id) => {
            invalid_request(format!("thread not found: {thread_id}"))
        }
        err => internal_error(format!("failed to {operation}: {err}")),
    }
}

/// Maps a store failure raised by an archive-style `operation` to a JSON-RPC error.
fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError {
    match err {
        ThreadStoreError::Unsupported {
            operation: unsupported_operation,
        } => unsupported_thread_store_operation(unsupported_operation),
        ThreadStoreError::InvalidRequest { message } => invalid_request(message),
        err => internal_error(format!("failed to {operation} thread: {err}")),
    }
}

/// Stores `title` as the thread name unless it is blank or equals the preview after trimming.
fn set_thread_name_from_title(thread: &mut Thread, title: String) {
    let trimmed_title = title.trim();
    let is_distinct_title = !trimmed_title.is_empty() && thread.preview.trim() != trimmed_title;
    if is_distinct_title {
        thread.name = Some(title);
    }
}

/// Converts a stored thread into the API `Thread`, returning its history separately.
///
/// `fallback_provider` is used when the stored model provider is empty, and `fallback_cwd`
/// when the stored cwd cannot be turned into an absolute path (a warning is logged). The
/// resulting thread has status `NotLoaded`, is not ephemeral, and carries no turns.
pub(crate) fn thread_from_stored_thread(
    thread: StoredThread,
    fallback_provider: &str,
    fallback_cwd: &AbsolutePathBuf,
) -> (Thread, Option) {
    let StoredThread {
        thread_id,
        rollout_path,
        forked_from_id,
        preview,
        name,
        model_provider,
        created_at,
        updated_at,
        cwd,
        cli_version,
        source,
        thread_source,
        agent_nickname: stored_nickname,
        agent_role: stored_role,
        git_info,
        history,
        ..
    } = thread;

    let git_info = git_info.map(|info| ApiGitInfo {
        sha: info.commit_hash.map(|sha| sha.0),
        branch: info.branch,
        origin_url: info.repository_url,
    });

    let normalized_cwd = path_utils::normalize_for_native_workdir(cwd);
    let cwd = match AbsolutePathBuf::relative_to_current_dir(normalized_cwd) {
        Ok(cwd) => cwd,
        Err(err) => {
            warn!("failed to normalize thread cwd while reading stored thread: {err}");
            fallback_cwd.clone()
        }
    };

    let source = with_thread_spawn_agent_metadata(source, stored_nickname, stored_role);
    // Read the agent metadata before `source` is consumed by the conversion below.
    let agent_nickname = source.get_nickname();
    let agent_role = source.get_agent_role();

    let model_provider = if model_provider.is_empty() {
        fallback_provider.to_string()
    } else {
        model_provider
    };

    let id = thread_id.to_string();
    let api_thread = Thread {
        id: id.clone(),
        session_id: id,
        forked_from_id: forked_from_id.map(|id| id.to_string()),
        preview,
        ephemeral: false,
        model_provider,
        created_at: created_at.timestamp(),
        updated_at: updated_at.timestamp(),
        status: ThreadStatus::NotLoaded,
        path: rollout_path,
        cwd,
        cli_version,
        agent_nickname,
        agent_role,
        source: source.into(),
        thread_source: thread_source.map(Into::into),
        git_info,
        name,
        turns: Vec::new(),
    };
    (api_thread, history)
}

/// Converts a stored thread into a `ConversationSummary`.
///
/// `fallback_provider` is used when the stored model provider is empty.
fn summary_from_stored_thread(
    thread: StoredThread,
    fallback_provider: &str,
) -> ConversationSummary {
    let path = thread.rollout_path.unwrap_or_default();
    let source = with_thread_spawn_agent_metadata(
        thread.source,
        thread.agent_nickname.clone(),
        thread.agent_role.clone(),
    );
    let git_info = thread.git_info.map(|git| ConversationGitInfo {
        sha: git.commit_hash.map(|sha| sha.0),
        branch: git.branch,
        origin_url: git.repository_url,
    });
    ConversationSummary {
        conversation_id: thread.thread_id,
        path,
        preview: thread.preview,
        // Preserve millisecond precision from the thread store so thread/list cursors
        // round-trip the same ordering key used by pagination queries.
timestamp: Some(
            thread
                .created_at
                .to_rfc3339_opts(SecondsFormat::Millis, true),
        ),
        updated_at: Some(
            thread
                .updated_at
                .to_rfc3339_opts(SecondsFormat::Millis, true),
        ),
        model_provider: if thread.model_provider.is_empty() {
            fallback_provider.to_string()
        } else {
            thread.model_provider
        },
        cwd: thread.cwd,
        cli_version: thread.cli_version,
        source,
        git_info,
    }
}

/// Test helper: assembles a `ConversationSummary` from flattened state DB columns.
///
/// The preview falls back to the first user message and then to an empty string. `source` is
/// parsed as JSON first and then as a bare JSON string, defaulting to
/// `SessionSource::Unknown`. Git info is only attached when at least one git column is set.
#[allow(clippy::too_many_arguments)]
#[cfg(test)]
fn summary_from_state_db_metadata(
    conversation_id: ThreadId,
    path: PathBuf,
    first_user_message: Option,
    preview: Option,
    timestamp: String,
    updated_at: String,
    model_provider: String,
    cwd: PathBuf,
    cli_version: String,
    source: String,
    _thread_source: Option,
    agent_nickname: Option,
    agent_role: Option,
    git_sha: Option,
    git_branch: Option,
    git_origin_url: Option,
) -> ConversationSummary {
    let preview = match preview.or(first_user_message) {
        Some(text) => text,
        None => Default::default(),
    };

    let parsed_source: codex_protocol::protocol::SessionSource =
        match serde_json::from_str(&source) {
            Ok(parsed) => parsed,
            Err(_) => serde_json::from_value(serde_json::Value::String(source.clone()))
                .unwrap_or(codex_protocol::protocol::SessionSource::Unknown),
        };
    let source = with_thread_spawn_agent_metadata(parsed_source, agent_nickname, agent_role);

    let has_git_info = git_sha.is_some() || git_branch.is_some() || git_origin_url.is_some();
    let git_info = has_git_info.then(|| ConversationGitInfo {
        sha: git_sha,
        branch: git_branch,
        origin_url: git_origin_url,
    });

    ConversationSummary {
        conversation_id,
        path,
        preview,
        timestamp: Some(timestamp),
        updated_at: Some(updated_at),
        model_provider,
        cwd,
        cli_version,
        source,
        git_info,
    }
}

/// Test helper: builds a summary from thread metadata, formatting timestamps with second
/// precision.
#[cfg(test)]
fn summary_from_thread_metadata(metadata: &ThreadMetadata) -> ConversationSummary {
    let created_at = metadata
        .created_at
        .to_rfc3339_opts(SecondsFormat::Secs, true);
    let updated_at = metadata
        .updated_at
        .to_rfc3339_opts(SecondsFormat::Secs, true);

    summary_from_state_db_metadata(
        metadata.id,
        metadata.rollout_path.clone(),
        metadata.first_user_message.clone(),
        metadata.preview.clone(),
        created_at,
        updated_at,
        metadata.model_provider.clone(),
        metadata.cwd.clone(),
        metadata.cli_version.clone(),
        metadata.source.clone(),
        metadata.thread_source,
        metadata.agent_nickname.clone(),
        metadata.agent_role.clone(),
        metadata.git_sha.clone(),
        metadata.git_branch.clone(),
        metadata.git_origin_url.clone(),
    )
}

/// Derives the thread preview from the first user message among the rollout items.
///
/// When the message contains `USER_MESSAGE_BEGIN`, only the trimmed text after the marker is
/// kept. Returns an empty string when no user message is present.
fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
    let first_user_message = items.iter().find_map(|item| {
        let RolloutItem::ResponseItem(response_item) = item else {
            return None;
        };
        match codex_core::parse_turn_item(response_item) {
            Some(codex_protocol::items::TurnItem::UserMessage(user)) => Some(user.message()),
            _ => None,
        }
    });

    let Some(preview) = first_user_message else {
        return String::new();
    };
    match preview.find(USER_MESSAGE_BEGIN) {
        Some(idx) => preview[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string(),
        None => preview,
    }
}

/// Reports whether the requested permission overrides imply trusting the project at `cwd`.
///
/// Trust is implied by a workspace-write or full-access sandbox mode, by the `:workspace` or
/// `:danger-no-sandbox` default permissions, or by a permission profile that trusts the
/// project.
fn requested_permissions_trust_project(overrides: &ConfigOverrides, cwd: &Path) -> bool {
    let sandbox_mode_trusts = matches!(
        overrides.sandbox_mode,
        Some(
            codex_protocol::config_types::SandboxMode::WorkspaceWrite
                | codex_protocol::config_types::SandboxMode::DangerFullAccess
        )
    );
    let default_permissions_trust = matches!(
        overrides.default_permissions.as_deref(),
        Some(":workspace" | ":danger-no-sandbox")
    );

    sandbox_mode_trusts
        || default_permissions_trust
        || overrides
            .permission_profile
            .as_ref()
            .is_some_and(|profile| permission_profile_trusts_project(profile, cwd))
}

/// Reports whether `profile` trusts the project at `cwd`.
///
/// Disabled and external profiles always do; a managed profile does when its file system
/// sandbox policy can write `cwd`.
fn permission_profile_trusts_project(
    profile: &codex_protocol::models::PermissionProfile,
    cwd: &Path,
) -> bool {
    match profile {
        codex_protocol::models::PermissionProfile::Disabled
        | codex_protocol::models::PermissionProfile::External { .. } => true,
        codex_protocol::models::PermissionProfile::Managed { ..
} => profile
            .file_system_sandbox_policy()
            .can_write_path_with_cwd(cwd, cwd),
    }
}

/// Builds an API `Thread` from a config snapshot.
///
/// Both timestamps are set to the current UTC time, and the thread starts as `NotLoaded` with
/// an empty preview and no name, git info, fork parent, or turns. The CLI version is the
/// version of this crate.
fn build_thread_from_snapshot(
    thread_id: ThreadId,
    session_id: String,
    config_snapshot: &ThreadConfigSnapshot,
    path: Option,
) -> Thread {
    let created_at = time::OffsetDateTime::now_utc().unix_timestamp();
    let session_source = &config_snapshot.session_source;

    Thread {
        id: thread_id.to_string(),
        session_id,
        forked_from_id: None,
        preview: String::new(),
        ephemeral: config_snapshot.ephemeral,
        model_provider: config_snapshot.model_provider_id.clone(),
        created_at,
        updated_at: created_at,
        status: ThreadStatus::NotLoaded,
        path,
        cwd: config_snapshot.cwd.clone(),
        cli_version: env!("CARGO_PKG_VERSION").to_string(),
        agent_nickname: session_source.get_nickname(),
        agent_role: session_source.get_agent_role(),
        source: session_source.clone().into(),
        thread_source: config_snapshot.thread_source.map(Into::into),
        git_info: None,
        name: None,
        turns: Vec::new(),
    }
}

/// Builds an API `Thread` for a loaded thread, taking the session id and rollout path from
/// the live thread.
fn build_thread_from_loaded_snapshot(
    thread_id: ThreadId,
    config_snapshot: &ThreadConfigSnapshot,
    loaded_thread: &CodexThread,
) -> Thread {
    let session_id = loaded_thread.session_configured().session_id.to_string();
    let rollout_path = loaded_thread.rollout_path();
    build_thread_from_snapshot(thread_id, session_id, config_snapshot, rollout_path)
}

#[cfg(test)]
#[path = "thread_processor_tests.rs"]
mod thread_processor_tests;