diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index ed13b346aa..00f3635f3d 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1,4 +1,3 @@ -use crate::codex_message_processor::ApiVersion; use crate::codex_message_processor::read_rollout_items_from_rollout; use crate::codex_message_processor::read_summary_from_rollout; use crate::codex_message_processor::summary_to_thread; @@ -16,8 +15,6 @@ use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile; use codex_app_server_protocol::AgentMessageDeltaNotification; -use codex_app_server_protocol::ApplyPatchApprovalParams; -use codex_app_server_protocol::ApplyPatchApprovalResponse; use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo; use codex_app_server_protocol::CollabAgentState as V2CollabAgentStatus; use codex_app_server_protocol::CollabAgentTool; @@ -29,14 +26,11 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionSource; use codex_app_server_protocol::CommandExecutionStatus; -use codex_app_server_protocol::ContextCompactedNotification; use codex_app_server_protocol::DeprecationNoticeNotification; use codex_app_server_protocol::DynamicToolCallOutputContentItem; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::DynamicToolCallStatus; use codex_app_server_protocol::ErrorNotification; -use codex_app_server_protocol::ExecCommandApprovalParams; -use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::ExecPolicyAmendment as V2ExecPolicyAmendment; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeOutputDeltaNotification; @@ -48,7 +42,6 @@ use codex_app_server_protocol::GrantedPermissionProfile as V2GrantedPermissionPr use codex_app_server_protocol::GuardianWarningNotification; use codex_app_server_protocol::HookCompletedNotification; use codex_app_server_protocol::HookStartedNotification; -use codex_app_server_protocol::InterruptConversationResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::McpServerElicitationAction; @@ -120,7 +113,6 @@ use codex_core::review_format::format_review_findings_block; use codex_core::review_prompts; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem; -use codex_protocol::dynamic_tools::DynamicToolResponse as CoreDynamicToolResponse; use codex_protocol::items::parse_hook_prompt_message; use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile; use codex_protocol::plan_tool::UpdatePlanArgs; @@ -179,7 +171,6 @@ pub(crate) async fn apply_bespoke_event_handling( thread_state: Arc>, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, - api_version: ApiVersion, fallback_model_provider: String, codex_home: &Path, ) { @@ -194,36 +185,34 @@ pub(crate) async fn apply_bespoke_event_handling( thread_watch_manager .note_turn_started(&conversation_id.to_string()) .await; - if let ApiVersion::V2 = api_version { - let turn = { - let state = thread_state.lock().await; - state.active_turn_snapshot().unwrap_or_else(|| Turn { - id: payload.turn_id.clone(), - items: Vec::new(), - error: None, - status: TurnStatus::InProgress, - started_at: payload.started_at, - completed_at: None, - duration_ms: None, - }) - }; - let notification = TurnStartedNotification { - thread_id: conversation_id.to_string(), - turn, - }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client - .track_notification(ServerNotification::TurnStarted(notification.clone())); - } - outgoing - .send_server_notification(ServerNotification::TurnStarted(notification)) - .await; + let turn = { + let state = thread_state.lock().await; + state.active_turn_snapshot().unwrap_or_else(|| Turn { + id: payload.turn_id.clone(), + items: Vec::new(), + error: None, + status: TurnStatus::InProgress, + started_at: payload.started_at, + completed_at: None, + duration_ms: None, + }) + }; + let notification = TurnStartedNotification { + thread_id: conversation_id.to_string(), + turn, + }; + if let Some(analytics_events_client) = analytics_events_client.as_ref() { + analytics_events_client + .track_notification(ServerNotification::TurnStarted(notification.clone())); } + outgoing + .send_server_notification(ServerNotification::TurnStarted(notification)) + .await; } EventMsg::TurnComplete(turn_complete_event) => { // All per-thread requests are bound to a turn, so abort them. outgoing.abort_pending_server_requests().await; - respond_to_pending_interrupts(&thread_state, &outgoing, /*abort_reason*/ None).await; + respond_to_pending_interrupts(&thread_state, &outgoing).await; let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some(); thread_watch_manager .note_turn_completed(&conversation_id.to_string(), turn_failed) @@ -239,435 +228,377 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::SkillsUpdateAvailable => { - if let ApiVersion::V2 = api_version { - outgoing - .send_server_notification(ServerNotification::SkillsChanged( - SkillsChangedNotification {}, - )) - .await; - } + outgoing + .send_server_notification(ServerNotification::SkillsChanged( + SkillsChangedNotification {}, + )) + .await; } EventMsg::McpStartupUpdate(update) => { - if let ApiVersion::V2 = api_version { - let (status, error) = match update.status { - codex_protocol::protocol::McpStartupStatus::Starting => { - (McpServerStartupState::Starting, None) - } - codex_protocol::protocol::McpStartupStatus::Ready => { - (McpServerStartupState::Ready, None) - } - codex_protocol::protocol::McpStartupStatus::Failed { error } => { - (McpServerStartupState::Failed, Some(error)) - } - codex_protocol::protocol::McpStartupStatus::Cancelled => { - (McpServerStartupState::Cancelled, None) - } - }; - let notification = McpServerStatusUpdatedNotification { - name: update.server, - status, - error, - }; - outgoing - .send_server_notification(ServerNotification::McpServerStatusUpdated( - notification, - )) - .await; - } + let (status, error) = match update.status { + codex_protocol::protocol::McpStartupStatus::Starting => { + (McpServerStartupState::Starting, None) + } + codex_protocol::protocol::McpStartupStatus::Ready => { + (McpServerStartupState::Ready, None) + } + codex_protocol::protocol::McpStartupStatus::Failed { error } => { + (McpServerStartupState::Failed, Some(error)) + } + codex_protocol::protocol::McpStartupStatus::Cancelled => { + (McpServerStartupState::Cancelled, None) + } + }; + let notification = McpServerStatusUpdatedNotification { + name: update.server, + status, + error, + }; + outgoing + .send_server_notification(ServerNotification::McpServerStatusUpdated(notification)) + .await; } EventMsg::Warning(warning_event) => { - if let ApiVersion::V2 = api_version { - let notification = WarningNotification { - thread_id: Some(conversation_id.to_string()), - message: warning_event.message, - }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client - .track_notification(ServerNotification::Warning(notification.clone())); - } - outgoing - .send_server_notification(ServerNotification::Warning(notification)) - .await; + let notification = WarningNotification { + thread_id: Some(conversation_id.to_string()), + message: warning_event.message, + }; + if let Some(analytics_events_client) = analytics_events_client.as_ref() { + analytics_events_client + .track_notification(ServerNotification::Warning(notification.clone())); } + outgoing + .send_server_notification(ServerNotification::Warning(notification)) + .await; } EventMsg::GuardianWarning(warning_event) => { - if let ApiVersion::V2 = api_version { - let notification = GuardianWarningNotification { - thread_id: conversation_id.to_string(), - message: warning_event.message, - }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client.track_notification( - ServerNotification::GuardianWarning(notification.clone()), - ); - } - outgoing - .send_server_notification(ServerNotification::GuardianWarning(notification)) - .await; + let notification = GuardianWarningNotification { + thread_id: conversation_id.to_string(), + message: warning_event.message, + }; + if let Some(analytics_events_client) = analytics_events_client.as_ref() { + analytics_events_client + .track_notification(ServerNotification::GuardianWarning(notification.clone())); } + outgoing + .send_server_notification(ServerNotification::GuardianWarning(notification)) + .await; } EventMsg::GuardianAssessment(assessment) => { - if let ApiVersion::V2 = api_version { - let pending_command_execution = match build_item_from_guardian_event( - &assessment, - CommandExecutionStatus::InProgress, - ) { - Some(ThreadItem::CommandExecution { - id, + let pending_command_execution = match build_item_from_guardian_event( + &assessment, + CommandExecutionStatus::InProgress, + ) { + Some(ThreadItem::CommandExecution { + id, + command, + cwd, + command_actions, + .. + }) => Some(( + id, + CommandExecutionCompletionItem { command, cwd, command_actions, - .. - }) => Some(( - id, - CommandExecutionCompletionItem { - command, - cwd, - command_actions, - }, - )), - Some(_) | None => None, - }; - let assessment_turn_id = if assessment.turn_id.is_empty() { - event_turn_id.clone() - } else { - assessment.turn_id.clone() - }; - if assessment.status - == codex_protocol::protocol::GuardianAssessmentStatus::InProgress - && let Some((target_item_id, completion_item)) = - pending_command_execution.as_ref() - { - start_command_execution_item( - &conversation_id, - assessment_turn_id.clone(), - target_item_id.clone(), - completion_item.command.clone(), - completion_item.cwd.clone(), - completion_item.command_actions.clone(), - CommandExecutionSource::Agent, - &outgoing, - &thread_state, - ) - .await; - } - let notification = guardian_auto_approval_review_notification( + }, + )), + Some(_) | None => None, + }; + let assessment_turn_id = if assessment.turn_id.is_empty() { + event_turn_id.clone() + } else { + assessment.turn_id.clone() + }; + if assessment.status == codex_protocol::protocol::GuardianAssessmentStatus::InProgress + && let Some((target_item_id, completion_item)) = pending_command_execution.as_ref() + { + start_command_execution_item( &conversation_id, - &event_turn_id, - &assessment, - ); - outgoing.send_server_notification(notification).await; - let completion_status = match assessment.status { - codex_protocol::protocol::GuardianAssessmentStatus::Denied - | codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { - Some(CommandExecutionStatus::Declined) - } - codex_protocol::protocol::GuardianAssessmentStatus::TimedOut => { - Some(CommandExecutionStatus::Failed) - } - codex_protocol::protocol::GuardianAssessmentStatus::InProgress - | codex_protocol::protocol::GuardianAssessmentStatus::Approved => None, - }; - if let Some(completion_status) = completion_status - && let Some((target_item_id, completion_item)) = pending_command_execution - { - complete_command_execution_item( - &conversation_id, - assessment_turn_id, - target_item_id, - completion_item.command, - completion_item.cwd, - /*process_id*/ None, - CommandExecutionSource::Agent, - completion_item.command_actions, - completion_status, - &outgoing, - &thread_state, - ) - .await; + assessment_turn_id.clone(), + target_item_id.clone(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + } + let notification = guardian_auto_approval_review_notification( + &conversation_id, + &event_turn_id, + &assessment, + ); + outgoing.send_server_notification(notification).await; + let completion_status = match assessment.status { + codex_protocol::protocol::GuardianAssessmentStatus::Denied + | codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { + Some(CommandExecutionStatus::Declined) } + codex_protocol::protocol::GuardianAssessmentStatus::TimedOut => { + Some(CommandExecutionStatus::Failed) + } + codex_protocol::protocol::GuardianAssessmentStatus::InProgress + | codex_protocol::protocol::GuardianAssessmentStatus::Approved => None, + }; + if let Some(completion_status) = completion_status + && let Some((target_item_id, completion_item)) = pending_command_execution + { + complete_command_execution_item( + &conversation_id, + assessment_turn_id, + target_item_id, + completion_item.command, + completion_item.cwd, + /*process_id*/ None, + CommandExecutionSource::Agent, + completion_item.command_actions, + completion_status, + &outgoing, + &thread_state, + ) + .await; } } EventMsg::ModelReroute(event) => { - if let ApiVersion::V2 = api_version { - let notification = ModelReroutedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - from_model: event.from_model, - to_model: event.to_model, - reason: event.reason.into(), - }; - outgoing - .send_server_notification(ServerNotification::ModelRerouted(notification)) - .await; - } + let notification = ModelReroutedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + from_model: event.from_model, + to_model: event.to_model, + reason: event.reason.into(), + }; + outgoing + .send_server_notification(ServerNotification::ModelRerouted(notification)) + .await; } EventMsg::ModelVerification(event) => { - if let ApiVersion::V2 = api_version { - let notification = ModelVerificationNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - verifications: event.verifications.into_iter().map(Into::into).collect(), - }; - outgoing - .send_server_notification(ServerNotification::ModelVerification(notification)) - .await; - } + let notification = ModelVerificationNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + verifications: event.verifications.into_iter().map(Into::into).collect(), + }; + outgoing + .send_server_notification(ServerNotification::ModelVerification(notification)) + .await; } EventMsg::RealtimeConversationStarted(event) => { - if let ApiVersion::V2 = api_version { - let notification = ThreadRealtimeStartedNotification { - thread_id: conversation_id.to_string(), - session_id: event.session_id, - version: event.version, - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeStarted( - notification, - )) - .await; - } + let notification = ThreadRealtimeStartedNotification { + thread_id: conversation_id.to_string(), + session_id: event.session_id, + version: event.version, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeStarted(notification)) + .await; } EventMsg::RealtimeConversationSdp(event) => { - if let ApiVersion::V2 = api_version { - let notification = ThreadRealtimeSdpNotification { + let notification = ThreadRealtimeSdpNotification { + thread_id: conversation_id.to_string(), + sdp: event.sdp, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeSdp(notification)) + .await; + } + EventMsg::RealtimeConversationRealtime(event) => match event.payload { + RealtimeEvent::SessionUpdated { .. } => {} + RealtimeEvent::InputAudioSpeechStarted(event) => { + let notification = ThreadRealtimeItemAddedNotification { thread_id: conversation_id.to_string(), - sdp: event.sdp, + item: serde_json::json!({ + "type": "input_audio_buffer.speech_started", + "item_id": event.item_id, + }), }; outgoing - .send_server_notification(ServerNotification::ThreadRealtimeSdp(notification)) - .await; - } - } - EventMsg::RealtimeConversationRealtime(event) => { - if let ApiVersion::V2 = api_version { - match event.payload { - RealtimeEvent::SessionUpdated { .. } => {} - RealtimeEvent::InputAudioSpeechStarted(event) => { - let notification = ThreadRealtimeItemAddedNotification { - thread_id: conversation_id.to_string(), - item: serde_json::json!({ - "type": "input_audio_buffer.speech_started", - "item_id": event.item_id, - }), - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( - notification, - )) - .await; - } - RealtimeEvent::InputTranscriptDelta(event) => { - let notification = ThreadRealtimeTranscriptDeltaNotification { - thread_id: conversation_id.to_string(), - role: "user".to_string(), - delta: event.delta, - }; - outgoing - .send_server_notification( - ServerNotification::ThreadRealtimeTranscriptDelta(notification), - ) - .await; - } - RealtimeEvent::InputTranscriptDone(event) => { - let notification = ThreadRealtimeTranscriptDoneNotification { - thread_id: conversation_id.to_string(), - role: "user".to_string(), - text: event.text, - }; - outgoing - .send_server_notification( - ServerNotification::ThreadRealtimeTranscriptDone(notification), - ) - .await; - } - RealtimeEvent::OutputTranscriptDelta(event) => { - let notification = ThreadRealtimeTranscriptDeltaNotification { - thread_id: conversation_id.to_string(), - role: "assistant".to_string(), - delta: event.delta, - }; - outgoing - .send_server_notification( - ServerNotification::ThreadRealtimeTranscriptDelta(notification), - ) - .await; - } - RealtimeEvent::OutputTranscriptDone(event) => { - let notification = ThreadRealtimeTranscriptDoneNotification { - thread_id: conversation_id.to_string(), - role: "assistant".to_string(), - text: event.text, - }; - outgoing - .send_server_notification( - ServerNotification::ThreadRealtimeTranscriptDone(notification), - ) - .await; - } - RealtimeEvent::AudioOut(audio) => { - let notification = ThreadRealtimeOutputAudioDeltaNotification { - thread_id: conversation_id.to_string(), - audio: audio.into(), - }; - outgoing - .send_server_notification( - ServerNotification::ThreadRealtimeOutputAudioDelta(notification), - ) - .await; - } - RealtimeEvent::ResponseCreated(_) => {} - RealtimeEvent::ResponseCancelled(event) => { - let notification = ThreadRealtimeItemAddedNotification { - thread_id: conversation_id.to_string(), - item: serde_json::json!({ - "type": "response.cancelled", - "response_id": event.response_id, - }), - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( - notification, - )) - .await; - } - RealtimeEvent::ResponseDone(_) => {} - RealtimeEvent::ConversationItemAdded(item) => { - let notification = ThreadRealtimeItemAddedNotification { - thread_id: conversation_id.to_string(), - item, - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( - notification, - )) - .await; - } - RealtimeEvent::ConversationItemDone { .. } - | RealtimeEvent::NoopRequested(_) => {} - RealtimeEvent::HandoffRequested(handoff) => { - let notification = ThreadRealtimeItemAddedNotification { - thread_id: conversation_id.to_string(), - item: serde_json::json!({ - "type": "handoff_request", - "handoff_id": handoff.handoff_id, - "item_id": handoff.item_id, - "input_transcript": handoff.input_transcript, - "active_transcript": handoff.active_transcript, - }), - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( - notification, - )) - .await; - } - RealtimeEvent::Error(message) => { - let notification = ThreadRealtimeErrorNotification { - thread_id: conversation_id.to_string(), - message, - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeError( - notification, - )) - .await; - } - } - } - } - EventMsg::RealtimeConversationClosed(event) => { - if let ApiVersion::V2 = api_version { - let notification = ThreadRealtimeClosedNotification { - thread_id: conversation_id.to_string(), - reason: event.reason, - }; - outgoing - .send_server_notification(ServerNotification::ThreadRealtimeClosed( + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( notification, )) .await; } + RealtimeEvent::InputTranscriptDelta(event) => { + let notification = ThreadRealtimeTranscriptDeltaNotification { + thread_id: conversation_id.to_string(), + role: "user".to_string(), + delta: event.delta, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeTranscriptDelta( + notification, + )) + .await; + } + RealtimeEvent::InputTranscriptDone(event) => { + let notification = ThreadRealtimeTranscriptDoneNotification { + thread_id: conversation_id.to_string(), + role: "user".to_string(), + text: event.text, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeTranscriptDone( + notification, + )) + .await; + } + RealtimeEvent::OutputTranscriptDelta(event) => { + let notification = ThreadRealtimeTranscriptDeltaNotification { + thread_id: conversation_id.to_string(), + role: "assistant".to_string(), + delta: event.delta, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeTranscriptDelta( + notification, + )) + .await; + } + RealtimeEvent::OutputTranscriptDone(event) => { + let notification = ThreadRealtimeTranscriptDoneNotification { + thread_id: conversation_id.to_string(), + role: "assistant".to_string(), + text: event.text, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeTranscriptDone( + notification, + )) + .await; + } + RealtimeEvent::AudioOut(audio) => { + let notification = ThreadRealtimeOutputAudioDeltaNotification { + thread_id: conversation_id.to_string(), + audio: audio.into(), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeOutputAudioDelta( + notification, + )) + .await; + } + RealtimeEvent::ResponseCreated(_) => {} + RealtimeEvent::ResponseCancelled(event) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item: serde_json::json!({ + "type": "response.cancelled", + "response_id": event.response_id, + }), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } + RealtimeEvent::ResponseDone(_) => {} + RealtimeEvent::ConversationItemAdded(item) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } + RealtimeEvent::ConversationItemDone { .. } | RealtimeEvent::NoopRequested(_) => {} + RealtimeEvent::HandoffRequested(handoff) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item: serde_json::json!({ + "type": "handoff_request", + "handoff_id": handoff.handoff_id, + "item_id": handoff.item_id, + "input_transcript": handoff.input_transcript, + "active_transcript": handoff.active_transcript, + }), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } + RealtimeEvent::Error(message) => { + let notification = ThreadRealtimeErrorNotification { + thread_id: conversation_id.to_string(), + message, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeError(notification)) + .await; + } + }, + EventMsg::RealtimeConversationClosed(event) => { + let notification = ThreadRealtimeClosedNotification { + thread_id: conversation_id.to_string(), + reason: event.reason, + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeClosed(notification)) + .await; } EventMsg::ApplyPatchApprovalRequest(event) => { let permission_guard = thread_watch_manager .note_permission_requested(&conversation_id.to_string()) .await; - match api_version { - ApiVersion::V1 => { - let params = ApplyPatchApprovalParams { - conversation_id, - call_id: event.call_id.clone(), - file_changes: event.changes.clone(), - reason: event.reason.clone(), - grant_root: event.grant_root.clone(), - }; - let (_pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::ApplyPatchApproval(params)) - .await; - let call_id = event.call_id.clone(); - tokio::spawn(async move { - let _permission_guard = permission_guard; - on_patch_approval_response(call_id, rx, conversation).await; - }); - } - ApiVersion::V2 => { - // Until we migrate the core to be aware of a first class FileChangeItem - // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. - let item_id = event.call_id.clone(); - let patch_changes = convert_patch_changes(&event.changes); - let first_start = { - let mut state = thread_state.lock().await; - state - .turn_summary - .file_change_started - .insert(item_id.clone()) - }; - if first_start { - let item = build_file_change_approval_request_item(&event); - let notification = ItemStartedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - item, - }; - outgoing - .send_server_notification(ServerNotification::ItemStarted(notification)) - .await; - } - - let params = FileChangeRequestApprovalParams { - thread_id: conversation_id.to_string(), - turn_id: event.turn_id.clone(), - item_id: item_id.clone(), - reason: event.reason.clone(), - grant_root: event.grant_root.clone(), - }; - let (pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::FileChangeRequestApproval(params)) - .await; - tokio::spawn(async move { - on_file_change_request_approval_response( - event_turn_id, - conversation_id, - item_id, - patch_changes, - pending_request_id, - rx, - conversation, - outgoing, - thread_state.clone(), - permission_guard, - ) - .await; - }); - } + // Until we migrate the core to be aware of a first class FileChangeItem + // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. + let item_id = event.call_id.clone(); + let patch_changes = convert_patch_changes(&event.changes); + let first_start = { + let mut state = thread_state.lock().await; + state + .turn_summary + .file_change_started + .insert(item_id.clone()) + }; + if first_start { + let item = build_file_change_approval_request_item(&event); + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item, + }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; } + + let params = FileChangeRequestApprovalParams { + thread_id: conversation_id.to_string(), + turn_id: event.turn_id.clone(), + item_id: item_id.clone(), + reason: event.reason.clone(), + grant_root: event.grant_root.clone(), + }; + let (pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::FileChangeRequestApproval(params)) + .await; + tokio::spawn(async move { + on_file_change_request_approval_response( + event_turn_id, + conversation_id, + item_id, + patch_changes, + pending_request_id, + rx, + conversation, + outgoing, + thread_state.clone(), + permission_guard, + ) + .await; + }); } EventMsg::ExecApprovalRequest(ev) => { let permission_guard = thread_watch_manager .note_permission_requested(&conversation_id.to_string()) .await; - let approval_id_for_op = ev.effective_approval_id(); let available_decisions = ev .effective_available_decisions() .into_iter() @@ -687,406 +618,314 @@ pub(crate) async fn apply_bespoke_event_handling( parsed_cmd, .. } = ev; - match api_version { - ApiVersion::V1 => { - let params = ExecCommandApprovalParams { - conversation_id, - call_id: call_id.clone(), - approval_id, - command, - cwd: cwd.to_path_buf(), - reason, - parsed_cmd, - }; - let (_pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::ExecCommandApproval(params)) - .await; - tokio::spawn(async move { - let _permission_guard = permission_guard; - on_exec_approval_response( - approval_id_for_op, - event_turn_id, - rx, - conversation, - ) - .await; - }); - } - ApiVersion::V2 => { - let command_actions = parsed_cmd - .iter() - .cloned() - .map(|parsed| V2ParsedCommand::from_core_with_cwd(parsed, &cwd)) - .collect::>(); - let presentation = if let Some(network_approval_context) = - network_approval_context.map(V2NetworkApprovalContext::from) - { - CommandExecutionApprovalPresentation::Network(network_approval_context) - } else { - let command_string = shlex_join(&command); - let completion_item = CommandExecutionCompletionItem { - command: command_string, - cwd: cwd.clone(), - command_actions: command_actions.clone(), - }; - CommandExecutionApprovalPresentation::Command(completion_item) - }; - let (network_approval_context, command, cwd, command_actions, completion_item) = - match presentation { - CommandExecutionApprovalPresentation::Network( - network_approval_context, - ) => (Some(network_approval_context), None, None, None, None), - CommandExecutionApprovalPresentation::Command(completion_item) => ( - None, - Some(completion_item.command.clone()), - Some(completion_item.cwd.clone()), - Some(completion_item.command_actions.clone()), - Some(completion_item), - ), - }; - if approval_id.is_none() - && let Some(completion_item) = completion_item.as_ref() - { - start_command_execution_item( - &conversation_id, - event_turn_id.clone(), - call_id.clone(), - completion_item.command.clone(), - completion_item.cwd.clone(), - completion_item.command_actions.clone(), - CommandExecutionSource::Agent, - &outgoing, - &thread_state, - ) - .await; + let command_actions = parsed_cmd + .iter() + .cloned() + .map(|parsed| V2ParsedCommand::from_core_with_cwd(parsed, &cwd)) + .collect::>(); + let presentation = if let Some(network_approval_context) = + network_approval_context.map(V2NetworkApprovalContext::from) + { + CommandExecutionApprovalPresentation::Network(network_approval_context) + } else { + let command_string = shlex_join(&command); + let completion_item = CommandExecutionCompletionItem { + command: command_string, + cwd: cwd.clone(), + command_actions: command_actions.clone(), + }; + CommandExecutionApprovalPresentation::Command(completion_item) + }; + let (network_approval_context, command, cwd, command_actions, completion_item) = + match presentation { + CommandExecutionApprovalPresentation::Network(network_approval_context) => { + (Some(network_approval_context), None, None, None, None) } - let proposed_execpolicy_amendment_v2 = - proposed_execpolicy_amendment.map(V2ExecPolicyAmendment::from); - let proposed_network_policy_amendments_v2 = proposed_network_policy_amendments - .map(|amendments| { - amendments - .into_iter() - .map(V2NetworkPolicyAmendment::from) - .collect() - }); - let additional_permissions = - additional_permissions.map(V2AdditionalPermissionProfile::from); - - let params = CommandExecutionRequestApprovalParams { - thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), - item_id: call_id.clone(), - approval_id: approval_id.clone(), - reason, - network_approval_context, - command, - cwd, - command_actions, - additional_permissions, - proposed_execpolicy_amendment: proposed_execpolicy_amendment_v2, - proposed_network_policy_amendments: proposed_network_policy_amendments_v2, - available_decisions: Some(available_decisions), - }; - let (pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::CommandExecutionRequestApproval( - params, - )) - .await; - tokio::spawn(async move { - on_command_execution_request_approval_response( - event_turn_id, - conversation_id, - approval_id, - call_id, - completion_item, - pending_request_id, - rx, - conversation, - outgoing, - thread_state.clone(), - permission_guard, - ) - .await; - }); - } + CommandExecutionApprovalPresentation::Command(completion_item) => ( + None, + Some(completion_item.command.clone()), + Some(completion_item.cwd.clone()), + Some(completion_item.command_actions.clone()), + Some(completion_item), + ), + }; + if approval_id.is_none() + && let Some(completion_item) = completion_item.as_ref() + { + start_command_execution_item( + &conversation_id, + event_turn_id.clone(), + call_id.clone(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; } + let proposed_execpolicy_amendment_v2 = + proposed_execpolicy_amendment.map(V2ExecPolicyAmendment::from); + let proposed_network_policy_amendments_v2 = + proposed_network_policy_amendments.map(|amendments| { + amendments + .into_iter() + .map(V2NetworkPolicyAmendment::from) + .collect() + }); + let additional_permissions = + additional_permissions.map(V2AdditionalPermissionProfile::from); + + let params = CommandExecutionRequestApprovalParams { + thread_id: conversation_id.to_string(), + turn_id: turn_id.clone(), + item_id: call_id.clone(), + approval_id: approval_id.clone(), + reason, + network_approval_context, + command, + cwd, + command_actions, + additional_permissions, + proposed_execpolicy_amendment: proposed_execpolicy_amendment_v2, + proposed_network_policy_amendments: proposed_network_policy_amendments_v2, + available_decisions: Some(available_decisions), + }; + let (pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::CommandExecutionRequestApproval( + params, + )) + .await; + tokio::spawn(async move { + on_command_execution_request_approval_response( + event_turn_id, + conversation_id, + approval_id, + call_id, + completion_item, + pending_request_id, + rx, + conversation, + outgoing, + thread_state.clone(), + permission_guard, + ) + .await; + }); } EventMsg::RequestUserInput(request) => { - if matches!(api_version, ApiVersion::V2) { - let user_input_guard = thread_watch_manager - .note_user_input_requested(&conversation_id.to_string()) - .await; - let questions = request - .questions - .into_iter() - .map(|question| ToolRequestUserInputQuestion { - id: question.id, - header: question.header, - question: question.question, - is_other: question.is_other, - is_secret: question.is_secret, - options: question.options.map(|options| { - options - .into_iter() - .map(|option| ToolRequestUserInputOption { - label: option.label, - description: option.description, - }) - .collect() - }), - }) - .collect(); - let params = ToolRequestUserInputParams { - thread_id: conversation_id.to_string(), - turn_id: request.turn_id, - item_id: request.call_id, - questions, - }; - let (pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::ToolRequestUserInput(params)) - .await; - tokio::spawn(async move { - on_request_user_input_response( - event_turn_id, - pending_request_id, - rx, - conversation, - thread_state, - user_input_guard, - ) - .await; - }); - } else { - error!( - "request_user_input is only supported on api v2 (call_id: {})", - request.call_id - ); - let empty = CoreRequestUserInputResponse { - answers: HashMap::new(), - }; - if let Err(err) = conversation - .submit(Op::UserInputAnswer { - id: event_turn_id, - response: empty, - }) - .await - { - error!("failed to submit UserInputAnswer: {err}"); - } - } + let user_input_guard = thread_watch_manager + .note_user_input_requested(&conversation_id.to_string()) + .await; + let questions = request + .questions + .into_iter() + .map(|question| ToolRequestUserInputQuestion { + id: question.id, + header: question.header, + question: question.question, + is_other: question.is_other, + is_secret: question.is_secret, + options: question.options.map(|options| { + options + .into_iter() + .map(|option| ToolRequestUserInputOption { + label: option.label, + description: option.description, + }) + .collect() + }), + }) + .collect(); + let params = ToolRequestUserInputParams { + thread_id: conversation_id.to_string(), + turn_id: request.turn_id, + item_id: request.call_id, + questions, + }; + let (pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::ToolRequestUserInput(params)) + .await; + tokio::spawn(async move { + on_request_user_input_response( + event_turn_id, + pending_request_id, + rx, + conversation, + thread_state, + user_input_guard, + ) + .await; + }); } EventMsg::ElicitationRequest(request) => { - if matches!(api_version, ApiVersion::V2) { - let permission_guard = thread_watch_manager - .note_permission_requested(&conversation_id.to_string()) - .await; - let turn_id = match request.turn_id.clone() { - Some(turn_id) => Some(turn_id), - None => { - let state = thread_state.lock().await; - state.active_turn_snapshot().map(|turn| turn.id) + let permission_guard = thread_watch_manager + .note_permission_requested(&conversation_id.to_string()) + .await; + let turn_id = match request.turn_id.clone() { + Some(turn_id) => Some(turn_id), + None => { + let state = thread_state.lock().await; + state.active_turn_snapshot().map(|turn| turn.id) + } + }; + let server_name = request.server_name.clone(); + let request_body = match request.request.try_into() { + Ok(request_body) => request_body, + Err(err) => { + error!( + error = %err, + server_name, + request_id = ?request.id, + "failed to parse typed MCP elicitation schema" + ); + if let Err(err) = conversation + .submit(Op::ResolveElicitation { + server_name: request.server_name, + request_id: request.id, + decision: codex_protocol::approvals::ElicitationAction::Cancel, + content: None, + meta: None, + }) + .await + { + error!("failed to submit ResolveElicitation: {err}"); } - }; - let server_name = request.server_name.clone(); - let request_body = match request.request.try_into() { - Ok(request_body) => request_body, - Err(err) => { - error!( - error = %err, - server_name, - request_id = ?request.id, - "failed to parse typed MCP elicitation schema" - ); - if let Err(err) = conversation - .submit(Op::ResolveElicitation { - server_name: request.server_name, - request_id: request.id, - decision: codex_protocol::approvals::ElicitationAction::Cancel, - content: None, - meta: None, - }) - .await - { - error!("failed to submit ResolveElicitation: {err}"); - } - return; - } - }; - let params = McpServerElicitationRequestParams { - thread_id: conversation_id.to_string(), - turn_id, - server_name: request.server_name.clone(), - request: request_body, - }; - let (pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::McpServerElicitationRequest(params)) - .await; - tokio::spawn(async move { - on_mcp_server_elicitation_response( - request.server_name, - request.id, - pending_request_id, - rx, - conversation, - thread_state, - permission_guard, - ) - .await; - }); - } + return; + } + }; + let params = McpServerElicitationRequestParams { + thread_id: conversation_id.to_string(), + turn_id, + server_name: request.server_name.clone(), + request: request_body, + }; + let (pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::McpServerElicitationRequest(params)) + .await; + tokio::spawn(async move { + on_mcp_server_elicitation_response( + request.server_name, + request.id, + pending_request_id, + rx, + conversation, + thread_state, + permission_guard, + ) + .await; + }); } EventMsg::RequestPermissions(request) => { - if matches!(api_version, ApiVersion::V2) { - let permission_guard = thread_watch_manager - .note_permission_requested(&conversation_id.to_string()) - .await; - let requested_permissions = request.permissions.clone(); - let request_cwd = match request.cwd.clone() { - Some(cwd) => cwd, - None => conversation.config_snapshot().await.cwd, - }; - let params = PermissionsRequestApprovalParams { - thread_id: conversation_id.to_string(), - turn_id: request.turn_id.clone(), - item_id: request.call_id.clone(), - cwd: request_cwd.clone(), - reason: request.reason, - permissions: request.permissions.into(), - }; - let (pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::PermissionsRequestApproval(params)) - .await; - let pending_response = PendingRequestPermissionsResponse { - call_id: request.call_id, - requested_permissions, - request_cwd, - pending_request_id, - receiver: rx, - request_permissions_guard: permission_guard, - }; - tokio::spawn(async move { - on_request_permissions_response(pending_response, conversation, thread_state) - .await; - }); - } else { - error!( - "request_permissions is only supported on api v2 (call_id: {})", - request.call_id - ); - let empty = CoreRequestPermissionsResponse { - permissions: Default::default(), - scope: CorePermissionGrantScope::Turn, - strict_auto_review: false, - }; - if let Err(err) = conversation - .submit(Op::RequestPermissionsResponse { - id: request.call_id, - response: empty, - }) - .await - { - error!("failed to submit RequestPermissionsResponse: {err}"); - } - } + let permission_guard = thread_watch_manager + .note_permission_requested(&conversation_id.to_string()) + .await; + let requested_permissions = request.permissions.clone(); + let request_cwd = match request.cwd.clone() { + Some(cwd) => cwd, + None => conversation.config_snapshot().await.cwd, + }; + let params = PermissionsRequestApprovalParams { + thread_id: conversation_id.to_string(), + turn_id: request.turn_id.clone(), + item_id: request.call_id.clone(), + cwd: request_cwd.clone(), + reason: request.reason, + permissions: request.permissions.into(), + }; + let (pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::PermissionsRequestApproval(params)) + .await; + let pending_response = PendingRequestPermissionsResponse { + call_id: request.call_id, + requested_permissions, + request_cwd, + pending_request_id, + receiver: rx, + request_permissions_guard: permission_guard, + }; + tokio::spawn(async move { + on_request_permissions_response(pending_response, conversation, thread_state).await; + }); } EventMsg::DynamicToolCallRequest(request) => { - if matches!(api_version, ApiVersion::V2) { - let call_id = request.call_id; - let turn_id = request.turn_id; - let namespace = request.namespace; - let tool = request.tool; - let arguments = request.arguments; - let item = ThreadItem::DynamicToolCall { - id: call_id.clone(), - namespace: namespace.clone(), - tool: tool.clone(), - arguments: arguments.clone(), - status: DynamicToolCallStatus::InProgress, - content_items: None, - success: None, - duration_ms: None, - }; - let notification = ItemStartedNotification { - thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), - item, - }; - outgoing - .send_server_notification(ServerNotification::ItemStarted(notification)) - .await; - let params = DynamicToolCallParams { - thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), - call_id: call_id.clone(), - namespace, - tool: tool.clone(), - arguments: arguments.clone(), - }; - let (_pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::DynamicToolCall(params)) - .await; - tokio::spawn(async move { - crate::dynamic_tools::on_call_response(call_id, rx, conversation).await; - }); - } else { - error!( - "dynamic tool calls are only supported on api v2 (call_id: {})", - request.call_id - ); - let call_id = request.call_id; - let _ = conversation - .submit(Op::DynamicToolResponse { - id: call_id.clone(), - response: CoreDynamicToolResponse { - content_items: vec![CoreDynamicToolCallOutputContentItem::InputText { - text: "dynamic tool calls require api v2".to_string(), - }], - success: false, - }, - }) - .await; - } + let call_id = request.call_id; + let turn_id = request.turn_id; + let namespace = request.namespace; + let tool = request.tool; + let arguments = request.arguments; + let item = ThreadItem::DynamicToolCall { + id: call_id.clone(), + namespace: namespace.clone(), + tool: tool.clone(), + arguments: arguments.clone(), + status: DynamicToolCallStatus::InProgress, + content_items: None, + success: None, + duration_ms: None, + }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: turn_id.clone(), + item, + }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + let params = DynamicToolCallParams { + thread_id: conversation_id.to_string(), + turn_id: turn_id.clone(), + call_id: call_id.clone(), + namespace, + tool: tool.clone(), + arguments: arguments.clone(), + }; + let (_pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::DynamicToolCall(params)) + .await; + tokio::spawn(async move { + crate::dynamic_tools::on_call_response(call_id, rx, conversation).await; + }); } EventMsg::DynamicToolCallResponse(response) => { - if matches!(api_version, ApiVersion::V2) { - let status = if response.success { - DynamicToolCallStatus::Completed - } else { - DynamicToolCallStatus::Failed - }; - let duration_ms = i64::try_from(response.duration.as_millis()).ok(); - let item = ThreadItem::DynamicToolCall { - id: response.call_id, - namespace: response.namespace, - tool: response.tool, - arguments: response.arguments, - status, - content_items: Some( - response - .content_items - .into_iter() - .map(|item| match item { - CoreDynamicToolCallOutputContentItem::InputText { text } => { - DynamicToolCallOutputContentItem::InputText { text } - } - CoreDynamicToolCallOutputContentItem::InputImage { image_url } => { - DynamicToolCallOutputContentItem::InputImage { image_url } - } - }) - .collect(), - ), - success: Some(response.success), - duration_ms, - }; - let notification = ItemCompletedNotification { - thread_id: conversation_id.to_string(), - turn_id: response.turn_id, - item, - }; - outgoing - .send_server_notification(ServerNotification::ItemCompleted(notification)) - .await; - } + let status = if response.success { + DynamicToolCallStatus::Completed + } else { + DynamicToolCallStatus::Failed + }; + let duration_ms = i64::try_from(response.duration.as_millis()).ok(); + let item = ThreadItem::DynamicToolCall { + id: response.call_id, + namespace: response.namespace, + tool: response.tool, + arguments: response.arguments, + status, + content_items: Some( + response + .content_items + .into_iter() + .map(|item| match item { + CoreDynamicToolCallOutputContentItem::InputText { text } => { + DynamicToolCallOutputContentItem::InputText { text } + } + CoreDynamicToolCallOutputContentItem::InputImage { image_url } => { + DynamicToolCallOutputContentItem::InputImage { image_url } + } + }) + .collect(), + ), + success: Some(response.success), + duration_ms, + }; + let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: response.turn_id, + item, + }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; } // TODO(celia): properly construct McpToolCall TurnItem in core. EventMsg::McpToolCallBegin(begin_event) => { @@ -1397,16 +1236,6 @@ pub(crate) async fn apply_bespoke_event_handling( EventMsg::ContextCompacted(..) => { // Core still fans out this deprecated event for legacy clients; // v2 clients receive the canonical ContextCompaction item instead. - if matches!(api_version, ApiVersion::V2) { - return; - } - let notification = ContextCompactedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - }; - outgoing - .send_server_notification(ServerNotification::ContextCompacted(notification)) - .await; } EventMsg::DeprecationNotice(event) => { let notification = DeprecationNoticeNotification { @@ -1590,28 +1419,24 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::HookStarted(event) => { - if let ApiVersion::V2 = api_version { - let notification = HookStartedNotification { - thread_id: conversation_id.to_string(), - turn_id: event.turn_id, - run: event.run.into(), - }; - outgoing - .send_server_notification(ServerNotification::HookStarted(notification)) - .await; - } + let notification = HookStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event.turn_id, + run: event.run.into(), + }; + outgoing + .send_server_notification(ServerNotification::HookStarted(notification)) + .await; } EventMsg::HookCompleted(event) => { - if let ApiVersion::V2 = api_version { - let notification = HookCompletedNotification { - thread_id: conversation_id.to_string(), - turn_id: event.turn_id, - run: event.run.into(), - }; - outgoing - .send_server_notification(ServerNotification::HookCompleted(notification)) - .await; - } + let notification = HookCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: event.turn_id, + run: event.run.into(), + }; + outgoing + .send_server_notification(ServerNotification::HookCompleted(notification)) + .await; } EventMsg::ExitedReviewMode(review_event) => { let review = match review_event.review_output { @@ -1641,7 +1466,6 @@ pub(crate) async fn apply_bespoke_event_handling( } EventMsg::RawResponseItem(raw_response_item_event) => { maybe_emit_hook_prompt_item_completed( - api_version, conversation_id, &event_turn_id, &raw_response_item_event.item, @@ -1649,7 +1473,6 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; maybe_emit_raw_response_item_completed( - api_version, conversation_id, &event_turn_id, raw_response_item_event.item, @@ -1707,12 +1530,10 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExecCommandBegin(exec_command_begin_event) => { - if matches!(api_version, ApiVersion::V2) - && matches!( - exec_command_begin_event.source, - codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction - ) - { + if matches!( + exec_command_begin_event.source, + codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction + ) { // TerminalInteraction is the v2 surface for unified exec // stdin/poll events. Suppress the legacy CommandExecution // item so clients do not render the same wait twice. @@ -1821,12 +1642,10 @@ pub(crate) async fn apply_bespoke_event_handling( .command_execution_started .remove(&call_id); } - if matches!(api_version, ApiVersion::V2) - && matches!( - exec_command_end_event.source, - codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction - ) - { + if matches!( + exec_command_end_event.source, + codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction + ) { // The paired begin event is suppressed above; keep the // completion out of v2 as well so no orphan legacy item is // emitted for unified exec interactions. @@ -1848,12 +1667,7 @@ pub(crate) async fn apply_bespoke_event_handling( EventMsg::TurnAborted(turn_aborted_event) => { // All per-thread requests are bound to a turn, so abort them. outgoing.abort_pending_server_requests().await; - respond_to_pending_interrupts( - &thread_state, - &outgoing, - Some(turn_aborted_event.reason.clone()), - ) - .await; + respond_to_pending_interrupts(&thread_state, &outgoing).await; thread_watch_manager .note_turn_interrupted(&conversation_id.to_string()) @@ -1957,48 +1771,36 @@ pub(crate) async fn apply_bespoke_event_handling( } } EventMsg::ThreadNameUpdated(thread_name_event) => { - if let ApiVersion::V2 = api_version { - let notification = ThreadNameUpdatedNotification { - thread_id: thread_name_event.thread_id.to_string(), - thread_name: thread_name_event.thread_name, - }; - outgoing - .send_global_server_notification(ServerNotification::ThreadNameUpdated( - notification, - )) - .await; - } + let notification = ThreadNameUpdatedNotification { + thread_id: thread_name_event.thread_id.to_string(), + thread_name: thread_name_event.thread_name, + }; + outgoing + .send_global_server_notification(ServerNotification::ThreadNameUpdated( + notification, + )) + .await; } EventMsg::ThreadGoalUpdated(thread_goal_event) => { - if let ApiVersion::V2 = api_version { - let notification = ThreadGoalUpdatedNotification { - thread_id: thread_goal_event.thread_id.to_string(), - turn_id: thread_goal_event.turn_id, - goal: thread_goal_event.goal.clone().into(), - }; - outgoing - .send_global_server_notification(ServerNotification::ThreadGoalUpdated( - notification, - )) - .await; - } + let notification = ThreadGoalUpdatedNotification { + thread_id: thread_goal_event.thread_id.to_string(), + turn_id: thread_goal_event.turn_id, + goal: thread_goal_event.goal.clone().into(), + }; + outgoing + .send_global_server_notification(ServerNotification::ThreadGoalUpdated( + notification, + )) + .await; } EventMsg::TurnDiff(turn_diff_event) => { - handle_turn_diff( - conversation_id, - &event_turn_id, - turn_diff_event, - api_version, - &outgoing, - ) - .await; + handle_turn_diff(conversation_id, &event_turn_id, turn_diff_event, &outgoing).await; } EventMsg::PlanUpdate(plan_update_event) => { handle_turn_plan_update( conversation_id, &event_turn_id, plan_update_event, - api_version, &outgoing, ) .await; @@ -2017,44 +1819,38 @@ async fn handle_turn_diff( conversation_id: ThreadId, event_turn_id: &str, turn_diff_event: TurnDiffEvent, - api_version: ApiVersion, outgoing: &ThreadScopedOutgoingMessageSender, ) { - if let ApiVersion::V2 = api_version { - let notification = TurnDiffUpdatedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.to_string(), - diff: turn_diff_event.unified_diff, - }; - outgoing - .send_server_notification(ServerNotification::TurnDiffUpdated(notification)) - .await; - } + let notification = TurnDiffUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.to_string(), + diff: turn_diff_event.unified_diff, + }; + outgoing + .send_server_notification(ServerNotification::TurnDiffUpdated(notification)) + .await; } async fn handle_turn_plan_update( conversation_id: ThreadId, event_turn_id: &str, plan_update_event: UpdatePlanArgs, - api_version: ApiVersion, outgoing: &ThreadScopedOutgoingMessageSender, ) { // `update_plan` is a todo/checklist tool; it is not related to plan-mode updates - if let ApiVersion::V2 = api_version { - let notification = TurnPlanUpdatedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.to_string(), - explanation: plan_update_event.explanation, - plan: plan_update_event - .plan - .into_iter() - .map(TurnPlanStep::from) - .collect(), - }; - outgoing - .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) - .await; - } + let notification = TurnPlanUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.to_string(), + explanation: plan_update_event.explanation, + plan: plan_update_event + .plan + .into_iter() + .map(TurnPlanStep::from) + .collect(), + }; + outgoing + .send_server_notification(ServerNotification::TurnPlanUpdated(notification)) + .await; } struct TurnCompletionMetadata { @@ -2208,16 +2004,11 @@ async fn complete_command_execution_item( } async fn maybe_emit_raw_response_item_completed( - api_version: ApiVersion, conversation_id: ThreadId, turn_id: &str, item: codex_protocol::models::ResponseItem, outgoing: &ThreadScopedOutgoingMessageSender, ) { - let ApiVersion::V2 = api_version else { - return; - }; - let notification = RawResponseItemCompletedNotification { thread_id: conversation_id.to_string(), turn_id: turn_id.to_string(), @@ -2229,16 +2020,11 @@ async fn maybe_emit_raw_response_item_completed( } pub(crate) async fn maybe_emit_hook_prompt_item_completed( - api_version: ApiVersion, conversation_id: ThreadId, turn_id: &str, item: &codex_protocol::models::ResponseItem, outgoing: &ThreadScopedOutgoingMessageSender, ) { - let ApiVersion::V2 = api_version else { - return; - }; - let codex_protocol::models::ResponseItem::Message { role, content, id, .. } = item @@ -2354,27 +2140,16 @@ async fn handle_thread_rollback_failed( async fn respond_to_pending_interrupts( thread_state: &Arc>, outgoing: &ThreadScopedOutgoingMessageSender, - abort_reason: Option, ) { let pending = { let mut state = thread_state.lock().await; std::mem::take(&mut state.pending_interrupts) }; - for (rid, ver) in pending { - match ver { - ApiVersion::V1 => { - let Some(abort_reason) = abort_reason.clone() else { - debug_assert!(false, "v1 interrupts only resolve from TurnAborted"); - continue; - }; - let response = InterruptConversationResponse { abort_reason }; - outgoing.send_response(rid, response).await; - } - ApiVersion::V2 => { - outgoing.send_response(rid, TurnInterruptResponse {}).await; - } - } + for request_id in pending { + outgoing + .send_response(request_id, TurnInterruptResponse {}) + .await; } } @@ -2415,105 +2190,6 @@ async fn handle_error( state.turn_summary.last_error = Some(error); } -async fn on_patch_approval_response( - call_id: String, - receiver: oneshot::Receiver, - codex: Arc, -) { - let response = receiver.await; - let value = match response { - Ok(Ok(value)) => value, - Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return, - Ok(Err(err)) => { - error!("request failed with client error: {err:?}"); - if let Err(submit_err) = codex - .submit(Op::PatchApproval { - id: call_id.clone(), - decision: ReviewDecision::Denied, - }) - .await - { - error!("failed to submit denied PatchApproval after request failure: {submit_err}"); - } - return; - } - Err(err) => { - error!("request failed: {err:?}"); - if let Err(submit_err) = codex - .submit(Op::PatchApproval { - id: call_id.clone(), - decision: ReviewDecision::Denied, - }) - .await - { - error!("failed to submit denied PatchApproval after request failure: {submit_err}"); - } - return; - } - }; - - let response = - serde_json::from_value::(value).unwrap_or_else(|err| { - error!("failed to deserialize ApplyPatchApprovalResponse: {err}"); - ApplyPatchApprovalResponse { - decision: ReviewDecision::Denied, - } - }); - - if let Err(err) = codex - .submit(Op::PatchApproval { - id: call_id, - decision: response.decision, - }) - .await - { - error!("failed to submit PatchApproval: {err}"); - } -} - -async fn on_exec_approval_response( - call_id: String, - turn_id: String, - receiver: oneshot::Receiver, - conversation: Arc, -) { - let response = receiver.await; - let value = match response { - Ok(Ok(value)) => value, - Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return, - Ok(Err(err)) => { - error!("request failed with client error: {err:?}"); - return; - } - Err(err) => { - error!("request failed: {err:?}"); - return; - } - }; - - // Try to deserialize `value` and then make the appropriate call to `codex`. - let response = - serde_json::from_value::(value).unwrap_or_else(|err| { - error!("failed to deserialize ExecCommandApprovalResponse: {err}"); - // If we cannot deserialize the response, we deny the request to be - // conservative. - ExecCommandApprovalResponse { - decision: ReviewDecision::Denied, - } - }); - - if let Err(err) = conversation - .submit(Op::ExecApproval { - id: call_id, - turn_id: Some(turn_id), - decision: response.decision, - }) - .await - { - error!("failed to submit ExecApproval: {err}"); - } -} - async fn on_request_user_input_response( event_turn_id: String, pending_request_id: RequestId, @@ -3287,7 +2963,6 @@ mod tests { self.thread_state.clone(), self.thread_watch_manager.clone(), Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)), - ApiVersion::V2, "test-provider".to_string(), &self.codex_home, ) @@ -4425,14 +4100,7 @@ mod tests { let conversation_id = ThreadId::new(); - handle_turn_plan_update( - conversation_id, - "turn-123", - update, - ApiVersion::V2, - &outgoing, - ) - .await; + handle_turn_plan_update(conversation_id, "turn-123", update, &outgoing).await; let msg = recv_broadcast_message(&mut rx).await?; match msg { @@ -4908,7 +4576,6 @@ mod tests { TurnDiffEvent { unified_diff: unified_diff.clone(), }, - ApiVersion::V2, &outgoing, ) .await; @@ -4928,35 +4595,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> { - let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new( - tx, - codex_analytics::AnalyticsEventsClient::disabled(), - )); - let outgoing = ThreadScopedOutgoingMessageSender::new( - outgoing, - vec![ConnectionId(1)], - ThreadId::new(), - ); - let conversation_id = ThreadId::new(); - - handle_turn_diff( - conversation_id, - "turn-1", - TurnDiffEvent { - unified_diff: "diff".to_string(), - }, - ApiVersion::V1, - &outgoing, - ) - .await; - - assert!(rx.try_recv().is_err(), "no messages expected"); - Ok(()) - } - #[tokio::test] async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -4976,14 +4614,7 @@ mod tests { ]) .expect("hook prompt message"); - maybe_emit_hook_prompt_item_completed( - ApiVersion::V2, - conversation_id, - "turn-1", - &item, - &outgoing, - ) - .await; + maybe_emit_hook_prompt_item_completed(conversation_id, "turn-1", &item, &outgoing).await; let msg = recv_broadcast_message(&mut rx).await?; match msg { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index ffd858ef0d..72e265953a 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -544,14 +544,6 @@ pub(crate) struct CodexMessageProcessor { log_db: Option, } -#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] -pub(crate) enum ApiVersion { - #[allow(dead_code)] - V1, - #[default] - V2, -} - #[derive(Clone)] struct ListenerTaskContext { thread_manager: Arc, @@ -2832,7 +2824,6 @@ impl CodexMessageProcessor { thread_id, request_id.connection_id, experimental_raw_events, - ApiVersion::V2, ) .instrument(tracing::info_span!( "app_server.thread_start.attach_listener", @@ -4204,7 +4195,6 @@ impl CodexMessageProcessor { thread_id, connection_id, /*raw_events_enabled*/ false, - ApiVersion::V2, ) .await, thread_id, @@ -4365,7 +4355,6 @@ impl CodexMessageProcessor { thread_id, request_id.connection_id, /*raw_events_enabled*/ false, - ApiVersion::V2, ) .await, thread_id, @@ -4544,7 +4533,6 @@ impl CodexMessageProcessor { existing_thread_id, existing_thread.clone(), thread_state.clone(), - ApiVersion::V2, ) .await?; @@ -4941,7 +4929,6 @@ impl CodexMessageProcessor { thread_id, request_id.connection_id, /*raw_events_enabled*/ false, - ApiVersion::V2, ) .await, thread_id, @@ -6904,7 +6891,6 @@ impl CodexMessageProcessor { thread_id, request_id.connection_id, /*raw_events_enabled*/ false, - ApiVersion::V2, ) .await { @@ -7190,7 +7176,6 @@ impl CodexMessageProcessor { thread_id, request_id.connection_id, /*raw_events_enabled*/ false, - ApiVersion::V2, ) .await, thread_id, @@ -7317,9 +7302,7 @@ impl CodexMessageProcessor { { return Err(invalid_request("no active turn to interrupt")); } - thread_state - .pending_interrupts - .push((request_id.clone(), ApiVersion::V2)); + thread_state.pending_interrupts.push(request_id.clone()); } self.outgoing @@ -7342,7 +7325,7 @@ impl CodexMessageProcessor { let mut thread_state = thread_state.lock().await; thread_state .pending_interrupts - .retain(|(pending_request_id, _)| pending_request_id != &request_id); + .retain(|pending_request_id| pending_request_id != &request_id); } let interrupt_target = if is_startup_interrupt { "startup" @@ -7364,7 +7347,6 @@ impl CodexMessageProcessor { conversation_id: ThreadId, connection_id: ConnectionId, raw_events_enabled: bool, - api_version: ApiVersion, ) -> Result { Self::ensure_conversation_listener_task( ListenerTaskContext { @@ -7381,7 +7363,6 @@ impl CodexMessageProcessor { conversation_id, connection_id, raw_events_enabled, - api_version, ) .await } @@ -7395,7 +7376,6 @@ impl CodexMessageProcessor { conversation_id: ThreadId, connection_id: ConnectionId, raw_events_enabled: bool, - api_version: ApiVersion, ) -> Result { let conversation = match listener_task_context .thread_manager @@ -7440,7 +7420,6 @@ impl CodexMessageProcessor { conversation_id, conversation, thread_state, - api_version, ) .await { @@ -7482,7 +7461,6 @@ impl CodexMessageProcessor { conversation_id: ThreadId, conversation: Arc, thread_state: Arc>, - api_version: ApiVersion, ) -> Result<(), JSONRPCErrorError> { Self::ensure_listener_task_running_task( ListenerTaskContext { @@ -7499,7 +7477,6 @@ impl CodexMessageProcessor { conversation_id, conversation, thread_state, - api_version, ) .await } @@ -7509,7 +7486,6 @@ impl CodexMessageProcessor { conversation_id: ThreadId, conversation: Arc, thread_state: Arc>, - api_version: ApiVersion, ) -> Result<(), JSONRPCErrorError> { let (cancel_tx, mut cancel_rx) = oneshot::channel(); let Some(mut unloading_state) = UnloadingState::new( @@ -7601,7 +7577,6 @@ impl CodexMessageProcessor { && !raw_events_enabled { maybe_emit_hook_prompt_item_completed( - api_version, conversation_id, &event.id, &raw_response_item_event.item, @@ -7621,7 +7596,6 @@ impl CodexMessageProcessor { thread_state.clone(), thread_watch_manager.clone(), thread_list_state_permit.clone(), - api_version, fallback_model_provider.clone(), codex_home.as_path(), ) diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 73d1c5961b..5122334843 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -22,10 +22,7 @@ use tokio::sync::oneshot; use tokio::sync::watch; use tracing::error; -type PendingInterruptQueue = Vec<( - ConnectionRequestId, - crate::codex_message_processor::ApiVersion, -)>; +type PendingInterruptQueue = Vec; pub(crate) struct PendingThreadResumeRequest { pub(crate) request_id: ConnectionRequestId,