diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 08db8633d1..66d4098dfd 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -31,6 +31,7 @@ use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::handle_non_tool_response_item; use crate::stream_events_utils::handle_output_item_done; use crate::stream_events_utils::last_assistant_message_from_item; +use crate::stream_events_utils::response_input_to_response_item; use crate::terminal; use crate::transport_manager::TransportManager; use crate::truncate::TruncationPolicy; @@ -208,6 +209,7 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::models::ContentItem; use codex_protocol::models::DeveloperInstructions; +use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::models::render_command_prefix_list; @@ -1368,9 +1370,10 @@ impl Session { &self, previous_collaboration_mode: &CollaborationMode, next_collaboration_mode: Option<&CollaborationMode>, + force_collaboration_instructions: bool, ) -> Option { if let Some(next_mode) = next_collaboration_mode { - if previous_collaboration_mode == next_mode { + if !force_collaboration_instructions && previous_collaboration_mode == next_mode { return None; } // If the next mode has empty developer instructions, this returns None and we emit no @@ -1387,6 +1390,7 @@ impl Session { current_context: &TurnContext, previous_collaboration_mode: &CollaborationMode, next_collaboration_mode: Option<&CollaborationMode>, + force_collaboration_instructions: bool, ) -> Vec { let mut update_items = Vec::new(); if let Some(env_item) = @@ -1402,6 +1406,7 @@ impl Session { if let Some(collaboration_mode_item) = self.build_collaboration_mode_update_item( previous_collaboration_mode, next_collaboration_mode, + force_collaboration_instructions, ) { update_items.push(collaboration_mode_item); } @@ -1676,6 +1681,7 @@ impl Session { pub async fn notify_user_input_response( &self, sub_id: &str, + call_id: Option, response: RequestUserInputResponse, ) { let entry = { @@ -1694,6 +1700,36 @@ impl Session { } None => { warn!("No pending user input found for sub_id: {sub_id}"); + if response.answers.is_empty() { + warn!( + "dropping empty request_user_input response for sub_id: {sub_id}; likely cancelled" + ); + return; + } + let call_id = call_id.unwrap_or_else(|| sub_id.to_string()); + let content = match response.to_tool_output_content() { + Ok(content) => content, + Err(err) => { + warn!( + "failed to serialize request_user_input response for call_id: {call_id}: {err}" + ); + return; + } + }; + let response_input = ResponseInputItem::FunctionCallOutput { + call_id: call_id.clone(), + output: FunctionCallOutputPayload { + content, + success: Some(true), + ..Default::default() + }, + }; + let Some(response_item) = response_input_to_response_item(&response_input) else { + return; + }; + let turn_context = self.new_default_turn_with_sub_id(sub_id.to_string()).await; + self.record_conversation_items(&turn_context, &[response_item]) + .await; } } } @@ -2517,8 +2553,12 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv Op::PatchApproval { id, decision } => { handlers::patch_approval(&sess, id, decision).await; } - Op::UserInputAnswer { id, response } => { - handlers::request_user_input_response(&sess, id, response).await; + Op::UserInputAnswer { + id, + call_id, + response, + } => { + handlers::request_user_input_response(&sess, id, call_id, response).await; } Op::DynamicToolResponse { id, response } => { handlers::dynamic_tool_response(&sess, id, response).await; @@ -2621,16 +2661,10 @@ mod handlers { use codex_protocol::request_user_input::RequestUserInputResponse; use crate::context_manager::is_user_turn_boundary; - use crate::models_manager::collaboration_mode_presets::mask_from_instructions; use codex_protocol::config_types::CollaborationMode; - use codex_protocol::config_types::CollaborationModeMask; use codex_protocol::config_types::ModeKind; use codex_protocol::config_types::Settings; use codex_protocol::dynamic_tools::DynamicToolResponse; - use codex_protocol::models::ContentItem; - use codex_protocol::models::ResponseItem; - use codex_protocol::protocol::COLLABORATION_MODE_CLOSE_TAG; - use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG; use codex_protocol::user_input::UserInput; use codex_rmcp_client::ElicitationAction; use codex_rmcp_client::ElicitationResponse; @@ -2679,11 +2713,18 @@ mod handlers { } let current_context = sess.new_default_turn_with_sub_id(sub_id).await; + let force_collaboration_instructions = { + let mut state = sess.state.lock().await; + let force = state.force_collaboration_instructions; + state.force_collaboration_instructions = false; + force + }; let update_items = sess.build_settings_update_items( Some(&previous_context), ¤t_context, &previous_collaboration_mode, next_collaboration_mode.as_ref(), + force_collaboration_instructions, ); if !update_items.is_empty() { sess.record_conversation_items(¤t_context, &update_items) @@ -2767,11 +2808,18 @@ mod handlers { // Attempt to inject input into current task if let Err(items) = sess.inject_input(items).await { sess.seed_initial_context_if_needed(¤t_context).await; + let force_collaboration_instructions = { + let mut state = sess.state.lock().await; + let force = state.force_collaboration_instructions; + state.force_collaboration_instructions = false; + force + }; let update_items = sess.build_settings_update_items( previous_context.as_ref(), ¤t_context, &previous_collaboration_mode, next_collaboration_mode.as_ref(), + force_collaboration_instructions, ); if !update_items.is_empty() { sess.record_conversation_items(¤t_context, &update_items) @@ -2878,9 +2926,11 @@ mod handlers { pub async fn request_user_input_response( sess: &Arc, id: String, + call_id: Option, response: RequestUserInputResponse, ) { - sess.notify_user_input_response(&id, response).await; + sess.notify_user_input_response(&id, call_id, response) + .await; } pub async fn dynamic_tool_response( @@ -3089,12 +3139,7 @@ mod handlers { state.session_configuration.collaboration_mode = collaboration_mode; applied = true; } - if !applied && let Some(mask) = last_collaboration_mask(history.raw_items()) { - state.session_configuration.collaboration_mode = state - .session_configuration - .collaboration_mode - .apply_mask(&mask); - } + state.force_collaboration_instructions = !applied; sess.recompute_token_usage(turn_context.as_ref()).await; sess.send_event_raw_flushed(Event { @@ -3217,33 +3262,6 @@ mod handlers { true } - fn last_collaboration_mask(items: &[ResponseItem]) -> Option { - items.iter().rev().find_map(|item| { - let ResponseItem::Message { role, content, .. } = item else { - return None; - }; - if role != "developer" { - return None; - } - let text = content.iter().find_map(|item| { - if let ContentItem::InputText { text } = item { - Some(text.as_str()) - } else { - None - } - })?; - let instructions = extract_collaboration_instructions(text)?; - mask_from_instructions(instructions) - }) - } - - fn extract_collaboration_instructions(text: &str) -> Option<&str> { - let start = text.find(COLLABORATION_MODE_OPEN_TAG)? + COLLABORATION_MODE_OPEN_TAG.len(); - let rest = text.get(start..)?; - let end = rest.find(COLLABORATION_MODE_CLOSE_TAG)?; - rest.get(..end) - } - pub async fn review( sess: &Arc, config: &Arc, @@ -4656,6 +4674,7 @@ mod tests { use codex_app_server_protocol::AppInfo; use codex_app_server_protocol::AuthMode; use codex_protocol::models::ContentItem; + use codex_protocol::models::DeveloperInstructions; use codex_protocol::models::ResponseItem; use std::path::Path; use std::time::Duration; @@ -5059,6 +5078,69 @@ mod tests { assert_eq!(current_mode.mode, ModeKind::Plan); } + #[tokio::test] + async fn thread_rollback_missing_turn_context_forces_collaboration_instructions() { + let (sess, tc, rx) = make_session_and_context_with_rx().await; + + let initial_context = sess.build_initial_context(tc.as_ref()).await; + sess.record_into_history(&initial_context, tc.as_ref()) + .await; + + let base_mode = sess.current_collaboration_mode().await; + let collaboration_mode = CollaborationMode { + mode: ModeKind::Custom, + settings: Settings { + model: base_mode.settings.model.clone(), + reasoning_effort: base_mode.settings.reasoning_effort, + developer_instructions: Some("rollback instructions".to_string()), + }, + }; + { + let mut state = sess.state.lock().await; + state.session_configuration.collaboration_mode = collaboration_mode.clone(); + } + + let user_turn = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "turn without turn context".to_string(), + }], + end_turn: None, + }]; + sess.record_into_history(&user_turn, tc.as_ref()).await; + + handlers::thread_rollback(&sess, "sub-1".to_string(), 1).await; + let rollback_event = wait_for_thread_rolled_back(&rx).await; + assert_eq!(rollback_event.num_turns, 1); + + let force_collaboration_instructions = { + let mut state = sess.state.lock().await; + let force = state.force_collaboration_instructions; + state.force_collaboration_instructions = false; + force + }; + assert!(force_collaboration_instructions); + + let current_context = sess.new_default_turn_with_sub_id("sub-2".to_string()).await; + let previous_collaboration_mode = { + let state = sess.state.lock().await; + state.session_configuration.collaboration_mode.clone() + }; + let update_items = sess.build_settings_update_items( + None, + ¤t_context, + &previous_collaboration_mode, + Some(&previous_collaboration_mode), + force_collaboration_instructions, + ); + let expected_item: ResponseItem = + DeveloperInstructions::from_collaboration_mode(&previous_collaboration_mode) + .expect("expected collaboration instructions") + .into(); + assert!(update_items.contains(&expected_item)); + } + #[tokio::test] async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() { let (sess, tc, rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 7a611c05e2..1f61ef711d 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -380,7 +380,13 @@ async fn handle_request_user_input( cancel_token, ) .await; - let _ = codex.submit(Op::UserInputAnswer { id, response }).await; + let _ = codex + .submit(Op::UserInputAnswer { + id, + call_id: Some(event.call_id.clone()), + response, + }) + .await; } async fn await_user_input_with_cancel( @@ -399,7 +405,7 @@ where answers: HashMap::new(), }; parent_session - .notify_user_input_response(sub_id, empty.clone()) + .notify_user_input_response(sub_id, None, empty.clone()) .await; empty } diff --git a/codex-rs/core/src/mcp/skill_dependencies.rs b/codex-rs/core/src/mcp/skill_dependencies.rs index 37620d73db..8fdc413312 100644 --- a/codex-rs/core/src/mcp/skill_dependencies.rs +++ b/codex-rs/core/src/mcp/skill_dependencies.rs @@ -106,7 +106,8 @@ async fn should_install_mcp_dependencies( let empty = RequestUserInputResponse { answers: HashMap::new(), }; - sess.notify_user_input_response(sub_id, empty.clone()).await; + sess.notify_user_input_response(sub_id, None, empty.clone()) + .await; empty } response = response_fut => response.unwrap_or_else(|| RequestUserInputResponse { diff --git a/codex-rs/core/src/models_manager/collaboration_mode_presets.rs b/codex-rs/core/src/models_manager/collaboration_mode_presets.rs index 5d3cb250b2..83059b912e 100644 --- a/codex-rs/core/src/models_manager/collaboration_mode_presets.rs +++ b/codex-rs/core/src/models_manager/collaboration_mode_presets.rs @@ -18,18 +18,6 @@ pub(super) fn builtin_collaboration_mode_presets() -> Vec ] } -pub(crate) fn mask_from_instructions(instructions: &str) -> Option { - let normalized = instructions.trim(); - builtin_collaboration_mode_presets() - .into_iter() - .find(|mask| { - mask.developer_instructions - .as_ref() - .and_then(|value| value.as_ref()) - .is_some_and(|text| text.trim() == normalized) - }) -} - #[cfg(any(test, feature = "test-support"))] pub fn test_builtin_collaboration_mode_presets() -> Vec { builtin_collaboration_mode_presets() diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 4300056f27..fee67b5aff 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -17,6 +17,7 @@ pub(crate) struct SessionState { pub(crate) session_configuration: SessionConfiguration, pub(crate) history: ContextManager, pub(crate) turn_context_history: Vec>, + pub(crate) force_collaboration_instructions: bool, pub(crate) latest_rate_limits: Option, pub(crate) server_reasoning_included: bool, pub(crate) dependency_env: HashMap, @@ -36,6 +37,7 @@ impl SessionState { session_configuration, history, turn_context_history: Vec::new(), + force_collaboration_instructions: false, latest_rate_limits: None, server_reasoning_included: false, dependency_env: HashMap::new(), diff --git a/codex-rs/core/src/tools/handlers/request_user_input.rs b/codex-rs/core/src/tools/handlers/request_user_input.rs index b78bb51181..48f237f40f 100644 --- a/codex-rs/core/src/tools/handlers/request_user_input.rs +++ b/codex-rs/core/src/tools/handlers/request_user_input.rs @@ -71,7 +71,7 @@ impl ToolHandler for RequestUserInputHandler { ) })?; - let content = serde_json::to_string(&response).map_err(|err| { + let content = response.to_tool_output_content().map_err(|err| { FunctionCallError::Fatal(format!( "failed to serialize request_user_input response: {err}" )) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 6a6939620f..5ba1f4f6e3 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -222,6 +222,9 @@ pub enum Op { UserInputAnswer { /// Turn id for the in-flight request. id: String, + /// Tool call id for the in-flight request, if available. + #[serde(skip_serializing_if = "Option::is_none")] + call_id: Option, /// User-provided answers. response: RequestUserInputResponse, }, diff --git a/codex-rs/protocol/src/request_user_input.rs b/codex-rs/protocol/src/request_user_input.rs index cb076264dd..fa5d20b1ab 100644 --- a/codex-rs/protocol/src/request_user_input.rs +++ b/codex-rs/protocol/src/request_user_input.rs @@ -43,6 +43,12 @@ pub struct RequestUserInputResponse { pub answers: HashMap, } +impl RequestUserInputResponse { + pub fn to_tool_output_content(&self) -> Result { + serde_json::to_string(self) + } +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] pub struct RequestUserInputEvent { /// Responses API call id for the associated tool call, if available.