diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index edc142c840..1767b6372a 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -47,9 +47,6 @@ use codex_app_server_protocol::CodexErrorInfo; use codex_app_server_protocol::CollaborationModeListParams; use codex_app_server_protocol::CollaborationModeListResponse; use codex_app_server_protocol::CommandExecParams; -use codex_app_server_protocol::CommandExecResizeParams; -use codex_app_server_protocol::CommandExecTerminateParams; -use codex_app_server_protocol::CommandExecWriteParams; use codex_app_server_protocol::ConversationGitInfo; use codex_app_server_protocol::ConversationSummary; use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec; @@ -196,7 +193,6 @@ use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; use codex_app_server_protocol::ThreadRealtimeAppendTextResponse; -use codex_app_server_protocol::ThreadRealtimeListVoicesParams; use codex_app_server_protocol::ThreadRealtimeListVoicesResponse; use codex_app_server_protocol::ThreadRealtimeStartParams; use codex_app_server_protocol::ThreadRealtimeStartResponse; @@ -1005,352 +1001,378 @@ impl CodexMessageProcessor { app_server_client_version: Option, request_context: RequestContext, ) { - let to_connection_request_id = |request_id| ConnectionRequestId { + let request_id = ConnectionRequestId { connection_id, - request_id, + request_id: request.id().clone(), }; - match request { + let response: Result, JSONRPCErrorError> = match request { ClientRequest::Initialize { .. } => { panic!("Initialize should be handled in MessageProcessor"); } // === v2 Thread/Turn APIs === - ClientRequest::ThreadStart { request_id, params } => { - self.thread_start( - to_connection_request_id(request_id), + ClientRequest::ThreadStart { params, .. } => self + .thread_start( + request_id.clone(), params, app_server_client_name.clone(), app_server_client_version.clone(), request_context, ) - .await; + .await + .map(|()| None), + ClientRequest::ThreadUnsubscribe { params, .. } => self + .thread_unsubscribe_response(params, request_id.connection_id) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadResume { params, .. } => self + .thread_resume(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ThreadFork { params, .. } => self + .thread_fork(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ThreadArchive { params, .. } => { + match self.thread_archive(params).await { + Ok((response, archived_thread_ids)) => { + self.outgoing + .send_response(request_id.clone(), response) + .await; + for thread_id in archived_thread_ids { + self.outgoing + .send_server_notification(ServerNotification::ThreadArchived( + ThreadArchivedNotification { thread_id }, + )) + .await; + } + Ok(None) + } + Err(error) => Err(error), + } } - ClientRequest::ThreadUnsubscribe { request_id, params } => { - self.thread_unsubscribe(to_connection_request_id(request_id), params) - .await; + ClientRequest::ThreadIncrementElicitation { params, .. } => self + .thread_increment_elicitation(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadDecrementElicitation { params, .. } => self + .thread_decrement_elicitation(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadSetName { params, .. } => { + match self.thread_set_name_response(&request_id, params).await { + Ok((response, notification)) => { + self.outgoing + .send_response(request_id.clone(), response) + .await; + if let Some(notification) = notification { + self.outgoing + .send_server_notification(ServerNotification::ThreadNameUpdated( + notification, + )) + .await; + } + Ok(None) + } + Err(error) => Err(error), + } } - ClientRequest::ThreadResume { request_id, params } => { - self.thread_resume(to_connection_request_id(request_id), params) - .await; + ClientRequest::ThreadGoalSet { params, .. } => self + .thread_goal_set(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ThreadGoalGet { params, .. } => self + .thread_goal_get(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadGoalClear { params, .. } => self + .thread_goal_clear(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ThreadMetadataUpdate { params, .. } => self + .thread_metadata_update_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadMemoryModeSet { params, .. } => self + .thread_memory_mode_set_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::MemoryReset { .. } => self + .memory_reset_response() + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadUnarchive { params, .. } => { + match self.thread_unarchive(params).await { + Ok((response, notification)) => { + self.outgoing + .send_response(request_id.clone(), response) + .await; + self.outgoing + .send_server_notification(ServerNotification::ThreadUnarchived( + notification, + )) + .await; + Ok(None) + } + Err(error) => Err(error), + } } - ClientRequest::ThreadFork { request_id, params } => { - self.thread_fork(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadArchive { request_id, params } => { - self.thread_archive(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadIncrementElicitation { request_id, params } => { - self.thread_increment_elicitation(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadDecrementElicitation { request_id, params } => { - self.thread_decrement_elicitation(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadSetName { request_id, params } => { - self.thread_set_name(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadGoalSet { request_id, params } => { - self.thread_goal_set(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadGoalGet { request_id, params } => { - self.thread_goal_get(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadGoalClear { request_id, params } => { - self.thread_goal_clear(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadMetadataUpdate { request_id, params } => { - self.thread_metadata_update(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadMemoryModeSet { request_id, params } => { - self.thread_memory_mode_set(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::MemoryReset { request_id, params } => { - self.memory_reset(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadUnarchive { request_id, params } => { - self.thread_unarchive(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadCompactStart { request_id, params } => { - self.thread_compact_start(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadBackgroundTerminalsClean { request_id, params } => { - self.thread_background_terminals_clean( - to_connection_request_id(request_id), - params, - ) - .await; - } - ClientRequest::ThreadRollback { request_id, params } => { - self.thread_rollback(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadList { request_id, params } => { - self.thread_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadLoadedList { request_id, params } => { - self.thread_loaded_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRead { request_id, params } => { - self.thread_read(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadTurnsList { request_id, params } => { - self.thread_turns_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadShellCommand { request_id, params } => { - self.thread_shell_command(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadApproveGuardianDeniedAction { request_id, params } => { - self.thread_approve_guardian_denied_action( - to_connection_request_id(request_id), - params, - ) - .await; - } - ClientRequest::SkillsList { request_id, params } => { - self.skills_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::HooksList { request_id, params } => { - self.hooks_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::MarketplaceAdd { request_id, params } => { - self.marketplace_add(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::MarketplaceRemove { request_id, params } => { - self.marketplace_remove(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::MarketplaceUpgrade { request_id, params } => { - self.marketplace_upgrade(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginList { request_id, params } => { - self.plugin_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginRead { request_id, params } => { - self.plugin_read(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginSkillRead { request_id, params } => { - self.plugin_skill_read(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginShareSave { request_id, params } => { - self.plugin_share_save(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginShareList { request_id, params } => { - self.plugin_share_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginShareDelete { request_id, params } => { - self.plugin_share_delete(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::AppsList { request_id, params } => { - self.apps_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::SkillsConfigWrite { request_id, params } => { - self.skills_config_write(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginInstall { request_id, params } => { - self.plugin_install(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::PluginUninstall { request_id, params } => { - self.plugin_uninstall(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::TurnStart { request_id, params } => { - self.turn_start( - to_connection_request_id(request_id), + ClientRequest::ThreadCompactStart { params, .. } => self + .thread_compact_start(&request_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadBackgroundTerminalsClean { params, .. } => self + .thread_background_terminals_clean(&request_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadRollback { params, .. } => self + .thread_rollback(&request_id, params) + .await + .map(|()| None), + ClientRequest::ThreadList { params, .. } => self + .thread_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadLoadedList { params, .. } => self + .thread_loaded_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadRead { params, .. } => self + .thread_read_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadTurnsList { params, .. } => self + .thread_turns_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadShellCommand { params, .. } => self + .thread_shell_command(&request_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadApproveGuardianDeniedAction { params, .. } => self + .thread_approve_guardian_denied_action(&request_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::SkillsList { params, .. } => self + .skills_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::HooksList { params, .. } => self + .hooks_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::MarketplaceAdd { params, .. } => self + .marketplace_add(params) + .await + .map(|response| Some(response.into())), + ClientRequest::MarketplaceRemove { params, .. } => self + .marketplace_remove(params) + .await + .map(|response| Some(response.into())), + ClientRequest::MarketplaceUpgrade { params, .. } => self + .marketplace_upgrade_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginList { params, .. } => self + .plugin_list(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginRead { params, .. } => self + .plugin_read(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginSkillRead { params, .. } => self + .plugin_skill_read(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginShareSave { params, .. } => self + .plugin_share_save(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginShareList { params, .. } => self + .plugin_share_list(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginShareDelete { params, .. } => self + .plugin_share_delete(params) + .await + .map(|response| Some(response.into())), + ClientRequest::AppsList { params, .. } => self + .apps_list(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::SkillsConfigWrite { params, .. } => self + .skills_config_write_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginInstall { params, .. } => self + .plugin_install(params) + .await + .map(|response| Some(response.into())), + ClientRequest::PluginUninstall { params, .. } => self + .plugin_uninstall(params) + .await + .map(|response| Some(response.into())), + ClientRequest::TurnStart { params, .. } => self + .turn_start( + request_id.clone(), params, app_server_client_name.clone(), app_server_client_version.clone(), ) - .await; + .await + .map(|response| Some(response.into())), + ClientRequest::ThreadInjectItems { params, .. } => self + .thread_inject_items_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::TurnSteer { params, .. } => self + .turn_steer(&request_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::TurnInterrupt { params, .. } => self + .turn_interrupt(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::ThreadRealtimeStart { params, .. } => self + .thread_realtime_start(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::ThreadRealtimeAppendAudio { params, .. } => self + .thread_realtime_append_audio(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::ThreadRealtimeAppendText { params, .. } => self + .thread_realtime_append_text(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::ThreadRealtimeStop { params, .. } => self + .thread_realtime_stop(&request_id, params) + .await + .map(|response| response.map(Into::into)), + ClientRequest::ThreadRealtimeListVoices { params: _, .. } => Ok(Some( + ThreadRealtimeListVoicesResponse { + voices: RealtimeVoicesList::builtin(), + } + .into(), + )), + ClientRequest::ReviewStart { params, .. } => { + self.review_start(&request_id, params).await.map(|()| None) } - ClientRequest::ThreadInjectItems { request_id, params } => { - self.thread_inject_items(to_connection_request_id(request_id), params) - .await; + ClientRequest::GetConversationSummary { params, .. } => self + .get_thread_summary_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ModelList { params, .. } => { + Self::list_models(self.thread_manager.clone(), params) + .await + .map(|response| Some(response.into())) } - ClientRequest::TurnSteer { request_id, params } => { - self.turn_steer(to_connection_request_id(request_id), params) - .await; + ClientRequest::ExperimentalFeatureList { params, .. } => self + .experimental_feature_list_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::CollaborationModeList { params, .. } => { + Self::list_collaboration_modes(self.thread_manager.clone(), params) + .await + .map(|response| Some(response.into())) } - ClientRequest::TurnInterrupt { request_id, params } => { - self.turn_interrupt(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRealtimeStart { request_id, params } => { - self.thread_realtime_start(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRealtimeAppendAudio { request_id, params } => { - self.thread_realtime_append_audio(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRealtimeAppendText { request_id, params } => { - self.thread_realtime_append_text(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRealtimeStop { request_id, params } => { - self.thread_realtime_stop(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ThreadRealtimeListVoices { request_id, params } => { - self.thread_realtime_list_voices(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ReviewStart { request_id, params } => { - self.review_start(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::GetConversationSummary { request_id, params } => { - self.get_thread_summary(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::ModelList { request_id, params } => { - let outgoing = self.outgoing.clone(); - let thread_manager = self.thread_manager.clone(); - let request_id = to_connection_request_id(request_id); - - tokio::spawn(async move { - Self::list_models(outgoing, thread_manager, request_id, params).await; - }); - } - ClientRequest::ExperimentalFeatureList { request_id, params } => { - self.experimental_feature_list(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::CollaborationModeList { request_id, params } => { - let outgoing = self.outgoing.clone(); - let thread_manager = self.thread_manager.clone(); - let request_id = to_connection_request_id(request_id); - - tokio::spawn(async move { - Self::list_collaboration_modes(outgoing, thread_manager, request_id, params) - .await; - }); - } - ClientRequest::MockExperimentalMethod { request_id, params } => { - self.mock_experimental_method(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::McpServerOauthLogin { request_id, params } => { - self.mcp_server_oauth_login(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::McpServerRefresh { request_id, params } => { - self.mcp_server_refresh(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::McpServerStatusList { request_id, params } => { - self.list_mcp_server_status(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::McpResourceRead { request_id, params } => { - self.read_mcp_resource(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::McpServerToolCall { request_id, params } => { - self.call_mcp_server_tool(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::WindowsSandboxSetupStart { request_id, params } => { - self.windows_sandbox_setup_start(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::LoginAccount { request_id, params } => { - self.login_v2(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::LogoutAccount { - request_id, - params: _, - } => { - self.logout_v2(to_connection_request_id(request_id)).await; - } - ClientRequest::CancelLoginAccount { request_id, params } => { - self.cancel_login_v2(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::GetAccount { request_id, params } => { - self.get_account(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::GitDiffToRemote { request_id, params } => { - self.git_diff_to_origin(to_connection_request_id(request_id), params.cwd) - .await; - } - ClientRequest::GetAuthStatus { request_id, params } => { - self.get_auth_status(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::FuzzyFileSearch { request_id, params } => { - self.fuzzy_file_search(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::FuzzyFileSearchSessionStart { request_id, params } => { - self.fuzzy_file_search_session_start(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::FuzzyFileSearchSessionUpdate { request_id, params } => { - self.fuzzy_file_search_session_update(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::FuzzyFileSearchSessionStop { request_id, params } => { - self.fuzzy_file_search_session_stop(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::OneOffCommandExec { request_id, params } => { - self.exec_one_off_command(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::CommandExecWrite { request_id, params } => { - self.command_exec_write(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::CommandExecResize { request_id, params } => { - self.command_exec_resize(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::CommandExecTerminate { request_id, params } => { - self.command_exec_terminate(to_connection_request_id(request_id), params) - .await; + ClientRequest::MockExperimentalMethod { params, .. } => self + .mock_experimental_method(params) + .await + .map(|response| Some(response.into())), + ClientRequest::McpServerOauthLogin { params, .. } => self + .mcp_server_oauth_login_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::McpServerRefresh { params, .. } => self + .mcp_server_refresh(params) + .await + .map(|response| Some(response.into())), + ClientRequest::McpServerStatusList { params, .. } => self + .list_mcp_server_status(&request_id, params) + .await + .map(|()| None), + ClientRequest::McpResourceRead { params, .. } => self + .read_mcp_resource(&request_id, params) + .await + .map(|()| None), + ClientRequest::McpServerToolCall { params, .. } => self + .call_mcp_server_tool(&request_id, params) + .await + .map(|()| None), + ClientRequest::WindowsSandboxSetupStart { params, .. } => self + .windows_sandbox_setup_start(&request_id, params) + .await + .map(|()| None), + ClientRequest::LoginAccount { params, .. } => self + .login_v2(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::LogoutAccount { .. } => { + self.logout_v2(request_id.clone()).await.map(|()| None) } + ClientRequest::CancelLoginAccount { params, .. } => self + .cancel_login_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::GetAccount { params, .. } => self + .get_account_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::GitDiffToRemote { params, .. } => self + .git_diff_to_origin(params.cwd) + .await + .map(|response| Some(response.into())), + ClientRequest::GetAuthStatus { params, .. } => self + .get_auth_status(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FuzzyFileSearch { params, .. } => self + .fuzzy_file_search(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FuzzyFileSearchSessionStart { params, .. } => self + .fuzzy_file_search_session_start_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FuzzyFileSearchSessionUpdate { params, .. } => self + .fuzzy_file_search_session_update_response(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FuzzyFileSearchSessionStop { params, .. } => self + .fuzzy_file_search_session_stop(params) + .await + .map(|response| Some(response.into())), + ClientRequest::OneOffCommandExec { params, .. } => self + .exec_one_off_command(&request_id, params) + .await + .map(|()| None), + ClientRequest::CommandExecWrite { params, .. } => self + .command_exec_manager + .write(request_id.clone(), params) + .await + .map(|response| Some(response.into())), + ClientRequest::CommandExecResize { params, .. } => self + .command_exec_manager + .resize(request_id.clone(), params) + .await + .map(|response| Some(response.into())), + ClientRequest::CommandExecTerminate { params, .. } => self + .command_exec_manager + .terminate(request_id.clone(), params) + .await + .map(|response| Some(response.into())), ClientRequest::DeviceKeyCreate { .. } | ClientRequest::DeviceKeyPublic { .. } | ClientRequest::DeviceKeySign { .. } => { - warn!("Device key request reached CodexMessageProcessor unexpectedly"); + unreachable!("device key requests are handled by MessageProcessor") } ClientRequest::ConfigRead { .. } | ClientRequest::ConfigValueWrite { .. } | ClientRequest::ConfigBatchWrite { .. } | ClientRequest::ExperimentalFeatureEnablementSet { .. } => { - warn!("Config request reached CodexMessageProcessor unexpectedly"); + unreachable!("config requests are handled by MessageProcessor") } ClientRequest::FsReadFile { .. } | ClientRequest::FsWriteFile { .. } @@ -1361,39 +1383,50 @@ impl CodexMessageProcessor { | ClientRequest::FsCopy { .. } | ClientRequest::FsWatch { .. } | ClientRequest::FsUnwatch { .. } => { - warn!("Filesystem request reached CodexMessageProcessor unexpectedly"); + unreachable!("filesystem requests are handled by MessageProcessor") } ClientRequest::ConfigRequirementsRead { .. } => { - warn!("ConfigRequirementsRead request reached CodexMessageProcessor unexpectedly"); + unreachable!("config requirements requests are handled by MessageProcessor") } ClientRequest::ModelProviderCapabilitiesRead { .. } => { - warn!( - "ModelProviderCapabilitiesRead request reached CodexMessageProcessor unexpectedly" - ); + unreachable!("model provider capabilities requests are handled by MessageProcessor") } ClientRequest::ExternalAgentConfigDetect { .. } | ClientRequest::ExternalAgentConfigImport { .. } => { - warn!("ExternalAgentConfig request reached CodexMessageProcessor unexpectedly"); + unreachable!("external agent config requests are handled by MessageProcessor") } - ClientRequest::GetAccountRateLimits { - request_id, - params: _, - } => { - self.get_account_rate_limits(to_connection_request_id(request_id)) + ClientRequest::GetAccountRateLimits { .. } => self + .get_account_rate_limits() + .await + .map(|response| Some(response.into())), + ClientRequest::SendAddCreditsNudgeEmail { params, .. } => self + .send_add_credits_nudge_email(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FeedbackUpload { params, .. } => self + .upload_feedback_response(params) + .await + .map(|response| Some(response.into())), + }; + + match response { + Ok(Some(response)) => { + self.outgoing + .send_response_as(request_id.clone(), response) .await; } - ClientRequest::SendAddCreditsNudgeEmail { request_id, params } => { - self.send_add_credits_nudge_email(to_connection_request_id(request_id), params) - .await; - } - ClientRequest::FeedbackUpload { request_id, params } => { - self.upload_feedback(to_connection_request_id(request_id), params) - .await; + Ok(None) => {} + Err(error) => { + self.outgoing.send_error(request_id.clone(), error).await; } } } - async fn login_v2(&self, request_id: ConnectionRequestId, params: LoginAccountParams) { + async fn login_v2( + &self, + request_id: ConnectionRequestId, + params: LoginAccountParams, + ) -> Result<(), JSONRPCErrorError> { match params { LoginAccountParams::ApiKey { api_key } => { self.login_api_key_v2(request_id, LoginApiKeyParams { api_key }) @@ -1422,6 +1455,7 @@ impl CodexMessageProcessor { .await; } } + Ok(()) } fn external_auth_active_error(&self) -> JSONRPCErrorError { @@ -1726,15 +1760,6 @@ impl CodexMessageProcessor { } } - async fn cancel_login_v2( - &self, - request_id: ConnectionRequestId, - params: CancelLoginAccountParams, - ) { - let result = self.cancel_login_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn cancel_login_response( &self, params: CancelLoginAccountParams, @@ -1923,7 +1948,7 @@ impl CodexMessageProcessor { .map(CodexAuth::api_auth_mode)) } - async fn logout_v2(&self, request_id: ConnectionRequestId) { + async fn logout_v2(&self, request_id: ConnectionRequestId) -> Result<(), JSONRPCErrorError> { let result = self.logout_common().await; let account_updated = result @@ -1943,6 +1968,7 @@ impl CodexMessageProcessor { .send_server_notification(ServerNotification::AccountUpdated(payload)) .await; } + Ok(()) } async fn refresh_token_if_requested(&self, do_refresh: bool) -> RefreshTokenRequestOutcome { @@ -1960,7 +1986,10 @@ impl CodexMessageProcessor { RefreshTokenRequestOutcome::NotAttemptedOrSucceeded } - async fn get_auth_status(&self, request_id: ConnectionRequestId, params: GetAuthStatusParams) { + async fn get_auth_status( + &self, + params: GetAuthStatusParams, + ) -> Result { let include_token = params.include_token.unwrap_or(false); let do_refresh = params.refresh_token.unwrap_or(false); @@ -2020,12 +2049,7 @@ impl CodexMessageProcessor { } }; - self.outgoing.send_response(request_id, response).await; - } - - async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) { - let result = self.get_account_response(params).await; - self.outgoing.send_result(request_id, result).await; + Ok(response) } async fn get_account_response( @@ -2056,34 +2080,31 @@ impl CodexMessageProcessor { }) } - async fn get_account_rate_limits(&self, request_id: ConnectionRequestId) { - let result = - self.fetch_account_rate_limits() - .await - .map( - |(rate_limits, rate_limits_by_limit_id)| GetAccountRateLimitsResponse { - rate_limits: rate_limits.into(), - rate_limits_by_limit_id: Some( - rate_limits_by_limit_id - .into_iter() - .map(|(limit_id, snapshot)| (limit_id, snapshot.into())) - .collect(), - ), - }, - ); - self.outgoing.send_result(request_id, result).await; + async fn get_account_rate_limits( + &self, + ) -> Result { + self.fetch_account_rate_limits() + .await + .map( + |(rate_limits, rate_limits_by_limit_id)| GetAccountRateLimitsResponse { + rate_limits: rate_limits.into(), + rate_limits_by_limit_id: Some( + rate_limits_by_limit_id + .into_iter() + .map(|(limit_id, snapshot)| (limit_id, snapshot.into())) + .collect(), + ), + }, + ) } async fn send_add_credits_nudge_email( &self, - request_id: ConnectionRequestId, params: SendAddCreditsNudgeEmailParams, - ) { - let result = self - .send_add_credits_nudge_email_inner(params) + ) -> Result { + self.send_add_credits_nudge_email_inner(params) .await - .map(|status| SendAddCreditsNudgeEmailResponse { status }); - self.outgoing.send_result(request_id, result).await; + .map(|status| SendAddCreditsNudgeEmailResponse { status }) } async fn send_add_credits_nudge_email_inner( @@ -2208,14 +2229,11 @@ impl CodexMessageProcessor { async fn exec_one_off_command( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: CommandExecParams, - ) { - let result = self - .exec_one_off_command_inner(request_id.clone(), params) + ) -> Result<(), JSONRPCErrorError> { + self.exec_one_off_command_inner(request_id.clone(), params) .await - .map(|()| None::); - self.send_optional_result(request_id, result).await; } async fn exec_one_off_command_inner( @@ -2453,42 +2471,6 @@ impl CodexMessageProcessor { .preserve_deny_read_restrictions_from(configured_file_system_sandbox_policy); } - async fn command_exec_write( - &self, - request_id: ConnectionRequestId, - params: CommandExecWriteParams, - ) { - let result = self - .command_exec_manager - .write(request_id.clone(), params) - .await; - self.outgoing.send_result(request_id, result).await; - } - - async fn command_exec_resize( - &self, - request_id: ConnectionRequestId, - params: CommandExecResizeParams, - ) { - let result = self - .command_exec_manager - .resize(request_id.clone(), params) - .await; - self.outgoing.send_result(request_id, result).await; - } - - async fn command_exec_terminate( - &self, - request_id: ConnectionRequestId, - params: CommandExecTerminateParams, - ) { - let result = self - .command_exec_manager - .terminate(request_id.clone(), params) - .await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_start( &self, request_id: ConnectionRequestId, @@ -2496,7 +2478,7 @@ impl CodexMessageProcessor { app_server_client_name: Option, app_server_client_version: Option, request_context: RequestContext, - ) { + ) -> Result<(), JSONRPCErrorError> { let ThreadStartParams { model, model_provider, @@ -2520,21 +2502,11 @@ impl CodexMessageProcessor { persist_extended_history, } = params; if sandbox.is_some() && permissions.is_some() { - self.outgoing - .send_error( - request_id, - invalid_request("`permissions` cannot be combined with `sandbox`"), - ) - .await; - return; + return Err(invalid_request( + "`permissions` cannot be combined with `sandbox`", + )); } - let environment_selections = match self.parse_environment_selections(environments) { - Ok(environment_selections) => environment_selections, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let environment_selections = self.parse_environment_selections(environments)?; let mut typesafe_overrides = self.build_thread_config_overrides( model, model_provider, @@ -2562,8 +2534,10 @@ impl CodexMessageProcessor { }; let request_trace = request_context.request_trace(); let config_manager = self.config_manager.clone(); + let outgoing = Arc::clone(&listener_task_context.outgoing); + let error_request_id = request_id.clone(); let thread_start_task = async move { - Self::thread_start_task( + if let Err(error) = Self::thread_start_task( listener_task_context, config_manager, request_id, @@ -2579,10 +2553,14 @@ impl CodexMessageProcessor { experimental_raw_events, request_trace, ) - .await; + .await + { + outgoing.send_error(error_request_id, error).await; + } }; self.background_tasks .spawn(thread_start_task.instrument(request_context.span())); + Ok(()) } pub(crate) async fn import_external_agent_session( @@ -2711,255 +2689,236 @@ impl CodexMessageProcessor { service_name: Option, experimental_raw_events: bool, request_trace: Option, - ) { - let result = async { - let requested_cwd = typesafe_overrides.cwd.clone(); - let mut config = config_manager - .load_with_overrides(config_overrides.clone(), typesafe_overrides.clone()) + ) -> Result<(), JSONRPCErrorError> { + let requested_cwd = typesafe_overrides.cwd.clone(); + let mut config = config_manager + .load_with_overrides(config_overrides.clone(), typesafe_overrides.clone()) + .await + .map_err(|err| config_load_error(&err))?; + + // The user may have requested WorkspaceWrite or DangerFullAccess via + // the command line, though in the process of deriving the Config, it + // could be downgraded to ReadOnly (perhaps there is no sandbox + // available on Windows or the enterprise config disallows it). The cwd + // should still be considered "trusted" in this case. + let requested_permissions_trust_project = + requested_permissions_trust_project(&typesafe_overrides, config.cwd.as_path()); + let effective_permissions_trust_project = permission_profile_trusts_project( + &config.permissions.permission_profile(), + config.cwd.as_path(), + ); + + if requested_cwd.is_some() + && config.active_project.trust_level.is_none() + && (requested_permissions_trust_project || effective_permissions_trust_project) + { + let trust_target = resolve_root_git_project_for_trust(LOCAL_FS.as_ref(), &config.cwd) + .await + .unwrap_or_else(|| config.cwd.clone()); + let current_cli_overrides = config_manager.current_cli_overrides(); + let cli_overrides_with_trust; + let cli_overrides_for_reload = if let Err(err) = + codex_core::config::set_project_trust_level( + &listener_task_context.codex_home, + trust_target.as_path(), + TrustLevel::Trusted, + ) { + warn!( + "failed to persist trusted project state for {}; continuing with in-memory trust for this thread: {err}", + trust_target.display() + ); + let mut project = toml::map::Map::new(); + project.insert( + "trust_level".to_string(), + TomlValue::String("trusted".to_string()), + ); + let mut projects = toml::map::Map::new(); + projects.insert( + project_trust_key(trust_target.as_path()), + TomlValue::Table(project), + ); + cli_overrides_with_trust = current_cli_overrides + .iter() + .cloned() + .chain(std::iter::once(( + "projects".to_string(), + TomlValue::Table(projects), + ))) + .collect::>(); + cli_overrides_with_trust.as_slice() + } else { + current_cli_overrides.as_slice() + }; + + config = config_manager + .load_with_cli_overrides( + cli_overrides_for_reload, + config_overrides, + typesafe_overrides, + /*fallback_cwd*/ None, + ) .await .map_err(|err| config_load_error(&err))?; + } - // The user may have requested WorkspaceWrite or DangerFullAccess via - // the command line, though in the process of deriving the Config, it - // could be downgraded to ReadOnly (perhaps there is no sandbox - // available on Windows or the enterprise config disallows it). The cwd - // should still be considered "trusted" in this case. - let requested_permissions_trust_project = - requested_permissions_trust_project(&typesafe_overrides, config.cwd.as_path()); - let effective_permissions_trust_project = permission_profile_trusts_project( - &config.permissions.permission_profile(), - config.cwd.as_path(), - ); - - if requested_cwd.is_some() - && config.active_project.trust_level.is_none() - && (requested_permissions_trust_project || effective_permissions_trust_project) - { - let trust_target = - resolve_root_git_project_for_trust(LOCAL_FS.as_ref(), &config.cwd) - .await - .unwrap_or_else(|| config.cwd.clone()); - let current_cli_overrides = config_manager.current_cli_overrides(); - let cli_overrides_with_trust; - let cli_overrides_for_reload = - if let Err(err) = codex_core::config::set_project_trust_level( - &listener_task_context.codex_home, - trust_target.as_path(), - TrustLevel::Trusted, - ) { - warn!( - "failed to persist trusted project state for {}; continuing with in-memory trust for this thread: {err}", - trust_target.display() - ); - let mut project = toml::map::Map::new(); - project.insert( - "trust_level".to_string(), - TomlValue::String("trusted".to_string()), - ); - let mut projects = toml::map::Map::new(); - projects.insert( - project_trust_key(trust_target.as_path()), - TomlValue::Table(project), - ); - cli_overrides_with_trust = current_cli_overrides - .iter() - .cloned() - .chain(std::iter::once(( - "projects".to_string(), - TomlValue::Table(projects), - ))) - .collect::>(); - cli_overrides_with_trust.as_slice() - } else { - current_cli_overrides.as_slice() - }; - - config = config_manager - .load_with_cli_overrides( - cli_overrides_for_reload, - config_overrides, - typesafe_overrides, - /*fallback_cwd*/ None, - ) - .await - .map_err(|err| config_load_error(&err))?; - } - - let instruction_sources = Self::instruction_sources_from_config(&config).await; - let environments = environments.unwrap_or_else(|| { - listener_task_context - .thread_manager - .default_environment_selections(&config.cwd) - }); - let dynamic_tools = dynamic_tools.unwrap_or_default(); - let core_dynamic_tools = if dynamic_tools.is_empty() { - Vec::new() - } else { - validate_dynamic_tools(&dynamic_tools).map_err(invalid_request)?; - dynamic_tools - .into_iter() - .map(|tool| CoreDynamicToolSpec { - namespace: tool.namespace, - name: tool.name, - description: tool.description, - input_schema: tool.input_schema, - defer_loading: tool.defer_loading, - }) - .collect() - }; - let core_dynamic_tool_count = core_dynamic_tools.len(); - - let NewThread { - thread_id, - thread, - session_configured, - .. - } = listener_task_context + let instruction_sources = Self::instruction_sources_from_config(&config).await; + let environments = environments.unwrap_or_else(|| { + listener_task_context .thread_manager - .start_thread_with_options(StartThreadOptions { - config, - initial_history: match session_start_source - .unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup) - { - codex_app_server_protocol::ThreadStartSource::Startup => { - InitialHistory::New - } - codex_app_server_protocol::ThreadStartSource::Clear => { - InitialHistory::Cleared - } - }, - session_source: None, - dynamic_tools: core_dynamic_tools, - persist_extended_history, - metrics_service_name: service_name, - parent_trace: request_trace, - environments, + .default_environment_selections(&config.cwd) + }); + let dynamic_tools = dynamic_tools.unwrap_or_default(); + let core_dynamic_tools = if dynamic_tools.is_empty() { + Vec::new() + } else { + validate_dynamic_tools(&dynamic_tools).map_err(invalid_request)?; + dynamic_tools + .into_iter() + .map(|tool| CoreDynamicToolSpec { + namespace: tool.namespace, + name: tool.name, + description: tool.description, + input_schema: tool.input_schema, + defer_loading: tool.defer_loading, }) - .instrument(tracing::info_span!( - "app_server.thread_start.create_thread", - otel.name = "app_server.thread_start.create_thread", - thread_start.dynamic_tool_count = core_dynamic_tool_count, - thread_start.persist_extended_history = persist_extended_history, - )) - .await - .map_err(|err| match err { - CodexErr::InvalidRequest(message) => invalid_request(message), - err => internal_error(format!("error creating thread: {err}")), - })?; + .collect() + }; + let core_dynamic_tool_count = core_dynamic_tools.len(); - Self::set_app_server_client_info( - thread.as_ref(), - app_server_client_name, - app_server_client_version, - ) - .await?; + let NewThread { + thread_id, + thread, + session_configured, + .. + } = listener_task_context + .thread_manager + .start_thread_with_options(StartThreadOptions { + config, + initial_history: match session_start_source + .unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup) + { + codex_app_server_protocol::ThreadStartSource::Startup => InitialHistory::New, + codex_app_server_protocol::ThreadStartSource::Clear => InitialHistory::Cleared, + }, + session_source: None, + dynamic_tools: core_dynamic_tools, + persist_extended_history, + metrics_service_name: service_name, + parent_trace: request_trace, + environments, + }) + .instrument(tracing::info_span!( + "app_server.thread_start.create_thread", + otel.name = "app_server.thread_start.create_thread", + thread_start.dynamic_tool_count = core_dynamic_tool_count, + thread_start.persist_extended_history = persist_extended_history, + )) + .await + .map_err(|err| match err { + CodexErr::InvalidRequest(message) => invalid_request(message), + err => internal_error(format!("error creating thread: {err}")), + })?; - let config_snapshot = thread - .config_snapshot() - .instrument(tracing::info_span!( - "app_server.thread_start.config_snapshot", - otel.name = "app_server.thread_start.config_snapshot", - )) - .await; - let mut thread = build_thread_from_snapshot( - thread_id, - &config_snapshot, - session_configured.rollout_path.clone(), - ); + Self::set_app_server_client_info( + thread.as_ref(), + app_server_client_name, + app_server_client_version, + ) + .await?; - // Auto-attach a thread listener when starting a thread. - Self::log_listener_attach_result( - Self::ensure_conversation_listener_task( - listener_task_context.clone(), - thread_id, - request_id.connection_id, - experimental_raw_events, - ) - .instrument(tracing::info_span!( - "app_server.thread_start.attach_listener", - otel.name = "app_server.thread_start.attach_listener", - thread_start.experimental_raw_events = experimental_raw_events, - )) - .await, + let config_snapshot = thread + .config_snapshot() + .instrument(tracing::info_span!( + "app_server.thread_start.config_snapshot", + otel.name = "app_server.thread_start.config_snapshot", + )) + .await; + let mut thread = build_thread_from_snapshot( + thread_id, + &config_snapshot, + session_configured.rollout_path.clone(), + ); + + // Auto-attach a thread listener when starting a thread. + Self::log_listener_attach_result( + Self::ensure_conversation_listener_task( + listener_task_context.clone(), thread_id, request_id.connection_id, - "thread", - ); + experimental_raw_events, + ) + .instrument(tracing::info_span!( + "app_server.thread_start.attach_listener", + otel.name = "app_server.thread_start.attach_listener", + thread_start.experimental_raw_events = experimental_raw_events, + )) + .await, + thread_id, + request_id.connection_id, + "thread", + ); + listener_task_context + .thread_watch_manager + .upsert_thread_silently(thread.clone()) + .instrument(tracing::info_span!( + "app_server.thread_start.upsert_thread", + otel.name = "app_server.thread_start.upsert_thread", + )) + .await; + + thread.status = resolve_thread_status( listener_task_context .thread_watch_manager - .upsert_thread_silently(thread.clone()) + .loaded_status_for_thread(&thread.id) .instrument(tracing::info_span!( - "app_server.thread_start.upsert_thread", - otel.name = "app_server.thread_start.upsert_thread", + "app_server.thread_start.resolve_status", + otel.name = "app_server.thread_start.resolve_status", )) - .await; + .await, + /*has_in_progress_turn*/ false, + ); - thread.status = resolve_thread_status( - listener_task_context - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .instrument(tracing::info_span!( - "app_server.thread_start.resolve_status", - otel.name = "app_server.thread_start.resolve_status", - )) - .await, - /*has_in_progress_turn*/ false, - ); + let sandbox = thread_response_sandbox_policy( + &config_snapshot.permission_profile, + config_snapshot.cwd.as_path(), + ); + let active_permission_profile = + thread_response_active_permission_profile(config_snapshot.active_permission_profile); - let sandbox = thread_response_sandbox_policy( - &config_snapshot.permission_profile, - config_snapshot.cwd.as_path(), - ); - let active_permission_profile = thread_response_active_permission_profile( - config_snapshot.active_permission_profile, - ); + let response = ThreadStartResponse { + thread: thread.clone(), + model: config_snapshot.model, + model_provider: config_snapshot.model_provider_id, + service_tier: config_snapshot.service_tier, + cwd: config_snapshot.cwd, + instruction_sources, + approval_policy: config_snapshot.approval_policy.into(), + approvals_reviewer: config_snapshot.approvals_reviewer.into(), + sandbox, + permission_profile: Some(config_snapshot.permission_profile.into()), + active_permission_profile, + reasoning_effort: config_snapshot.reasoning_effort, + }; + let notif = thread_started_notification(thread); + listener_task_context + .outgoing + .send_response(request_id, response) + .instrument(tracing::info_span!( + "app_server.thread_start.send_response", + otel.name = "app_server.thread_start.send_response", + )) + .await; - let response = ThreadStartResponse { - thread: thread.clone(), - model: config_snapshot.model, - model_provider: config_snapshot.model_provider_id, - service_tier: config_snapshot.service_tier, - cwd: config_snapshot.cwd, - instruction_sources, - approval_policy: config_snapshot.approval_policy.into(), - approvals_reviewer: config_snapshot.approvals_reviewer.into(), - sandbox, - permission_profile: Some(config_snapshot.permission_profile.into()), - active_permission_profile, - reasoning_effort: config_snapshot.reasoning_effort, - }; - Ok::<_, JSONRPCErrorError>((response, thread_started_notification(thread))) - } - .await; - - match result { - Ok((response, notif)) => { - listener_task_context - .outgoing - .send_response(request_id, response) - .instrument(tracing::info_span!( - "app_server.thread_start.send_response", - otel.name = "app_server.thread_start.send_response", - )) - .await; - - listener_task_context - .outgoing - .send_server_notification(ServerNotification::ThreadStarted(notif)) - .instrument(tracing::info_span!( - "app_server.thread_start.notify_started", - otel.name = "app_server.thread_start.notify_started", - )) - .await; - } - Err(error) => { - listener_task_context - .outgoing - .send_error(request_id, error) - .await; - } - } + listener_task_context + .outgoing + .send_server_notification(ServerNotification::ThreadStarted(notif)) + .instrument(tracing::info_span!( + "app_server.thread_start.notify_started", + otel.name = "app_server.thread_start.notify_started", + )) + .await; + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -3019,31 +2978,12 @@ impl CodexMessageProcessor { Ok(environment_selections) } - async fn thread_archive(&self, request_id: ConnectionRequestId, params: ThreadArchiveParams) { - let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await { - Ok(permit) => permit, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let result = self.thread_archive_response(params).await; - let archived_thread_ids = result - .as_ref() - .ok() - .map(|(_, thread_ids)| thread_ids.clone()); - self.outgoing - .send_result(request_id, result.map(|(response, _)| response)) - .await; - - if let Some(archived_thread_ids) = archived_thread_ids { - for thread_id in archived_thread_ids { - let notification = ThreadArchivedNotification { thread_id }; - self.outgoing - .send_server_notification(ServerNotification::ThreadArchived(notification)) - .await; - } - } + async fn thread_archive( + &self, + params: ThreadArchiveParams, + ) -> Result<(ThreadArchiveResponse, Vec), JSONRPCErrorError> { + let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; + self.thread_archive_response(params).await } async fn thread_archive_response( @@ -3156,68 +3096,41 @@ impl CodexMessageProcessor { async fn thread_increment_elicitation( &self, - request_id: ConnectionRequestId, params: ThreadIncrementElicitationParams, - ) { - let result = async { - let (_, thread) = self.load_thread(¶ms.thread_id).await?; - let count = thread - .increment_out_of_band_elicitation_count() - .await - .map_err(|err| { - internal_error(format!( - "failed to increment out-of-band elicitation counter: {err}" - )) - })?; - Ok::<_, JSONRPCErrorError>(ThreadIncrementElicitationResponse { - count, - paused: count > 0, - }) - } - .await; - self.outgoing.send_result(request_id, result).await; + ) -> Result { + let (_, thread) = self.load_thread(¶ms.thread_id).await?; + let count = thread + .increment_out_of_band_elicitation_count() + .await + .map_err(|err| { + internal_error(format!( + "failed to increment out-of-band elicitation counter: {err}" + )) + })?; + Ok(ThreadIncrementElicitationResponse { + count, + paused: count > 0, + }) } async fn thread_decrement_elicitation( &self, - request_id: ConnectionRequestId, params: ThreadDecrementElicitationParams, - ) { - let result = async { - let (_, thread) = self.load_thread(¶ms.thread_id).await?; - let count = thread - .decrement_out_of_band_elicitation_count() - .await - .map_err(|err| match err { - CodexErr::InvalidRequest(message) => invalid_request(message), - err => internal_error(format!( - "failed to decrement out-of-band elicitation counter: {err}" - )), - })?; - Ok::<_, JSONRPCErrorError>(ThreadDecrementElicitationResponse { - count, - paused: count > 0, - }) - } - .await; - self.outgoing.send_result(request_id, result).await; - } - - async fn thread_set_name(&self, request_id: ConnectionRequestId, params: ThreadSetNameParams) { - let result = self.thread_set_name_response(&request_id, params).await; - let notification = result - .as_ref() - .ok() - .and_then(|(_, notification)| notification.clone()); - self.outgoing - .send_result(request_id, result.map(|(response, _)| response)) - .await; - - if let Some(notification) = notification { - self.outgoing - .send_server_notification(ServerNotification::ThreadNameUpdated(notification)) - .await; - } + ) -> Result { + let (_, thread) = self.load_thread(¶ms.thread_id).await?; + let count = thread + .decrement_out_of_band_elicitation_count() + .await + .map_err(|err| match err { + CodexErr::InvalidRequest(message) => invalid_request(message), + err => internal_error(format!( + "failed to decrement out-of-band elicitation counter: {err}" + )), + })?; + Ok(ThreadDecrementElicitationResponse { + count, + paused: count > 0, + }) } async fn thread_set_name_response( @@ -3262,15 +3175,6 @@ impl CodexMessageProcessor { )) } - async fn thread_memory_mode_set( - &self, - request_id: ConnectionRequestId, - params: ThreadMemoryModeSetParams, - ) { - let result = self.thread_memory_mode_set_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_memory_mode_set_response( &self, params: ThreadMemoryModeSetParams, @@ -3310,11 +3214,6 @@ impl CodexMessageProcessor { Ok(ThreadMemoryModeSetResponse {}) } - async fn memory_reset(&self, request_id: ConnectionRequestId, _params: Option<()>) { - let result = self.memory_reset_response().await; - self.outgoing.send_result(request_id, result).await; - } - async fn memory_reset_response(&self) -> Result { let state_db = StateRuntime::init( self.config.sqlite_home.clone(), @@ -3341,15 +3240,6 @@ impl CodexMessageProcessor { Ok(MemoryResetResponse {}) } - async fn thread_metadata_update( - &self, - request_id: ConnectionRequestId, - params: ThreadMetadataUpdateParams, - ) { - let result = self.thread_metadata_update_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_metadata_update_response( &self, params: ThreadMetadataUpdateParams, @@ -3569,33 +3459,11 @@ impl CodexMessageProcessor { async fn thread_unarchive( &self, - request_id: ConnectionRequestId, params: ThreadUnarchiveParams, - ) { - let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await { - Ok(permit) => permit, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let result = self.thread_unarchive_response(params).await; - let notification = - result - .as_ref() - .ok() - .map(|(_, thread_id)| ThreadUnarchivedNotification { - thread_id: thread_id.clone(), - }); - self.outgoing - .send_result(request_id, result.map(|(response, _)| response)) - .await; - - if let Some(notification) = notification { - self.outgoing - .send_server_notification(ServerNotification::ThreadUnarchived(notification)) - .await; - } + ) -> Result<(ThreadUnarchiveResponse, ThreadUnarchivedNotification), JSONRPCErrorError> { + let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; + let (response, thread_id) = self.thread_unarchive_response(params).await?; + Ok((response, ThreadUnarchivedNotification { thread_id })) } async fn thread_unarchive_response( @@ -3632,12 +3500,12 @@ impl CodexMessageProcessor { Ok((ThreadUnarchiveResponse { thread }, thread_id)) } - async fn thread_rollback(&self, request_id: ConnectionRequestId, params: ThreadRollbackParams) { - let result = self - .thread_rollback_start(&request_id, params) - .await - .map(|()| None::); - self.send_optional_result(request_id, result).await; + async fn thread_rollback( + &self, + request_id: &ConnectionRequestId, + params: ThreadRollbackParams, + ) -> Result<(), JSONRPCErrorError> { + self.thread_rollback_start(request_id, params).await } async fn thread_rollback_start( @@ -3694,95 +3562,74 @@ impl CodexMessageProcessor { async fn thread_compact_start( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadCompactStartParams, - ) { + ) -> Result { let ThreadCompactStartParams { thread_id } = params; - let result = async { - let (_, thread) = self.load_thread(&thread_id).await?; - self.submit_core_op(&request_id, thread.as_ref(), Op::Compact) - .await - .map_err(|err| internal_error(format!("failed to start compaction: {err}")))?; - Ok::<_, JSONRPCErrorError>(ThreadCompactStartResponse {}) - } - .await; - self.outgoing.send_result(request_id, result).await; + let (_, thread) = self.load_thread(&thread_id).await?; + self.submit_core_op(request_id, thread.as_ref(), Op::Compact) + .await + .map_err(|err| internal_error(format!("failed to start compaction: {err}")))?; + Ok(ThreadCompactStartResponse {}) } async fn thread_background_terminals_clean( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadBackgroundTerminalsCleanParams, - ) { + ) -> Result { let ThreadBackgroundTerminalsCleanParams { thread_id } = params; - let result = async { - let (_, thread) = self.load_thread(&thread_id).await?; - self.submit_core_op(&request_id, thread.as_ref(), Op::CleanBackgroundTerminals) - .await - .map_err(|err| { - internal_error(format!("failed to clean background terminals: {err}")) - })?; - Ok::<_, JSONRPCErrorError>(ThreadBackgroundTerminalsCleanResponse {}) - } - .await; - self.outgoing.send_result(request_id, result).await; + let (_, thread) = self.load_thread(&thread_id).await?; + self.submit_core_op(request_id, thread.as_ref(), Op::CleanBackgroundTerminals) + .await + .map_err(|err| { + internal_error(format!("failed to clean background terminals: {err}")) + })?; + Ok(ThreadBackgroundTerminalsCleanResponse {}) } async fn thread_shell_command( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadShellCommandParams, - ) { - let result = async { - let ThreadShellCommandParams { thread_id, command } = params; - let command = command.trim().to_string(); - if command.is_empty() { - return Err(invalid_request("command must not be empty")); - } - - let (_, thread) = self.load_thread(&thread_id).await?; - self.submit_core_op( - &request_id, - thread.as_ref(), - Op::RunUserShellCommand { command }, - ) - .await - .map_err(|err| internal_error(format!("failed to start shell command: {err}")))?; - Ok::<_, JSONRPCErrorError>(ThreadShellCommandResponse {}) + ) -> Result { + let ThreadShellCommandParams { thread_id, command } = params; + let command = command.trim().to_string(); + if command.is_empty() { + return Err(invalid_request("command must not be empty")); } - .await; - self.outgoing.send_result(request_id, result).await; + + let (_, thread) = self.load_thread(&thread_id).await?; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::RunUserShellCommand { command }, + ) + .await + .map_err(|err| internal_error(format!("failed to start shell command: {err}")))?; + Ok(ThreadShellCommandResponse {}) } async fn thread_approve_guardian_denied_action( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadApproveGuardianDeniedActionParams, - ) { - let result = async { - let ThreadApproveGuardianDeniedActionParams { thread_id, event } = params; - let event = serde_json::from_value(event) - .map_err(|err| invalid_request(format!("invalid Guardian denial event: {err}")))?; - let (_, thread) = self.load_thread(&thread_id).await?; + ) -> Result { + let ThreadApproveGuardianDeniedActionParams { thread_id, event } = params; + let event = serde_json::from_value(event) + .map_err(|err| invalid_request(format!("invalid Guardian denial event: {err}")))?; + let (_, thread) = self.load_thread(&thread_id).await?; - self.submit_core_op( - &request_id, - thread.as_ref(), - Op::ApproveGuardianDeniedAction { event }, - ) - .await - .map_err(|err| internal_error(format!("failed to approve Guardian denial: {err}")))?; - Ok::<_, JSONRPCErrorError>(ThreadApproveGuardianDeniedActionResponse {}) - } - .await; - self.outgoing.send_result(request_id, result).await; - } - - async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) { - let result = self.thread_list_response(params).await; - self.outgoing.send_result(request_id, result).await; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::ApproveGuardianDeniedAction { event }, + ) + .await + .map_err(|err| internal_error(format!("failed to approve Guardian denial: {err}")))?; + Ok(ThreadApproveGuardianDeniedActionResponse {}) } async fn thread_list_response( @@ -3866,15 +3713,6 @@ impl CodexMessageProcessor { }) } - async fn thread_loaded_list( - &self, - request_id: ConnectionRequestId, - params: ThreadLoadedListParams, - ) { - let result = self.thread_loaded_list_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_loaded_list_response( &self, params: ThreadLoadedListParams, @@ -3922,11 +3760,6 @@ impl CodexMessageProcessor { }) } - async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) { - let result = self.thread_read_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_read_response( &self, params: ThreadReadParams, @@ -4110,15 +3943,6 @@ impl CodexMessageProcessor { Ok(()) } - async fn thread_turns_list( - &self, - request_id: ConnectionRequestId, - params: ThreadTurnsListParams, - ) { - let result = self.thread_turns_list_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_turns_list_response( &self, params: ThreadTurnsListParams, @@ -4295,7 +4119,11 @@ impl CodexMessageProcessor { } } - async fn thread_resume(&self, request_id: ConnectionRequestId, params: ThreadResumeParams) { + async fn thread_resume( + &self, + request_id: ConnectionRequestId, + params: ThreadResumeParams, + ) -> Result<(), JSONRPCErrorError> { if let Ok(thread_id) = ThreadId::from_string(¶ms.thread_id) && self .pending_thread_unloads @@ -4311,7 +4139,7 @@ impl CodexMessageProcessor { )), ) .await; - return; + return Ok(()); } if params.sandbox.is_some() && params.permissions.is_some() { @@ -4321,22 +4149,22 @@ impl CodexMessageProcessor { invalid_request("`permissions` cannot be combined with `sandbox`"), ) .await; - return; + return Ok(()); } let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await { Ok(permit) => permit, Err(error) => { self.outgoing.send_error(request_id, error).await; - return; + return Ok(()); } }; match self.resume_running_thread(&request_id, ¶ms).await { - Ok(true) => return, + Ok(true) => return Ok(()), Ok(false) => {} Err(error) => { self.outgoing.send_error(request_id, error).await; - return; + return Ok(()); } } @@ -4373,7 +4201,7 @@ impl CodexMessageProcessor { Ok(value) => value, Err(error) => { self.outgoing.send_error(request_id, error).await; - return; + return Ok(()); } }; @@ -4408,7 +4236,7 @@ impl CodexMessageProcessor { Err(err) => { let error = config_load_error(&err); self.outgoing.send_error(request_id, error).await; - return; + return Ok(()); } }; @@ -4437,7 +4265,7 @@ impl CodexMessageProcessor { let error = internal_error(format!("rollout path missing for thread {thread_id}")); self.outgoing.send_error(request_id, error).await; - return; + return Ok(()); }; // Auto-attach a thread listener when resuming a thread. Self::log_listener_attach_result( @@ -4468,7 +4296,7 @@ impl CodexMessageProcessor { self.outgoing .send_error(request_id, internal_error(message)) .await; - return; + return Ok(()); } }; @@ -4547,6 +4375,7 @@ impl CodexMessageProcessor { self.outgoing.send_error(request_id, error).await; } } + Ok(()) } async fn load_and_apply_persisted_resume_metadata( @@ -4942,7 +4771,11 @@ impl CodexMessageProcessor { } } - async fn thread_fork(&self, request_id: ConnectionRequestId, params: ThreadForkParams) { + async fn thread_fork( + &self, + request_id: ConnectionRequestId, + params: ThreadForkParams, + ) -> Result<(), JSONRPCErrorError> { let ThreadForkParams { thread_id, path, @@ -4962,206 +4795,183 @@ impl CodexMessageProcessor { persist_extended_history, } = params; let include_turns = !exclude_turns; - let result = async { - if sandbox.is_some() && permissions.is_some() { - return Err(invalid_request( - "`permissions` cannot be combined with `sandbox`", - )); - } + if sandbox.is_some() && permissions.is_some() { + return Err(invalid_request( + "`permissions` cannot be combined with `sandbox`", + )); + } - let source_thread = self - .read_stored_thread_for_resume( - &thread_id, - path.as_ref(), - /*include_history*/ true, - ) - .await?; - let source_thread_id = source_thread.thread_id; - let history_items = source_thread - .history - .as_ref() - .map(|history| history.items.clone()) - .ok_or_else(|| { - internal_error(format!( - "thread {source_thread_id} did not include persisted history" - )) - })?; - let history_cwd = Some(source_thread.cwd.clone()); + let source_thread = self + .read_stored_thread_for_resume(&thread_id, path.as_ref(), /*include_history*/ true) + .await?; + let source_thread_id = source_thread.thread_id; + let history_items = source_thread + .history + .as_ref() + .map(|history| history.items.clone()) + .ok_or_else(|| { + internal_error(format!( + "thread {source_thread_id} did not include persisted history" + )) + })?; + let history_cwd = Some(source_thread.cwd.clone()); - // Persist Windows sandbox mode. - let mut cli_overrides = cli_overrides.unwrap_or_default(); - if cfg!(windows) { - match WindowsSandboxLevel::from_config(&self.config) { - WindowsSandboxLevel::Elevated => { - cli_overrides - .insert("windows.sandbox".to_string(), serde_json::json!("elevated")); - } - WindowsSandboxLevel::RestrictedToken => { - cli_overrides.insert( - "windows.sandbox".to_string(), - serde_json::json!("unelevated"), - ); - } - WindowsSandboxLevel::Disabled => {} + // Persist Windows sandbox mode. + let mut cli_overrides = cli_overrides.unwrap_or_default(); + if cfg!(windows) { + match WindowsSandboxLevel::from_config(&self.config) { + WindowsSandboxLevel::Elevated => { + cli_overrides + .insert("windows.sandbox".to_string(), serde_json::json!("elevated")); } + WindowsSandboxLevel::RestrictedToken => { + cli_overrides.insert( + "windows.sandbox".to_string(), + serde_json::json!("unelevated"), + ); + } + WindowsSandboxLevel::Disabled => {} } - let request_overrides = if cli_overrides.is_empty() { - None - } else { - Some(cli_overrides) - }; - let mut typesafe_overrides = self.build_thread_config_overrides( - model, - model_provider, - service_tier, - cwd, - approval_policy, - approvals_reviewer, - sandbox, - permissions, - base_instructions, - developer_instructions, - /*personality*/ None, - ); - typesafe_overrides.ephemeral = ephemeral.then_some(true); - // Derive a Config using the same logic as new conversation, honoring overrides if provided. - let config = self - .config_manager - .load_for_cwd(request_overrides, typesafe_overrides, history_cwd) - .await - .map_err(|err| config_load_error(&err))?; + } + let request_overrides = if cli_overrides.is_empty() { + None + } else { + Some(cli_overrides) + }; + let mut typesafe_overrides = self.build_thread_config_overrides( + model, + model_provider, + service_tier, + cwd, + approval_policy, + approvals_reviewer, + sandbox, + permissions, + base_instructions, + developer_instructions, + /*personality*/ None, + ); + typesafe_overrides.ephemeral = ephemeral.then_some(true); + // Derive a Config using the same logic as new conversation, honoring overrides if provided. + let config = self + .config_manager + .load_for_cwd(request_overrides, typesafe_overrides, history_cwd) + .await + .map_err(|err| config_load_error(&err))?; - let fallback_model_provider = config.model_provider_id.clone(); - let instruction_sources = Self::instruction_sources_from_config(&config).await; + let fallback_model_provider = config.model_provider_id.clone(); + let instruction_sources = Self::instruction_sources_from_config(&config).await; - let NewThread { - thread_id, - thread: forked_thread, - session_configured, - .. - } = self - .thread_manager - .fork_thread_from_history( - ForkSnapshot::Interrupted, - config, - InitialHistory::Resumed(ResumedHistory { - conversation_id: source_thread_id, - history: history_items.clone(), - rollout_path: source_thread.rollout_path.clone(), - }), - persist_extended_history, - self.request_trace_context(&request_id).await, - ) - .await - .map_err(|err| match err { - CodexErr::Io(_) | CodexErr::Json(_) => { - invalid_request(format!("failed to load thread {source_thread_id}: {err}")) - } - CodexErr::InvalidRequest(message) => invalid_request(message), - err => internal_error(format!("error forking thread: {err}")), - })?; + let NewThread { + thread_id, + thread: forked_thread, + session_configured, + .. + } = self + .thread_manager + .fork_thread_from_history( + ForkSnapshot::Interrupted, + config, + InitialHistory::Resumed(ResumedHistory { + conversation_id: source_thread_id, + history: history_items.clone(), + rollout_path: source_thread.rollout_path.clone(), + }), + persist_extended_history, + self.request_trace_context(&request_id).await, + ) + .await + .map_err(|err| match err { + CodexErr::Io(_) | CodexErr::Json(_) => { + invalid_request(format!("failed to load thread {source_thread_id}: {err}")) + } + CodexErr::InvalidRequest(message) => invalid_request(message), + err => internal_error(format!("error forking thread: {err}")), + })?; - // Auto-attach a conversation listener when forking a thread. - Self::log_listener_attach_result( - self.ensure_conversation_listener( - thread_id, - request_id.connection_id, - /*raw_events_enabled*/ false, - ) - .await, + // Auto-attach a conversation listener when forking a thread. + Self::log_listener_attach_result( + self.ensure_conversation_listener( thread_id, request_id.connection_id, - "thread", - ); + /*raw_events_enabled*/ false, + ) + .await, + thread_id, + request_id.connection_id, + "thread", + ); - // Persistent forks materialize their own rollout immediately. Ephemeral forks stay - // pathless, so they rebuild their visible history from the copied source history instead. - let mut thread = - if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() { - let stored_thread = self - .read_stored_thread_for_new_fork(thread_id, include_turns) - .await?; - self.stored_thread_to_api_thread( - stored_thread, - fallback_model_provider.as_str(), - include_turns, - ) - .await - .map_err(|message| { - internal_error(format!( - "failed to load rollout `{}` for thread {thread_id}: {message}", - fork_rollout_path.display() - )) - })? - } else { - let config_snapshot = forked_thread.config_snapshot().await; - // forked thread names do not inherit the source thread name - let mut thread = - build_thread_from_snapshot(thread_id, &config_snapshot, /*path*/ None); - thread.preview = preview_from_rollout_items(&history_items); - thread.forked_from_id = Some(source_thread_id.to_string()); - if include_turns { - populate_thread_turns_from_history( - &mut thread, - &history_items, - /*active_turn*/ None, - ) - .map_err(internal_error)?; - } - thread - }; - - self.thread_watch_manager - .upsert_thread_silently(thread.clone()) - .await; - - thread.status = resolve_thread_status( - self.thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await, - /*has_in_progress_turn*/ false, - ); + // Persistent forks materialize their own rollout immediately. Ephemeral forks stay + // pathless, so they rebuild their visible history from the copied source history instead. + let mut thread = if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() { + let stored_thread = self + .read_stored_thread_for_new_fork(thread_id, include_turns) + .await?; + self.stored_thread_to_api_thread( + stored_thread, + fallback_model_provider.as_str(), + include_turns, + ) + .await + .map_err(|message| { + internal_error(format!( + "failed to load rollout `{}` for thread {thread_id}: {message}", + fork_rollout_path.display() + )) + })? + } else { let config_snapshot = forked_thread.config_snapshot().await; - let sandbox = thread_response_sandbox_policy( - &config_snapshot.permission_profile, - config_snapshot.cwd.as_path(), - ); - let active_permission_profile = thread_response_active_permission_profile( - config_snapshot.active_permission_profile, - ); - - let response = ThreadForkResponse { - thread: thread.clone(), - model: session_configured.model, - model_provider: session_configured.model_provider_id, - service_tier: session_configured.service_tier, - cwd: session_configured.cwd, - instruction_sources, - approval_policy: session_configured.approval_policy.into(), - approvals_reviewer: session_configured.approvals_reviewer.into(), - sandbox, - permission_profile: Some(config_snapshot.permission_profile.into()), - active_permission_profile, - reasoning_effort: session_configured.reasoning_effort, - }; - - Ok::<_, JSONRPCErrorError>(( - response, - thread_id, - forked_thread, - history_items, - thread_started_notification(thread), - )) - } - .await; - - let (response, thread_id, forked_thread, history_items, notif) = match result { - Ok(value) => value, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; + // forked thread names do not inherit the source thread name + let mut thread = + build_thread_from_snapshot(thread_id, &config_snapshot, /*path*/ None); + thread.preview = preview_from_rollout_items(&history_items); + thread.forked_from_id = Some(source_thread_id.to_string()); + if include_turns { + populate_thread_turns_from_history( + &mut thread, + &history_items, + /*active_turn*/ None, + ) + .map_err(internal_error)?; } + thread }; + + self.thread_watch_manager + .upsert_thread_silently(thread.clone()) + .await; + + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + /*has_in_progress_turn*/ false, + ); + let config_snapshot = forked_thread.config_snapshot().await; + let sandbox = thread_response_sandbox_policy( + &config_snapshot.permission_profile, + config_snapshot.cwd.as_path(), + ); + let active_permission_profile = + thread_response_active_permission_profile(config_snapshot.active_permission_profile); + + let response = ThreadForkResponse { + thread: thread.clone(), + model: session_configured.model, + model_provider: session_configured.model_provider_id, + service_tier: session_configured.service_tier, + cwd: session_configured.cwd, + instruction_sources, + approval_policy: session_configured.approval_policy.into(), + approvals_reviewer: session_configured.approvals_reviewer.into(), + sandbox, + permission_profile: Some(config_snapshot.permission_profile.into()), + active_permission_profile, + reasoning_effort: session_configured.reasoning_effort, + }; + + let notif = thread_started_notification(thread); let connection_id = request_id.connection_id; let token_usage_thread = include_turns.then(|| response.thread.clone()); self.outgoing.send_response(request_id, response).await; @@ -5194,15 +5004,7 @@ impl CodexMessageProcessor { self.outgoing .send_server_notification(ServerNotification::ThreadStarted(notif)) .await; - } - - async fn get_thread_summary( - &self, - request_id: ConnectionRequestId, - params: GetConversationSummaryParams, - ) { - let result = self.get_thread_summary_response(params).await; - self.outgoing.send_result(request_id, result).await; + Ok(()) } async fn get_thread_summary_response( @@ -5357,64 +5159,56 @@ impl CodexMessageProcessor { } async fn list_models( - outgoing: Arc, thread_manager: Arc, - request_id: ConnectionRequestId, params: ModelListParams, - ) { - let result = async { - let ModelListParams { - limit, - cursor, - include_hidden, - } = params; - let models = supported_models(thread_manager, include_hidden.unwrap_or(false)).await; - let total = models.len(); + ) -> Result { + let ModelListParams { + limit, + cursor, + include_hidden, + } = params; + let models = supported_models(thread_manager, include_hidden.unwrap_or(false)).await; + let total = models.len(); - if total == 0 { - return Ok(ModelListResponse { - data: Vec::new(), - next_cursor: None, - }); - } - - let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; - let effective_limit = effective_limit.min(total); - let start = match cursor { - Some(cursor) => cursor - .parse::() - .map_err(|_| invalid_request(format!("invalid cursor: {cursor}")))?, - None => 0, - }; - - if start > total { - return Err(invalid_request(format!( - "cursor {start} exceeds total models {total}" - ))); - } - - let end = start.saturating_add(effective_limit).min(total); - let items = models[start..end].to_vec(); - let next_cursor = if end < total { - Some(end.to_string()) - } else { - None - }; - Ok::<_, JSONRPCErrorError>(ModelListResponse { - data: items, - next_cursor, - }) + if total == 0 { + return Ok(ModelListResponse { + data: Vec::new(), + next_cursor: None, + }); } - .await; - outgoing.send_result(request_id, result).await; + + let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; + let effective_limit = effective_limit.min(total); + let start = match cursor { + Some(cursor) => cursor + .parse::() + .map_err(|_| invalid_request(format!("invalid cursor: {cursor}")))?, + None => 0, + }; + + if start > total { + return Err(invalid_request(format!( + "cursor {start} exceeds total models {total}" + ))); + } + + let end = start.saturating_add(effective_limit).min(total); + let items = models[start..end].to_vec(); + let next_cursor = if end < total { + Some(end.to_string()) + } else { + None + }; + Ok(ModelListResponse { + data: items, + next_cursor, + }) } async fn list_collaboration_modes( - outgoing: Arc, thread_manager: Arc, - request_id: ConnectionRequestId, params: CollaborationModeListParams, - ) { + ) -> Result { let CollaborationModeListParams {} = params; let items = thread_manager .list_collaboration_modes() @@ -5422,16 +5216,7 @@ impl CodexMessageProcessor { .map(Into::into) .collect(); let response = CollaborationModeListResponse { data: items }; - outgoing.send_response(request_id, response).await; - } - - async fn experimental_feature_list( - &self, - request_id: ConnectionRequestId, - params: ExperimentalFeatureListParams, - ) { - let result = self.experimental_feature_list_response(params).await; - self.outgoing.send_result(request_id, result).await; + Ok(response) } async fn experimental_feature_list_response( @@ -5524,22 +5309,20 @@ impl CodexMessageProcessor { async fn mock_experimental_method( &self, - request_id: ConnectionRequestId, params: MockExperimentalMethodParams, - ) { + ) -> Result { let MockExperimentalMethodParams { value } = params; let response = MockExperimentalMethodResponse { echoed: value }; - self.outgoing.send_response(request_id, response).await; + Ok(response) } - async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) { - let result = async { - let config = self.load_latest_config(/*fallback_cwd*/ None).await?; - Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?; - Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {}) - } - .await; - self.outgoing.send_result(request_id, result).await; + async fn mcp_server_refresh( + &self, + _params: Option<()>, + ) -> Result { + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; + Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?; + Ok(McpServerRefreshResponse {}) } async fn queue_mcp_server_refresh_for_config( @@ -5586,15 +5369,6 @@ impl CodexMessageProcessor { Ok(()) } - async fn mcp_server_oauth_login( - &self, - request_id: ConnectionRequestId, - params: McpServerOauthLoginParams, - ) { - let result = self.mcp_server_oauth_login_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn mcp_server_oauth_login_response( &self, params: McpServerOauthLoginParams, @@ -5678,19 +5452,13 @@ impl CodexMessageProcessor { async fn list_mcp_server_status( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ListMcpServerStatusParams, - ) { + ) -> Result<(), JSONRPCErrorError> { let request = request_id.clone(); let outgoing = Arc::clone(&self.outgoing); - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(error) => { - self.outgoing.send_error(request, error).await; - return; - } - }; + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; let mcp_config = config .to_mcp_config(self.thread_manager.plugins_manager().as_ref()) .await; @@ -5720,6 +5488,7 @@ impl CodexMessageProcessor { ) .await; }); + Ok(()) } async fn list_mcp_server_status_task( @@ -5833,9 +5602,9 @@ impl CodexMessageProcessor { async fn read_mcp_resource( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: McpResourceReadParams, - ) { + ) -> Result<(), JSONRPCErrorError> { let outgoing = Arc::clone(&self.outgoing); let McpResourceReadParams { thread_id, @@ -5844,28 +5613,17 @@ impl CodexMessageProcessor { } = params; if let Some(thread_id) = thread_id { - let (_, thread) = match self.load_thread(&thread_id).await { - Ok(thread) => thread, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let (_, thread) = self.load_thread(&thread_id).await?; + let request_id = request_id.clone(); tokio::spawn(async move { let result = thread.read_mcp_resource(&server, &uri).await; Self::send_mcp_resource_read_response(outgoing, request_id, result).await; }); - return; + return Ok(()); } - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; let mcp_config = config .to_mcp_config(self.thread_manager.plugins_manager().as_ref()) .await; @@ -5879,9 +5637,10 @@ impl CodexMessageProcessor { // is used only by executor-backed stdio MCPs whose config omits `cwd`. McpRuntimeEnvironment::new(environment, config.cwd.to_path_buf()) }; + let request_id = request_id.clone(); tokio::spawn(async move { - let result = match read_mcp_resource_without_thread( + let result = read_mcp_resource_without_thread( &mcp_config, auth.as_ref(), runtime_environment, @@ -5889,12 +5648,10 @@ impl CodexMessageProcessor { &uri, ) .await - { - Ok(result) => serde_json::to_value(result).map_err(anyhow::Error::from), - Err(error) => Err(error), - }; + .and_then(|result| serde_json::to_value(result).map_err(anyhow::Error::from)); Self::send_mcp_resource_read_response(outgoing, request_id, result).await; }); + Ok(()) } async fn send_mcp_resource_read_response( @@ -5916,19 +5673,14 @@ impl CodexMessageProcessor { async fn call_mcp_server_tool( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: McpServerToolCallParams, - ) { + ) -> Result<(), JSONRPCErrorError> { let outgoing = Arc::clone(&self.outgoing); let thread_id = params.thread_id.clone(); - let (_, thread) = match self.load_thread(&thread_id).await { - Ok(thread) => thread, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let (_, thread) = self.load_thread(&thread_id).await?; let meta = with_mcp_tool_call_thread_id_meta(params.meta, &thread_id); + let request_id = request_id.clone(); tokio::spawn(async move { let result = thread @@ -5938,22 +5690,7 @@ impl CodexMessageProcessor { .map_err(|error| internal_error(format!("{error:#}"))); outgoing.send_result(request_id, result).await; }); - } - - async fn send_optional_result( - &self, - request_id: ConnectionRequestId, - result: Result, JSONRPCErrorError>, - ) where - T: Into, - { - match result { - Ok(Some(response)) => self.outgoing.send_response(request_id, response).await, - Ok(None) => {} - Err(error) => { - self.outgoing.send_error(request_id, error).await; - } - } + Ok(()) } fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError { @@ -6051,17 +5788,6 @@ impl CodexMessageProcessor { }); } - async fn thread_unsubscribe( - &self, - request_id: ConnectionRequestId, - params: ThreadUnsubscribeParams, - ) { - let result = self - .thread_unsubscribe_response(params, request_id.connection_id) - .await; - self.outgoing.send_result(request_id, result).await; - } - async fn thread_unsubscribe_response( &self, params: ThreadUnsubscribeParams, @@ -6113,23 +5839,15 @@ impl CodexMessageProcessor { self.finalize_thread_teardown(thread_id).await; } - async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) { - let mut config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + async fn apps_list( + &self, + request_id: &ConnectionRequestId, + params: AppsListParams, + ) -> Result, JSONRPCErrorError> { + let mut config = self.load_latest_config(/*fallback_cwd*/ None).await?; if let Some(thread_id) = params.thread_id.as_deref() { - let (_, thread) = match self.load_thread(thread_id).await { - Ok(result) => result, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let (_, thread) = self.load_thread(thread_id).await?; let _ = config .features @@ -6141,32 +5859,20 @@ impl CodexMessageProcessor { .features .apps_enabled_for_auth(auth.as_ref().is_some_and(CodexAuth::uses_codex_backend)) { - self.outgoing - .send_response( - request_id, - AppsListResponse { - data: Vec::new(), - next_cursor: None, - }, - ) - .await; - return; + return Ok(Some(AppsListResponse { + data: Vec::new(), + next_cursor: None, + })); } if !self .workspace_codex_plugins_enabled(&config, auth.as_ref()) .await { - self.outgoing - .send_response( - request_id, - AppsListResponse { - data: Vec::new(), - next_cursor: None, - }, - ) - .await; - return; + return Ok(Some(AppsListResponse { + data: Vec::new(), + next_cursor: None, + })); } let request = request_id.clone(); @@ -6175,6 +5881,7 @@ impl CodexMessageProcessor { tokio::spawn(async move { Self::apps_list_task(outgoing, request, params, config, environment_manager).await; }); + Ok(None) } async fn apps_list_task( @@ -6331,11 +6038,6 @@ impl CodexMessageProcessor { } } - async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) { - let result = self.skills_list_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn skills_list_response( &self, params: SkillsListParams, @@ -6444,11 +6146,6 @@ impl CodexMessageProcessor { Ok(SkillsListResponse { data }) } - async fn hooks_list(&self, request_id: ConnectionRequestId, params: HooksListParams) { - let result = self.hooks_list_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - /// Handle `hooks/list` by resolving hooks for each requested cwd. async fn hooks_list_response( &self, @@ -6526,10 +6223,9 @@ impl CodexMessageProcessor { async fn marketplace_remove( &self, - request_id: ConnectionRequestId, params: MarketplaceRemoveParams, - ) { - let result = remove_marketplace( + ) -> Result { + remove_marketplace( self.config.codex_home.to_path_buf(), CoreMarketplaceRemoveRequest { marketplace_name: params.marketplace_name, @@ -6543,17 +6239,7 @@ impl CodexMessageProcessor { .map_err(|err| match err { MarketplaceRemoveError::InvalidRequest(message) => invalid_request(message), MarketplaceRemoveError::Internal(message) => internal_error(message), - }); - self.outgoing.send_result(request_id, result).await; - } - - async fn marketplace_upgrade( - &self, - request_id: ConnectionRequestId, - params: MarketplaceUpgradeParams, - ) { - let result = self.marketplace_upgrade_response(params).await; - self.outgoing.send_result(request_id, result).await; + }) } async fn marketplace_upgrade_response( @@ -6589,8 +6275,11 @@ impl CodexMessageProcessor { }) } - async fn marketplace_add(&self, request_id: ConnectionRequestId, params: MarketplaceAddParams) { - let result = add_marketplace_to_codex_home( + async fn marketplace_add( + &self, + params: MarketplaceAddParams, + ) -> Result { + add_marketplace_to_codex_home( self.config.codex_home.to_path_buf(), MarketplaceAddRequest { source: params.source, @@ -6607,17 +6296,7 @@ impl CodexMessageProcessor { .map_err(|err| match err { MarketplaceAddError::InvalidRequest(message) => invalid_request(message), MarketplaceAddError::Internal(message) => internal_error(message), - }); - self.outgoing.send_result(request_id, result).await; - } - - async fn skills_config_write( - &self, - request_id: ConnectionRequestId, - params: SkillsConfigWriteParams, - ) { - let result = self.skills_config_write_response(params).await; - self.outgoing.send_result(request_id, result).await; + }) } async fn skills_config_write_response( @@ -6664,224 +6343,201 @@ impl CodexMessageProcessor { params: TurnStartParams, app_server_client_name: Option, app_server_client_version: Option, - ) { - let result = async { - if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { - self.track_error_response( - &request_id, - &error, - Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), - ); - return Err(error); - } - let (thread_id, thread) = - self.load_thread(¶ms.thread_id) - .await - .inspect_err(|error| { - self.track_error_response(&request_id, error, /*error_type*/ None); - })?; - Self::set_app_server_client_info( - thread.as_ref(), - app_server_client_name, - app_server_client_version, - ) - .await - .inspect_err(|error| { - self.track_error_response(&request_id, error, /*error_type*/ None); - })?; + ) -> Result { + if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { + self.track_error_response( + &request_id, + &error, + Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), + ); + return Err(error); + } + let (thread_id, thread) = + self.load_thread(¶ms.thread_id) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; + Self::set_app_server_client_info( + thread.as_ref(), + app_server_client_name, + app_server_client_version, + ) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; - let collaboration_mode = params - .collaboration_mode - .map(|mode| self.normalize_turn_start_collaboration_mode(mode)); - let environment_selections = self.parse_environment_selections(params.environments)?; + let collaboration_mode = params + .collaboration_mode + .map(|mode| self.normalize_turn_start_collaboration_mode(mode)); + let environment_selections = self.parse_environment_selections(params.environments)?; - // Map v2 input items to core input items. - let mapped_items: Vec = params - .input - .into_iter() - .map(V2UserInput::into_core) - .collect(); - let turn_has_input = !mapped_items.is_empty(); + // Map v2 input items to core input items. + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + let turn_has_input = !mapped_items.is_empty(); - let has_any_overrides = params.cwd.is_some() - || params.approval_policy.is_some() - || params.approvals_reviewer.is_some() - || params.sandbox_policy.is_some() - || params.permissions.is_some() - || params.model.is_some() - || params.service_tier.is_some() - || params.effort.is_some() - || params.summary.is_some() - || collaboration_mode.is_some() - || params.personality.is_some(); + let has_any_overrides = params.cwd.is_some() + || params.approval_policy.is_some() + || params.approvals_reviewer.is_some() + || params.sandbox_policy.is_some() + || params.permissions.is_some() + || params.model.is_some() + || params.service_tier.is_some() + || params.effort.is_some() + || params.summary.is_some() + || collaboration_mode.is_some() + || params.personality.is_some(); - if params.sandbox_policy.is_some() && params.permissions.is_some() { - return Err(invalid_request( - "`permissions` cannot be combined with `sandboxPolicy`", - )); - } + if params.sandbox_policy.is_some() && params.permissions.is_some() { + return Err(invalid_request( + "`permissions` cannot be combined with `sandboxPolicy`", + )); + } - let cwd = params.cwd; - let approval_policy = params.approval_policy.map(AskForApproval::to_core); - let approvals_reviewer = params - .approvals_reviewer - .map(codex_app_server_protocol::ApprovalsReviewer::to_core); - let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); - let (permission_profile, active_permission_profile) = - if let Some(permissions) = params.permissions { - let snapshot = thread.config_snapshot().await; - let mut overrides = ConfigOverrides { - cwd: cwd.clone(), - codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(), - main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(), - ..Default::default() - }; - apply_permission_profile_selection_to_config_overrides( - &mut overrides, - Some(permissions), - ); - let config = self - .config_manager - .load_for_cwd( - /*request_overrides*/ None, - overrides, - Some(snapshot.cwd.to_path_buf()), - ) - .await - .map_err(|err| config_load_error(&err))?; - // Startup config is allowed to fall back when requirements - // disallow a configured profile. An explicit turn request - // is different: reject it before accepting user input. - if let Some(warning) = config.startup_warnings.iter().find(|warning| { - warning.contains("Configured value for `permission_profile` is disallowed") - }) { - return Err(invalid_request(format!( - "invalid turn context override: {warning}" - ))); - } - ( - Some(config.permissions.permission_profile()), - config.permissions.active_permission_profile(), - ) - } else { - (None, None) + let cwd = params.cwd; + let approval_policy = params.approval_policy.map(AskForApproval::to_core); + let approvals_reviewer = params + .approvals_reviewer + .map(codex_app_server_protocol::ApprovalsReviewer::to_core); + let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); + let (permission_profile, active_permission_profile) = + if let Some(permissions) = params.permissions { + let snapshot = thread.config_snapshot().await; + let mut overrides = ConfigOverrides { + cwd: cwd.clone(), + codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(), + main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(), + ..Default::default() }; - let model = params.model; - let effort = params.effort.map(Some); - let summary = params.summary; - let service_tier = params.service_tier; - let personality = params.personality; - - // If any overrides are provided, validate them synchronously so the - // request can fail before accepting user input. The actual update is - // still queued together with the input below to preserve submission order. - if has_any_overrides { - thread - .validate_turn_context_overrides(CodexThreadTurnContextOverrides { - cwd: cwd.clone(), - approval_policy, - approvals_reviewer, - sandbox_policy: sandbox_policy.clone(), - permission_profile: permission_profile.clone(), - active_permission_profile: active_permission_profile.clone(), - windows_sandbox_level: None, - model: model.clone(), - effort, - summary, - service_tier, - collaboration_mode: collaboration_mode.clone(), - personality, - }) + apply_permission_profile_selection_to_config_overrides( + &mut overrides, + Some(permissions), + ); + let config = self + .config_manager + .load_for_cwd( + /*request_overrides*/ None, + overrides, + Some(snapshot.cwd.to_path_buf()), + ) .await - .map_err(|err| { - invalid_request(format!("invalid turn context override: {err}")) - })?; - } + .map_err(|err| config_load_error(&err))?; + // Startup config is allowed to fall back when requirements + // disallow a configured profile. An explicit turn request + // is different: reject it before accepting user input. + if let Some(warning) = config.startup_warnings.iter().find(|warning| { + warning.contains("Configured value for `permission_profile` is disallowed") + }) { + return Err(invalid_request(format!( + "invalid turn context override: {warning}" + ))); + } + ( + Some(config.permissions.permission_profile()), + config.permissions.active_permission_profile(), + ) + } else { + (None, None) + }; + let model = params.model; + let effort = params.effort.map(Some); + let summary = params.summary; + let service_tier = params.service_tier; + let personality = params.personality; - // Start the turn by submitting the user input. Return its submission id as turn_id. - let turn_op = if has_any_overrides { - Op::UserInputWithTurnContext { - items: mapped_items, - environments: environment_selections, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - cwd, + // If any overrides are provided, validate them synchronously so the + // request can fail before accepting user input. The actual update is + // still queued together with the input below to preserve submission order. + if has_any_overrides { + thread + .validate_turn_context_overrides(CodexThreadTurnContextOverrides { + cwd: cwd.clone(), approval_policy, approvals_reviewer, - sandbox_policy, - permission_profile, - active_permission_profile, + sandbox_policy: sandbox_policy.clone(), + permission_profile: permission_profile.clone(), + active_permission_profile: active_permission_profile.clone(), windows_sandbox_level: None, - model, + model: model.clone(), effort, summary, service_tier, - collaboration_mode, + collaboration_mode: collaboration_mode.clone(), personality, - } - } else { - Op::UserInput { - items: mapped_items, - environments: environment_selections, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - } - }; - let turn_id = self - .submit_core_op(&request_id, thread.as_ref(), turn_op) + }) .await - .map_err(|err| { - let error = internal_error(format!("failed to start turn: {err}")); - self.track_error_response(&request_id, &error, /*error_type*/ None); - error - })?; - - if turn_has_input { - let config_snapshot = thread.config_snapshot().await; - codex_memories_write::start_memories_startup_task( - Arc::clone(&self.thread_manager), - Arc::clone(&self.auth_manager), - thread_id, - Arc::clone(&thread), - thread.config().await, - &config_snapshot.session_source, - ); - } - - self.outgoing - .record_request_turn_id(&request_id, &turn_id) - .await; - let turn = Turn { - id: turn_id, - items: vec![], - error: None, - status: TurnStatus::InProgress, - started_at: None, - completed_at: None, - duration_ms: None, - }; - - Ok::<_, JSONRPCErrorError>(TurnStartResponse { turn }) + .map_err(|err| invalid_request(format!("invalid turn context override: {err}")))?; } - .await; - match result { - Ok(response) => { - self.outgoing.send_response(request_id, response).await; + // Start the turn by submitting the user input. Return its submission id as turn_id. + let turn_op = if has_any_overrides { + Op::UserInputWithTurnContext { + items: mapped_items, + environments: environment_selections, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, + cwd, + approval_policy, + approvals_reviewer, + sandbox_policy, + permission_profile, + active_permission_profile, + windows_sandbox_level: None, + model, + effort, + summary, + service_tier, + collaboration_mode, + personality, } - Err(error) => { - self.outgoing.send_error(request_id, error).await; + } else { + Op::UserInput { + items: mapped_items, + environments: environment_selections, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, } + }; + let turn_id = self + .submit_core_op(&request_id, thread.as_ref(), turn_op) + .await + .map_err(|err| { + let error = internal_error(format!("failed to start turn: {err}")); + self.track_error_response(&request_id, &error, /*error_type*/ None); + error + })?; + + if turn_has_input { + let config_snapshot = thread.config_snapshot().await; + codex_memories_write::start_memories_startup_task( + Arc::clone(&self.thread_manager), + Arc::clone(&self.auth_manager), + thread_id, + Arc::clone(&thread), + thread.config().await, + &config_snapshot.session_source, + ); } - } - async fn thread_inject_items( - &self, - request_id: ConnectionRequestId, - params: ThreadInjectItemsParams, - ) { - let result = self.thread_inject_items_response(params).await; - self.outgoing.send_result(request_id, result).await; + self.outgoing + .record_request_turn_id(&request_id, &turn_id) + .await; + let turn = Turn { + id: turn_id, + items: vec![], + error: None, + status: TurnStatus::InProgress, + started_at: None, + completed_at: None, + duration_ms: None, + }; + + Ok(TurnStartResponse { turn }) } async fn thread_inject_items_response( @@ -6926,123 +6582,115 @@ impl CodexMessageProcessor { }) } - async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) { - let result = async { - let (_, thread) = self - .load_thread(¶ms.thread_id) - .await - .inspect_err(|error| { - self.track_error_response(&request_id, error, /*error_type*/ None); - })?; + async fn turn_steer( + &self, + request_id: &ConnectionRequestId, + params: TurnSteerParams, + ) -> Result { + let (_, thread) = self + .load_thread(¶ms.thread_id) + .await + .inspect_err(|error| { + self.track_error_response(request_id, error, /*error_type*/ None); + })?; - if params.expected_turn_id.is_empty() { - return Err(invalid_request("expectedTurnId must not be empty")); - } - self.outgoing - .record_request_turn_id(&request_id, ¶ms.expected_turn_id) - .await; - if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { - self.track_error_response( - &request_id, - &error, - Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), - ); - return Err(error); - } - - let mapped_items: Vec = params - .input - .into_iter() - .map(V2UserInput::into_core) - .collect(); - - let turn_id = thread - .steer_input( - mapped_items, - Some(¶ms.expected_turn_id), - params.responsesapi_client_metadata, - ) - .await - .map_err(|err| { - let (code, message, data, error_type) = match err { - SteerInputError::NoActiveTurn(_) => ( - INVALID_REQUEST_ERROR_CODE, - "no active turn to steer".to_string(), - None, - Some(AnalyticsJsonRpcError::TurnSteer( - TurnSteerRequestError::NoActiveTurn, - )), - ), - SteerInputError::ExpectedTurnMismatch { expected, actual } => ( - INVALID_REQUEST_ERROR_CODE, - format!("expected active turn id `{expected}` but found `{actual}`"), - None, - Some(AnalyticsJsonRpcError::TurnSteer( - TurnSteerRequestError::ExpectedTurnMismatch, - )), - ), - SteerInputError::ActiveTurnNotSteerable { turn_kind } => { - let (message, turn_steer_error) = match turn_kind { - codex_protocol::protocol::NonSteerableTurnKind::Review => ( - "cannot steer a review turn".to_string(), - TurnSteerRequestError::NonSteerableReview, - ), - codex_protocol::protocol::NonSteerableTurnKind::Compact => ( - "cannot steer a compact turn".to_string(), - TurnSteerRequestError::NonSteerableCompact, - ), - }; - let error = TurnError { - message: message.clone(), - codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { - turn_kind: turn_kind.into(), - }), - additional_details: None, - }; - let data = match serde_json::to_value(error) { - Ok(data) => Some(data), - Err(error) => { - tracing::error!( - ?error, - "failed to serialize active-turn-not-steerable turn error" - ); - None - } - }; - ( - INVALID_REQUEST_ERROR_CODE, - message, - data, - Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)), - ) - } - SteerInputError::EmptyInput => ( - INVALID_REQUEST_ERROR_CODE, - "input must not be empty".to_string(), - None, - Some(AnalyticsJsonRpcError::Input(InputError::Empty)), - ), - }; - let error = JSONRPCErrorError { - code, - message, - data, - }; - self.track_error_response(&request_id, &error, error_type); - error - })?; - Ok::<_, JSONRPCErrorError>(TurnSteerResponse { turn_id }) + if params.expected_turn_id.is_empty() { + return Err(invalid_request("expectedTurnId must not be empty")); } - .await; - - match result { - Ok(response) => { - self.outgoing.send_response(request_id, response).await; - } - Err(error) => { - self.outgoing.send_error(request_id, error).await; - } + self.outgoing + .record_request_turn_id(request_id, ¶ms.expected_turn_id) + .await; + if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { + self.track_error_response( + request_id, + &error, + Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), + ); + return Err(error); } + + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + + let turn_id = thread + .steer_input( + mapped_items, + Some(¶ms.expected_turn_id), + params.responsesapi_client_metadata, + ) + .await + .map_err(|err| { + let (code, message, data, error_type) = match err { + SteerInputError::NoActiveTurn(_) => ( + INVALID_REQUEST_ERROR_CODE, + "no active turn to steer".to_string(), + None, + Some(AnalyticsJsonRpcError::TurnSteer( + TurnSteerRequestError::NoActiveTurn, + )), + ), + SteerInputError::ExpectedTurnMismatch { expected, actual } => ( + INVALID_REQUEST_ERROR_CODE, + format!("expected active turn id `{expected}` but found `{actual}`"), + None, + Some(AnalyticsJsonRpcError::TurnSteer( + TurnSteerRequestError::ExpectedTurnMismatch, + )), + ), + SteerInputError::ActiveTurnNotSteerable { turn_kind } => { + let (message, turn_steer_error) = match turn_kind { + codex_protocol::protocol::NonSteerableTurnKind::Review => ( + "cannot steer a review turn".to_string(), + TurnSteerRequestError::NonSteerableReview, + ), + codex_protocol::protocol::NonSteerableTurnKind::Compact => ( + "cannot steer a compact turn".to_string(), + TurnSteerRequestError::NonSteerableCompact, + ), + }; + let error = TurnError { + message: message.clone(), + codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { + turn_kind: turn_kind.into(), + }), + additional_details: None, + }; + let data = match serde_json::to_value(error) { + Ok(data) => Some(data), + Err(error) => { + tracing::error!( + ?error, + "failed to serialize active-turn-not-steerable turn error" + ); + None + } + }; + ( + INVALID_REQUEST_ERROR_CODE, + message, + data, + Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)), + ) + } + SteerInputError::EmptyInput => ( + INVALID_REQUEST_ERROR_CODE, + "input must not be empty".to_string(), + None, + Some(AnalyticsJsonRpcError::Input(InputError::Empty)), + ), + }; + let error = JSONRPCErrorError { + code, + message, + data, + }; + self.track_error_response(request_id, &error, error_type); + error + })?; + Ok(TurnSteerResponse { turn_id }) } async fn prepare_realtime_conversation_thread( @@ -7078,140 +6726,107 @@ impl CodexMessageProcessor { async fn thread_realtime_start( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadRealtimeStartParams, - ) { - let result = async { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) - .await? - else { - return Ok(None); - }; - self.submit_core_op( - &request_id, - thread.as_ref(), - Op::RealtimeConversationStart(ConversationStartParams { - output_modality: params.output_modality, - prompt: params.prompt, - realtime_session_id: params.realtime_session_id, - transport: params.transport.map(|transport| match transport { - ThreadRealtimeStartTransport::Websocket => { - ConversationStartTransport::Websocket - } - ThreadRealtimeStartTransport::Webrtc { sdp } => { - ConversationStartTransport::Webrtc { sdp } - } - }), - voice: params.voice, + ) -> Result, JSONRPCErrorError> { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::RealtimeConversationStart(ConversationStartParams { + output_modality: params.output_modality, + prompt: params.prompt, + realtime_session_id: params.realtime_session_id, + transport: params.transport.map(|transport| match transport { + ThreadRealtimeStartTransport::Websocket => { + ConversationStartTransport::Websocket + } + ThreadRealtimeStartTransport::Webrtc { sdp } => { + ConversationStartTransport::Webrtc { sdp } + } }), - ) - .await - .map_err(|err| { - internal_error(format!("failed to start realtime conversation: {err}")) - })?; - Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStartResponse::default())) - } - .await; - self.send_optional_result(request_id, result).await; + voice: params.voice, + }), + ) + .await + .map_err(|err| internal_error(format!("failed to start realtime conversation: {err}")))?; + Ok(Some(ThreadRealtimeStartResponse::default())) } async fn thread_realtime_append_audio( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadRealtimeAppendAudioParams, - ) { - let result = async { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) - .await? - else { - return Ok(None); - }; - self.submit_core_op( - &request_id, - thread.as_ref(), - Op::RealtimeConversationAudio(ConversationAudioParams { - frame: params.audio.into(), - }), - ) - .await - .map_err(|err| { - internal_error(format!( - "failed to append realtime conversation audio: {err}" - )) - })?; - Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendAudioResponse::default())) - } - .await; - self.send_optional_result(request_id, result).await; + ) -> Result, JSONRPCErrorError> { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::RealtimeConversationAudio(ConversationAudioParams { + frame: params.audio.into(), + }), + ) + .await + .map_err(|err| { + internal_error(format!( + "failed to append realtime conversation audio: {err}" + )) + })?; + Ok(Some(ThreadRealtimeAppendAudioResponse::default())) } async fn thread_realtime_append_text( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadRealtimeAppendTextParams, - ) { - let result = async { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) - .await? - else { - return Ok(None); - }; - self.submit_core_op( - &request_id, - thread.as_ref(), - Op::RealtimeConversationText(ConversationTextParams { text: params.text }), - ) - .await - .map_err(|err| { - internal_error(format!( - "failed to append realtime conversation text: {err}" - )) - })?; - Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendTextResponse::default())) - } - .await; - self.send_optional_result(request_id, result).await; + ) -> Result, JSONRPCErrorError> { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::RealtimeConversationText(ConversationTextParams { text: params.text }), + ) + .await + .map_err(|err| { + internal_error(format!( + "failed to append realtime conversation text: {err}" + )) + })?; + Ok(Some(ThreadRealtimeAppendTextResponse::default())) } async fn thread_realtime_stop( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: ThreadRealtimeStopParams, - ) { - let result = async { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) - .await? - else { - return Ok(None); - }; - self.submit_core_op(&request_id, thread.as_ref(), Op::RealtimeConversationClose) - .await - .map_err(|err| { - internal_error(format!("failed to stop realtime conversation: {err}")) - })?; - Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStopResponse::default())) - } - .await; - self.send_optional_result(request_id, result).await; - } - - async fn thread_realtime_list_voices( - &self, - request_id: ConnectionRequestId, - _params: ThreadRealtimeListVoicesParams, - ) { - self.outgoing - .send_response( - request_id, - ThreadRealtimeListVoicesResponse { - voices: RealtimeVoicesList::builtin(), - }, - ) - .await; + ) -> Result, JSONRPCErrorError> { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op(request_id, thread.as_ref(), Op::RealtimeConversationClose) + .await + .map_err(|err| { + internal_error(format!("failed to stop realtime conversation: {err}")) + })?; + Ok(Some(ThreadRealtimeStopResponse::default())) } fn build_review_turn(turn_id: String, display_text: &str) -> Turn { @@ -7268,21 +6883,12 @@ impl CodexMessageProcessor { parent_thread.as_ref(), Op::Review { review_request }, ) + .await + .map_err(|err| internal_error(format!("failed to start review: {err}")))?; + let turn = Self::build_review_turn(turn_id, display_text); + self.emit_review_started(request_id, turn, parent_thread_id) .await; - - match turn_id { - Ok(turn_id) => { - let turn = Self::build_review_turn(turn_id, display_text); - self.emit_review_started(request_id, turn, parent_thread_id) - .await; - Ok(()) - } - Err(err) => Err(JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to start review: {err}"), - data: None, - }), - } + Ok(()) } async fn start_detached_review( @@ -7298,15 +6904,13 @@ impl CodexMessageProcessor { } else { find_thread_path_by_id_str(&self.config.codex_home, &parent_thread_id.to_string()) .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to locate thread id {parent_thread_id}: {err}"), - data: None, + .map_err(|err| { + internal_error(format!( + "failed to locate thread id {parent_thread_id}: {err}" + )) })? - .ok_or_else(|| JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("no rollout found for thread id {parent_thread_id}"), - data: None, + .ok_or_else(|| { + invalid_request(format!("no rollout found for thread id {parent_thread_id}")) })? }; @@ -7330,10 +6934,8 @@ impl CodexMessageProcessor { self.request_trace_context(request_id).await, ) .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("error creating detached review thread: {err}"), - data: None, + .map_err(|err| { + internal_error(format!("error creating detached review thread: {err}")) })?; Self::log_listener_attach_result( @@ -7389,10 +6991,8 @@ impl CodexMessageProcessor { Op::Review { review_request }, ) .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to start detached review turn: {err}"), - data: None, + .map_err(|err| { + internal_error(format!("failed to start detached review turn: {err}")) })?; let turn = Self::build_review_turn(turn_id, display_text); @@ -7403,108 +7003,107 @@ impl CodexMessageProcessor { Ok(()) } - async fn review_start(&self, request_id: ConnectionRequestId, params: ReviewStartParams) { + async fn review_start( + &self, + request_id: &ConnectionRequestId, + params: ReviewStartParams, + ) -> Result<(), JSONRPCErrorError> { let ReviewStartParams { thread_id, target, delivery, } = params; - let result = async { - let (parent_thread_id, parent_thread) = self.load_thread(&thread_id).await?; - let (review_request, display_text) = Self::review_request_from_target(target)?; - match delivery.unwrap_or(ApiReviewDelivery::Inline).to_core() { - CoreReviewDelivery::Inline => { - self.start_inline_review( - &request_id, - parent_thread, - review_request, - display_text.as_str(), - thread_id, - ) - .await?; - } - CoreReviewDelivery::Detached => { - self.start_detached_review( - &request_id, - parent_thread_id, - parent_thread, - review_request, - display_text.as_str(), - ) - .await?; - } + + let (parent_thread_id, parent_thread) = self.load_thread(&thread_id).await?; + let (review_request, display_text) = Self::review_request_from_target(target)?; + match delivery.unwrap_or(ApiReviewDelivery::Inline).to_core() { + CoreReviewDelivery::Inline => { + self.start_inline_review( + request_id, + parent_thread, + review_request, + display_text.as_str(), + thread_id, + ) + .await?; + } + CoreReviewDelivery::Detached => { + self.start_detached_review( + request_id, + parent_thread_id, + parent_thread, + review_request, + display_text.as_str(), + ) + .await?; } - Ok::<_, JSONRPCErrorError>(None::) } - .await; - self.send_optional_result(request_id, result).await; + Ok(()) } - async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) { + async fn turn_interrupt( + &self, + request_id: &ConnectionRequestId, + params: TurnInterruptParams, + ) -> Result, JSONRPCErrorError> { let TurnInterruptParams { thread_id, turn_id } = params; let is_startup_interrupt = turn_id.is_empty(); - let result = async { - let (thread_uuid, thread) = self.load_thread(&thread_id).await?; + let (thread_uuid, thread) = self.load_thread(&thread_id).await?; - // Record turn interrupts so we can reply when TurnAborted arrives. Startup - // interrupts do not have a turn and are acknowledged after submission. - if !is_startup_interrupt { - let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; - let is_running = matches!(thread.agent_status().await, AgentStatus::Running); - { - let mut thread_state = thread_state.lock().await; - if let Some(active_turn) = thread_state.active_turn_snapshot() { - if active_turn.id != turn_id { - return Err(invalid_request(format!( - "expected active turn id {turn_id} but found {}", - active_turn.id - ))); - } - } else if thread_state.last_terminal_turn_id.as_deref() - == Some(turn_id.as_str()) - || !is_running - { - return Err(invalid_request("no active turn to interrupt")); + // Record turn interrupts so we can reply when TurnAborted arrives. Startup + // interrupts do not have a turn and are acknowledged after submission. + if !is_startup_interrupt { + let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; + let is_running = matches!(thread.agent_status().await, AgentStatus::Running); + { + let mut thread_state = thread_state.lock().await; + if let Some(active_turn) = thread_state.active_turn_snapshot() { + if active_turn.id != turn_id { + return Err(invalid_request(format!( + "expected active turn id {turn_id} but found {}", + active_turn.id + ))); } - thread_state.pending_interrupts.push(request_id.clone()); + } else if thread_state.last_terminal_turn_id.as_deref() == Some(turn_id.as_str()) + || !is_running + { + return Err(invalid_request("no active turn to interrupt")); } - - self.outgoing - .record_request_turn_id(&request_id, &turn_id) - .await; + thread_state.pending_interrupts.push(request_id.clone()); } - // Submit the interrupt. Turn interrupts respond upon TurnAborted; startup - // interrupts respond here because startup cancellation has no turn event. - match self - .submit_core_op(&request_id, thread.as_ref(), Op::Interrupt) - .await - { - Ok(_) if is_startup_interrupt => Ok(Some(TurnInterruptResponse {})), - Ok(_) => Ok(None), - Err(err) => { - if !is_startup_interrupt { - let thread_state = - self.thread_state_manager.thread_state(thread_uuid).await; - let mut thread_state = thread_state.lock().await; - thread_state - .pending_interrupts - .retain(|pending_request_id| pending_request_id != &request_id); - } - let interrupt_target = if is_startup_interrupt { - "startup" - } else { - "turn" - }; - Err(internal_error(format!( - "failed to interrupt {interrupt_target}: {err}" - ))) + self.outgoing + .record_request_turn_id(request_id, &turn_id) + .await; + } + + // Submit the interrupt. Turn interrupts respond upon TurnAborted; startup + // interrupts respond here because startup cancellation has no turn event. + match self + .submit_core_op(request_id, thread.as_ref(), Op::Interrupt) + .await + { + Ok(_) if is_startup_interrupt => Ok(Some(TurnInterruptResponse {})), + Ok(_) => Ok(None), + Err(err) => { + if !is_startup_interrupt { + let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; + let mut thread_state = thread_state.lock().await; + thread_state + .pending_interrupts + .retain(|pending_request_id| pending_request_id != request_id); } + let interrupt_target = if is_startup_interrupt { + "startup" + } else { + "turn" + }; + Err(internal_error(format!( + "failed to interrupt {interrupt_target}: {err}" + ))) } } - .await; - self.send_optional_result(request_id, result).await; } async fn ensure_conversation_listener( @@ -7809,8 +7408,11 @@ impl CodexMessageProcessor { }); Ok(()) } - async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) { - let result = git_diff_to_remote(&cwd) + async fn git_diff_to_origin( + &self, + cwd: PathBuf, + ) -> Result { + git_diff_to_remote(&cwd) .await .map(|value| GitDiffToRemoteResponse { sha: value.sha, @@ -7820,15 +7422,13 @@ impl CodexMessageProcessor { invalid_request(format!( "failed to compute git diff to remote for cwd: {cwd:?}" )) - }); - self.outgoing.send_result(request_id, result).await; + }) } async fn fuzzy_file_search( &self, - request_id: ConnectionRequestId, params: FuzzyFileSearchParams, - ) { + ) -> Result { let FuzzyFileSearchParams { query, roots, @@ -7864,17 +7464,7 @@ impl CodexMessageProcessor { } } - let response = FuzzyFileSearchResponse { files: results }; - self.outgoing.send_response(request_id, response).await; - } - - async fn fuzzy_file_search_session_start( - &self, - request_id: ConnectionRequestId, - params: FuzzyFileSearchSessionStartParams, - ) { - let result = self.fuzzy_file_search_session_start_response(params).await; - self.outgoing.send_result(request_id, result).await; + Ok(FuzzyFileSearchResponse { files: results }) } async fn fuzzy_file_search_session_start_response( @@ -7898,15 +7488,6 @@ impl CodexMessageProcessor { Ok(FuzzyFileSearchSessionStartResponse {}) } - async fn fuzzy_file_search_session_update( - &self, - request_id: ConnectionRequestId, - params: FuzzyFileSearchSessionUpdateParams, - ) { - let result = self.fuzzy_file_search_session_update_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - async fn fuzzy_file_search_session_update_response( &self, params: FuzzyFileSearchSessionUpdateParams, @@ -7932,23 +7513,12 @@ impl CodexMessageProcessor { async fn fuzzy_file_search_session_stop( &self, - request_id: ConnectionRequestId, params: FuzzyFileSearchSessionStopParams, - ) { + ) -> Result { let FuzzyFileSearchSessionStopParams { session_id } = params; - { - let mut sessions = self.fuzzy_search_sessions.lock().await; - sessions.remove(&session_id); - } + self.fuzzy_search_sessions.lock().await.remove(&session_id); - self.outgoing - .send_response(request_id, FuzzyFileSearchSessionStopResponse {}) - .await; - } - - async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) { - let result = self.upload_feedback_response(params).await; - self.outgoing.send_result(request_id, result).await; + Ok(FuzzyFileSearchSessionStopResponse {}) } async fn upload_feedback_response( @@ -8133,9 +7703,9 @@ impl CodexMessageProcessor { async fn windows_sandbox_setup_start( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, params: WindowsSandboxSetupStartParams, - ) { + ) -> Result<(), JSONRPCErrorError> { self.outgoing .send_response( request_id.clone(), @@ -8199,6 +7769,7 @@ impl CodexMessageProcessor { ) .await; }); + Ok(()) } async fn resolve_rollout_path( @@ -8221,26 +7792,6 @@ impl CodexMessageProcessor { None }) } - - async fn send_invalid_request_error( - &self, - request_id: ConnectionRequestId, - message: impl Into, - ) { - self.outgoing - .send_error(request_id, invalid_request(message)) - .await; - } - - async fn send_internal_error( - &self, - request_id: ConnectionRequestId, - message: impl Into, - ) { - self.outgoing - .send_error(request_id, internal_error(message)) - .await; - } } fn auto_review_rollout_filename(thread_id: ThreadId) -> String { diff --git a/codex-rs/app-server/src/codex_message_processor/plugins.rs b/codex-rs/app-server/src/codex_message_processor/plugins.rs index 5bab115517..78c1c3008a 100644 --- a/codex-rs/app-server/src/codex_message_processor/plugins.rs +++ b/codex-rs/app-server/src/codex_message_processor/plugins.rs @@ -8,15 +8,6 @@ use codex_core_plugins::remote::validate_remote_plugin_id; impl CodexMessageProcessor { pub(super) async fn plugin_list( - &self, - request_id: ConnectionRequestId, - params: PluginListParams, - ) { - let result = self.plugin_list_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_list_response( &self, params: PluginListParams, ) -> Result { @@ -174,15 +165,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_read( - &self, - request_id: ConnectionRequestId, - params: PluginReadParams, - ) { - let result = self.plugin_read_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_read_response( &self, params: PluginReadParams, ) -> Result { @@ -303,15 +285,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_skill_read( - &self, - request_id: ConnectionRequestId, - params: PluginSkillReadParams, - ) { - let result = self.plugin_skill_read_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_skill_read_response( &self, params: PluginSkillReadParams, ) -> Result { @@ -358,15 +331,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_share_save( - &self, - request_id: ConnectionRequestId, - params: PluginShareSaveParams, - ) { - let result = self.plugin_share_save_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_share_save_response( &self, params: PluginShareSaveParams, ) -> Result { @@ -403,15 +367,7 @@ impl CodexMessageProcessor { pub(super) async fn plugin_share_list( &self, - request_id: ConnectionRequestId, _params: PluginShareListParams, - ) { - let result = self.plugin_share_list_response().await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_share_list_response( - &self, ) -> Result { let (config, auth) = self.load_plugin_share_config_and_auth().await?; let remote_plugin_service_config = RemotePluginServiceConfig { @@ -443,15 +399,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_share_delete( - &self, - request_id: ConnectionRequestId, - params: PluginShareDeleteParams, - ) { - let result = self.plugin_share_delete_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_share_delete_response( &self, params: PluginShareDeleteParams, ) -> Result { @@ -490,15 +437,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_install( - &self, - request_id: ConnectionRequestId, - params: PluginInstallParams, - ) { - let result = self.plugin_install_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_install_response( &self, params: PluginInstallParams, ) -> Result { @@ -760,15 +698,6 @@ impl CodexMessageProcessor { } pub(super) async fn plugin_uninstall( - &self, - request_id: ConnectionRequestId, - params: PluginUninstallParams, - ) { - let result = self.plugin_uninstall_response(params).await; - self.outgoing.send_result(request_id, result).await; - } - - async fn plugin_uninstall_response( &self, params: PluginUninstallParams, ) -> Result { @@ -975,28 +904,16 @@ fn remote_plugin_catalog_error_to_jsonrpc( err: RemotePluginCatalogError, context: &str, ) -> JSONRPCErrorError { - match err { + let code = match &err { RemotePluginCatalogError::AuthRequired | RemotePluginCatalogError::UnsupportedAuthMode => { - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("{context}: {err}"), - data: None, - } + INVALID_REQUEST_ERROR_CODE } RemotePluginCatalogError::UnexpectedStatus { status, .. } if status.as_u16() == 404 => { - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("{context}: {err}"), - data: None, - } + INVALID_REQUEST_ERROR_CODE } RemotePluginCatalogError::InvalidPluginPath { .. } | RemotePluginCatalogError::ArchiveTooLarge { .. } - | RemotePluginCatalogError::UnknownMarketplace { .. } => JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("{context}: {err}"), - data: None, - }, + | RemotePluginCatalogError::UnknownMarketplace { .. } => INVALID_REQUEST_ERROR_CODE, RemotePluginCatalogError::AuthToken(_) | RemotePluginCatalogError::Request { .. } | RemotePluginCatalogError::UnexpectedStatus { .. } @@ -1010,11 +927,12 @@ fn remote_plugin_catalog_error_to_jsonrpc( | RemotePluginCatalogError::ArchiveJoin(_) | RemotePluginCatalogError::MissingUploadEtag | RemotePluginCatalogError::UnexpectedResponse(_) - | RemotePluginCatalogError::CacheRemove(_) => JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("{context}: {err}"), - data: None, - }, + | RemotePluginCatalogError::CacheRemove(_) => INTERNAL_ERROR_CODE, + }; + JSONRPCErrorError { + code, + message: format!("{context}: {err}"), + data: None, } } diff --git a/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs b/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs index 049e0af21c..5359f26836 100644 --- a/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs +++ b/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs @@ -6,63 +6,26 @@ impl CodexMessageProcessor { &self, request_id: ConnectionRequestId, params: ThreadGoalSetParams, - ) { + ) -> Result<(), JSONRPCErrorError> { if !self.config.features.enabled(Feature::Goals) { - self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) - .await; - return; + return Err(invalid_request("goals feature is disabled")); } - let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { - Ok(thread_id) => thread_id, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let state_db = match self.state_db_for_materialized_thread(thread_id).await { - Ok(state_db) => state_db, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?; + let state_db = self.state_db_for_materialized_thread(thread_id).await?; let running_thread = self.thread_manager.get_thread(thread_id).await.ok(); let rollout_path = match running_thread.as_ref() { - Some(thread) => match thread.rollout_path() { - Some(path) => path, - None => { - self.send_invalid_request_error( - request_id, - format!("ephemeral thread does not support goals: {thread_id}"), - ) - .await; - return; - } - }, - None => { - match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) - .await - { - Ok(Some(path)) => path, - Ok(None) => { - self.send_invalid_request_error( - request_id, - format!("thread not found: {thread_id}"), - ) - .await; - return; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to locate thread id {thread_id}: {err}"), - ) - .await; - return; - } - } - } + Some(thread) => thread.rollout_path().ok_or_else(|| { + invalid_request(format!( + "ephemeral thread does not support goals: {thread_id}" + )) + })?, + None => find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) + .await + .map_err(|err| { + internal_error(format!("failed to locate thread id {thread_id}: {err}")) + })? + .ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?, }; reconcile_rollout( Some(&state_db), @@ -84,61 +47,51 @@ impl CodexMessageProcessor { let objective = params.objective.as_deref().map(str::trim); if let Some(objective) = objective { - if let Err(message) = validate_thread_goal_objective(objective) { - self.send_invalid_request_error(request_id, message).await; - return; - } - if let Err(message) = validate_goal_budget(params.token_budget.flatten()) { - self.send_invalid_request_error(request_id, message).await; - return; - } - } else if let Some(token_budget) = params.token_budget - && let Err(message) = validate_goal_budget(token_budget) - { - self.send_invalid_request_error(request_id, message).await; - return; + validate_thread_goal_objective(objective).map_err(invalid_request)?; + } + if objective.is_some() || params.token_budget.is_some() { + validate_goal_budget(params.token_budget.flatten()).map_err(invalid_request)?; } if let Some(thread) = running_thread.as_ref() { thread.prepare_external_goal_mutation().await; } - let goal = if let Some(objective) = objective { - match state_db.get_thread_goal(thread_id).await { - Ok(goal) => { - if let Some(goal) = goal.as_ref().filter(|goal| { - goal.objective == objective - && goal.status != codex_state::ThreadGoalStatus::Complete - }) { - state_db - .update_thread_goal( - thread_id, - codex_state::ThreadGoalUpdate { - status, - token_budget: params.token_budget, - expected_goal_id: Some(goal.goal_id.clone()), - }, + let goal = (if let Some(objective) = objective { + let existing_goal = state_db + .get_thread_goal(thread_id) + .await + .map_err(|err| invalid_request(err.to_string()))?; + if let Some(goal) = existing_goal.as_ref().filter(|goal| { + goal.objective == objective + && goal.status != codex_state::ThreadGoalStatus::Complete + }) { + state_db + .update_thread_goal( + thread_id, + codex_state::ThreadGoalUpdate { + status, + token_budget: params.token_budget, + expected_goal_id: Some(goal.goal_id.clone()), + }, + ) + .await + .and_then(|goal| { + goal.ok_or_else(|| { + anyhow::anyhow!( + "cannot update goal for thread {thread_id}: no goal exists" ) - .await - .and_then(|goal| { - goal.ok_or_else(|| { - anyhow::anyhow!( - "cannot update goal for thread {thread_id}: no goal exists" - ) - }) - }) - } else { - state_db - .replace_thread_goal( - thread_id, - objective, - status.unwrap_or(codex_state::ThreadGoalStatus::Active), - params.token_budget.flatten(), - ) - .await - } - } - Err(err) => Err(err), + }) + }) + } else { + state_db + .replace_thread_goal( + thread_id, + objective, + status.unwrap_or(codex_state::ThreadGoalStatus::Active), + params.token_budget.flatten(), + ) + .await } } else { state_db @@ -156,16 +109,8 @@ impl CodexMessageProcessor { anyhow::anyhow!("cannot update goal for thread {thread_id}: no goal exists") }) }) - }; - - let goal = match goal { - Ok(goal) => goal, - Err(err) => { - self.send_invalid_request_error(request_id, err.to_string()) - .await; - return; - } - }; + }) + .map_err(|err| invalid_request(err.to_string()))?; let goal_status = goal.status; let goal = api_thread_goal_from_state(goal); self.outgoing @@ -179,107 +124,51 @@ impl CodexMessageProcessor { if let Some(thread) = running_thread.as_ref() { thread.apply_external_goal_set(goal_status).await; } + Ok(()) } pub(super) async fn thread_goal_get( &self, - request_id: ConnectionRequestId, params: ThreadGoalGetParams, - ) { + ) -> Result { if !self.config.features.enabled(Feature::Goals) { - self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) - .await; - return; + return Err(invalid_request("goals feature is disabled")); } - let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { - Ok(thread_id) => thread_id, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let state_db = match self.state_db_for_materialized_thread(thread_id).await { - Ok(state_db) => state_db, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let goal = match state_db.get_thread_goal(thread_id).await { - Ok(goal) => goal.map(api_thread_goal_from_state), - Err(err) => { - self.send_internal_error(request_id, format!("failed to read thread goal: {err}")) - .await; - return; - } - }; - self.outgoing - .send_response(request_id, ThreadGoalGetResponse { goal }) - .await; + let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?; + let state_db = self.state_db_for_materialized_thread(thread_id).await?; + let goal = state_db + .get_thread_goal(thread_id) + .await + .map_err(|err| internal_error(format!("failed to read thread goal: {err}")))? + .map(api_thread_goal_from_state); + Ok(ThreadGoalGetResponse { goal }) } pub(super) async fn thread_goal_clear( &self, request_id: ConnectionRequestId, params: ThreadGoalClearParams, - ) { + ) -> Result<(), JSONRPCErrorError> { if !self.config.features.enabled(Feature::Goals) { - self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) - .await; - return; + return Err(invalid_request("goals feature is disabled")); } - let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { - Ok(thread_id) => thread_id, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - let state_db = match self.state_db_for_materialized_thread(thread_id).await { - Ok(state_db) => state_db, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?; + let state_db = self.state_db_for_materialized_thread(thread_id).await?; let running_thread = self.thread_manager.get_thread(thread_id).await.ok(); let rollout_path = match running_thread.as_ref() { - Some(thread) => match thread.rollout_path() { - Some(path) => path, - None => { - self.send_invalid_request_error( - request_id, - format!("ephemeral thread does not support goals: {thread_id}"), - ) - .await; - return; - } - }, - None => { - match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) - .await - { - Ok(Some(path)) => path, - Ok(None) => { - self.send_invalid_request_error( - request_id, - format!("thread not found: {thread_id}"), - ) - .await; - return; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to locate thread id {thread_id}: {err}"), - ) - .await; - return; - } - } - } + Some(thread) => thread.rollout_path().ok_or_else(|| { + invalid_request(format!( + "ephemeral thread does not support goals: {thread_id}" + )) + })?, + None => find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) + .await + .map_err(|err| { + internal_error(format!("failed to locate thread id {thread_id}: {err}")) + })? + .ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?, }; reconcile_rollout( Some(&state_db), @@ -301,14 +190,10 @@ impl CodexMessageProcessor { let thread_state = thread_state.lock().await; thread_state.listener_command_tx() }; - let cleared = match state_db.delete_thread_goal(thread_id).await { - Ok(cleared) => cleared, - Err(err) => { - self.send_internal_error(request_id, format!("failed to clear thread goal: {err}")) - .await; - return; - } - }; + let cleared = state_db + .delete_thread_goal(thread_id) + .await + .map_err(|err| internal_error(format!("failed to clear thread goal: {err}")))?; if cleared && let Some(thread) = running_thread.as_ref() { thread.apply_external_goal_clear().await; @@ -321,6 +206,7 @@ impl CodexMessageProcessor { self.emit_thread_goal_cleared_ordered(thread_id, listener_command_tx) .await; } + Ok(()) } async fn state_db_for_materialized_thread( @@ -337,18 +223,12 @@ impl CodexMessageProcessor { return Ok(state_db); } } else { - match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await - { - Ok(Some(_)) => {} - Ok(None) => { - return Err(invalid_request(format!("thread not found: {thread_id}"))); - } - Err(err) => { - return Err(internal_error(format!( - "failed to locate thread id {thread_id}: {err}" - ))); - } - } + find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) + .await + .map_err(|err| { + internal_error(format!("failed to locate thread id {thread_id}: {err}")) + })? + .ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?; } open_state_db_for_direct_thread_lookup(&self.config) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 7b394c3d8c..47ac0aadf0 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -38,14 +38,8 @@ use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponsePayload; -use codex_app_server_protocol::ConfigBatchWriteParams; -use codex_app_server_protocol::ConfigValueWriteParams; use codex_app_server_protocol::ConfigWarningNotification; -use codex_app_server_protocol::DeviceKeyCreateParams; -use codex_app_server_protocol::DeviceKeyPublicParams; -use codex_app_server_protocol::DeviceKeySignParams; use codex_app_server_protocol::ExperimentalApi; -use codex_app_server_protocol::ExperimentalFeatureEnablementSetParams; use codex_app_server_protocol::ExternalAgentConfigImportCompletedNotification; use codex_app_server_protocol::ExternalAgentConfigImportParams; use codex_app_server_protocol::ExternalAgentConfigImportResponse; @@ -394,25 +388,29 @@ impl MessageProcessor { Arc::clone(&self.outgoing), request_context.clone(), async { - let result = async { - let request_json = serde_json::to_value(&request) - .map_err(|err| invalid_request(format!("Invalid request: {err}")))?; - let codex_request = serde_json::from_value::(request_json) - .map_err(|err| invalid_request(format!("Invalid request: {err}")))?; - // Websocket callers finalize outbound readiness in lib.rs after mirroring - // session state into outbound state and sending initialize notifications to - // this specific connection. Passing `None` avoids marking the connection - // ready too early from inside the shared request handler. - self.handle_client_request( - request_id.clone(), - codex_request, - Arc::clone(&session), - /*outbound_initialized*/ None, - request_context.clone(), - ) - .await - } - .await; + let codex_request = serde_json::to_value(&request) + .map_err(|err| invalid_request(format!("Invalid request: {err}"))) + .and_then(|request_json| { + serde_json::from_value::(request_json) + .map_err(|err| invalid_request(format!("Invalid request: {err}"))) + }); + let result = match codex_request { + Ok(codex_request) => { + // Websocket callers finalize outbound readiness in lib.rs after mirroring + // session state into outbound state and sending initialize notifications to + // this specific connection. Passing `None` avoids marking the connection + // ready too early from inside the shared request handler. + self.handle_client_request( + request_id.clone(), + codex_request, + Arc::clone(&session), + /*outbound_initialized*/ None, + request_context.clone(), + ) + .await + } + Err(error) => Err(error), + }; if let Err(error) = result { self.outgoing.send_error(request_id.clone(), error).await; } @@ -792,161 +790,141 @@ impl MessageProcessor { device_key_requests_allowed: bool, ) -> Result<(), JSONRPCErrorError> { let connection_id = connection_request_id.connection_id; - let request_id_for_connection = |request_id| ConnectionRequestId { + let request_id = ConnectionRequestId { connection_id, - request_id, + request_id: codex_request.id().clone(), }; - match codex_request { - ClientRequest::ConfigRead { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.config_api.read(params).await, + let result: Result, JSONRPCErrorError> = match codex_request { + ClientRequest::ConfigRead { params, .. } => self + .config_api + .read(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ExternalAgentConfigDetect { params, .. } => self + .external_agent_config_api + .detect(params) + .await + .map(|response| Some(response.into())), + ClientRequest::ExternalAgentConfigImport { params, .. } => self + .handle_external_agent_config_import(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ConfigValueWrite { params, .. } => self + .handle_config_mutation_result(self.config_api.write_value(params).await) + .await + .map(|response| Some(ClientResponsePayload::ConfigValueWrite(response))), + ClientRequest::ConfigBatchWrite { params, .. } => self + .handle_config_mutation_result(self.config_api.batch_write(params).await) + .await + .map(|response| Some(ClientResponsePayload::ConfigBatchWrite(response))), + ClientRequest::ExperimentalFeatureEnablementSet { params, .. } => { + let should_refresh_apps_list = params.enablement.get("apps").copied() == Some(true); + match self + .handle_config_mutation_result( + self.config_api + .set_experimental_feature_enablement(params) + .await, ) - .await; + .await + { + Ok(response) => { + self.outgoing + .send_response_as( + request_id.clone(), + ClientResponsePayload::ExperimentalFeatureEnablementSet(response), + ) + .await; + if should_refresh_apps_list { + self.refresh_apps_list_after_experimental_feature_enablement_set() + .await; + } + Ok(None) + } + Err(error) => Err(error), + } } - ClientRequest::ExternalAgentConfigDetect { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.external_agent_config_api.detect(params).await, - ) - .await; - } - ClientRequest::ExternalAgentConfigImport { request_id, params } => { - self.handle_external_agent_config_import( - request_id_for_connection(request_id), - params, - ) - .await?; - } - ClientRequest::ConfigValueWrite { request_id, params } => { - self.handle_config_value_write(request_id_for_connection(request_id), params) - .await; - } - ClientRequest::ConfigBatchWrite { request_id, params } => { - self.handle_config_batch_write(request_id_for_connection(request_id), params) - .await; - } - ClientRequest::ExperimentalFeatureEnablementSet { request_id, params } => { - self.handle_experimental_feature_enablement_set( - request_id_for_connection(request_id), - params, - ) - .await; - } - ClientRequest::ConfigRequirementsRead { - request_id, - params: _, - } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.config_api.config_requirements_read().await, - ) - .await; - } - ClientRequest::DeviceKeyCreate { request_id, params } => { - self.handle_device_key_create( - request_id_for_connection(request_id), - params, + ClientRequest::ConfigRequirementsRead { params: _, .. } => self + .config_api + .config_requirements_read() + .await + .map(|response| Some(response.into())), + ClientRequest::DeviceKeyCreate { params, .. } => { + self.spawn_device_key_request( + request_id.clone(), + "device/key/create", device_key_requests_allowed, + move |device_key_api| async move { device_key_api.create(params).await }, ); + Ok(None) } - ClientRequest::DeviceKeyPublic { request_id, params } => { - self.handle_device_key_public( - request_id_for_connection(request_id), - params, + ClientRequest::DeviceKeyPublic { params, .. } => { + self.spawn_device_key_request( + request_id.clone(), + "device/key/public", device_key_requests_allowed, + move |device_key_api| async move { device_key_api.public(params).await }, ); + Ok(None) } - ClientRequest::DeviceKeySign { request_id, params } => { - self.handle_device_key_sign( - request_id_for_connection(request_id), - params, + ClientRequest::DeviceKeySign { params, .. } => { + self.spawn_device_key_request( + request_id.clone(), + "device/key/sign", device_key_requests_allowed, + move |device_key_api| async move { device_key_api.sign(params).await }, ); + Ok(None) } - ClientRequest::FsReadFile { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.read_file(params).await, - ) - .await; - } - ClientRequest::FsWriteFile { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.write_file(params).await, - ) - .await; - } - ClientRequest::FsCreateDirectory { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.create_directory(params).await, - ) - .await; - } - ClientRequest::FsGetMetadata { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.get_metadata(params).await, - ) - .await; - } - ClientRequest::FsReadDirectory { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.read_directory(params).await, - ) - .await; - } - ClientRequest::FsRemove { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.remove(params).await, - ) - .await; - } - ClientRequest::FsCopy { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_api.copy(params).await, - ) - .await; - } - ClientRequest::FsWatch { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_watch_manager.watch(connection_id, params).await, - ) - .await; - } - ClientRequest::FsUnwatch { request_id, params } => { - self.outgoing - .send_result( - request_id_for_connection(request_id), - self.fs_watch_manager.unwatch(connection_id, params).await, - ) - .await; - } - ClientRequest::ModelProviderCapabilitiesRead { - request_id, - params: _, - } => { - self.handle_model_provider_capabilities_read(request_id_for_connection(request_id)) - .await; - } + ClientRequest::FsReadFile { params, .. } => self + .fs_api + .read_file(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsWriteFile { params, .. } => self + .fs_api + .write_file(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsCreateDirectory { params, .. } => self + .fs_api + .create_directory(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsGetMetadata { params, .. } => self + .fs_api + .get_metadata(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsReadDirectory { params, .. } => self + .fs_api + .read_directory(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsRemove { params, .. } => self + .fs_api + .remove(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsCopy { params, .. } => self + .fs_api + .copy(params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsWatch { params, .. } => self + .fs_watch_manager + .watch(connection_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::FsUnwatch { params, .. } => self + .fs_watch_manager + .unwatch(connection_id, params) + .await + .map(|response| Some(response.into())), + ClientRequest::ModelProviderCapabilitiesRead { params: _, .. } => self + .handle_model_provider_capabilities_read() + .await + .map(|response| Some(response.into())), other => { // Box the delegated future so this wrapper's async state machine does not // inline the full `CodexMessageProcessor::process_request` future, which @@ -961,78 +939,38 @@ impl MessageProcessor { ) .boxed() .await; + Ok(None) + } + }; + + match result { + Ok(Some(response)) => { + self.outgoing + .send_response_as(request_id.clone(), response) + .await; + } + Ok(None) => {} + Err(error) => { + self.outgoing.send_error(request_id.clone(), error).await; } } Ok(()) } - async fn handle_model_provider_capabilities_read(&self, request_id: ConnectionRequestId) { - let result = async { - let config = self - .config_api - .load_latest_config(/*fallback_cwd*/ None) - .await?; - let provider = create_model_provider(config.model_provider, /*auth_manager*/ None); - let capabilities = provider.capabilities(); - Ok::<_, JSONRPCErrorError>(ModelProviderCapabilitiesReadResponse { - namespace_tools: capabilities.namespace_tools, - image_generation: capabilities.image_generation, - web_search: capabilities.web_search, - }) - } - .await; - self.outgoing.send_result(request_id, result).await; - } - - async fn handle_config_value_write( + async fn handle_model_provider_capabilities_read( &self, - request_id: ConnectionRequestId, - params: ConfigValueWriteParams, - ) { - let result = self.config_api.write_value(params).await; - self.handle_config_mutation_result( - request_id, - result, - ClientResponsePayload::ConfigValueWrite, - ) - .await - } - - async fn handle_config_batch_write( - &self, - request_id: ConnectionRequestId, - params: ConfigBatchWriteParams, - ) { - let result = self.config_api.batch_write(params).await; - self.handle_config_mutation_result( - request_id, - result, - ClientResponsePayload::ConfigBatchWrite, - ) - .await; - } - - async fn handle_experimental_feature_enablement_set( - &self, - request_id: ConnectionRequestId, - params: ExperimentalFeatureEnablementSetParams, - ) { - let should_refresh_apps_list = params.enablement.get("apps").copied() == Some(true); - let result = self + ) -> Result { + let config = self .config_api - .set_experimental_feature_enablement(params) - .await; - let is_ok = result.is_ok(); - self.handle_config_mutation_result( - request_id, - result, - ClientResponsePayload::ExperimentalFeatureEnablementSet, - ) - .await; - if should_refresh_apps_list && is_ok { - self.refresh_apps_list_after_experimental_feature_enablement_set() - .await; - } + .load_latest_config(/*fallback_cwd*/ None) + .await?; + let provider = create_model_provider(config.model_provider, /*auth_manager*/ None); + let capabilities = provider.capabilities(); + Ok(ModelProviderCapabilitiesReadResponse { + namespace_tools: capabilities.namespace_tools, + image_generation: capabilities.image_generation, + web_search: capabilities.web_search, + }) } async fn refresh_apps_list_after_experimental_feature_enablement_set(&self) { @@ -1106,19 +1044,11 @@ impl MessageProcessor { async fn handle_config_mutation_result( &self, - request_id: ConnectionRequestId, result: std::result::Result, - wrap_success: impl FnOnce(T) -> ClientResponsePayload, - ) { - match result { - Ok(response) => { - self.handle_config_mutation().await; - self.outgoing - .send_response_as(request_id, wrap_success(response)) - .await; - } - Err(error) => self.outgoing.send_error(request_id, error).await, - } + ) -> Result { + let response = result?; + self.handle_config_mutation().await; + Ok(response) } async fn handle_config_mutation(&self) { @@ -1144,48 +1074,6 @@ impl MessageProcessor { } } - fn handle_device_key_create( - &self, - request_id: ConnectionRequestId, - params: DeviceKeyCreateParams, - device_key_requests_allowed: bool, - ) { - self.spawn_device_key_request( - request_id, - "device/key/create", - device_key_requests_allowed, - move |device_key_api| async move { device_key_api.create(params).await }, - ); - } - - fn handle_device_key_public( - &self, - request_id: ConnectionRequestId, - params: DeviceKeyPublicParams, - device_key_requests_allowed: bool, - ) { - self.spawn_device_key_request( - request_id, - "device/key/public", - device_key_requests_allowed, - move |device_key_api| async move { device_key_api.public(params).await }, - ); - } - - fn handle_device_key_sign( - &self, - request_id: ConnectionRequestId, - params: DeviceKeySignParams, - device_key_requests_allowed: bool, - ) { - self.spawn_device_key_request( - request_id, - "device/key/sign", - device_key_requests_allowed, - move |device_key_api| async move { device_key_api.sign(params).await }, - ); - } - fn spawn_device_key_request( &self, request_id: ConnectionRequestId, @@ -1200,15 +1088,13 @@ impl MessageProcessor { let device_key_api = self.device_key_api.clone(); let outgoing = Arc::clone(&self.outgoing); tokio::spawn(async move { - let result = async { - if !device_key_requests_allowed { - return Err(invalid_request(format!( - "{method} is not available over remote transports" - ))); - } + let result = if !device_key_requests_allowed { + Err(invalid_request(format!( + "{method} is not available over remote transports" + ))) + } else { run_request(device_key_api).await - } - .await; + }; outgoing.send_result(request_id, result).await; }); }