Remove collaboration mode fallback parsing

This commit is contained in:
Charles Cunningham
2026-01-30 20:35:00 -08:00
parent c0812b60ec
commit 035a2df87e
8 changed files with 147 additions and 59 deletions

View File

@@ -31,6 +31,7 @@ use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::stream_events_utils::response_input_to_response_item;
use crate::terminal;
use crate::transport_manager::TransportManager;
use crate::truncate::TruncationPolicy;
@@ -208,6 +209,7 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::ContentItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::render_command_prefix_list;
@@ -1368,9 +1370,10 @@ impl Session {
&self,
previous_collaboration_mode: &CollaborationMode,
next_collaboration_mode: Option<&CollaborationMode>,
force_collaboration_instructions: bool,
) -> Option<ResponseItem> {
if let Some(next_mode) = next_collaboration_mode {
if previous_collaboration_mode == next_mode {
if !force_collaboration_instructions && previous_collaboration_mode == next_mode {
return None;
}
// If the next mode has empty developer instructions, this returns None and we emit no
@@ -1387,6 +1390,7 @@ impl Session {
current_context: &TurnContext,
previous_collaboration_mode: &CollaborationMode,
next_collaboration_mode: Option<&CollaborationMode>,
force_collaboration_instructions: bool,
) -> Vec<ResponseItem> {
let mut update_items = Vec::new();
if let Some(env_item) =
@@ -1402,6 +1406,7 @@ impl Session {
if let Some(collaboration_mode_item) = self.build_collaboration_mode_update_item(
previous_collaboration_mode,
next_collaboration_mode,
force_collaboration_instructions,
) {
update_items.push(collaboration_mode_item);
}
@@ -1676,6 +1681,7 @@ impl Session {
pub async fn notify_user_input_response(
&self,
sub_id: &str,
call_id: Option<String>,
response: RequestUserInputResponse,
) {
let entry = {
@@ -1694,6 +1700,36 @@ impl Session {
}
None => {
warn!("No pending user input found for sub_id: {sub_id}");
if response.answers.is_empty() {
warn!(
"dropping empty request_user_input response for sub_id: {sub_id}; likely cancelled"
);
return;
}
let call_id = call_id.unwrap_or_else(|| sub_id.to_string());
let content = match response.to_tool_output_content() {
Ok(content) => content,
Err(err) => {
warn!(
"failed to serialize request_user_input response for call_id: {call_id}: {err}"
);
return;
}
};
let response_input = ResponseInputItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content,
success: Some(true),
..Default::default()
},
};
let Some(response_item) = response_input_to_response_item(&response_input) else {
return;
};
let turn_context = self.new_default_turn_with_sub_id(sub_id.to_string()).await;
self.record_conversation_items(&turn_context, &[response_item])
.await;
}
}
}
@@ -2517,8 +2553,12 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::PatchApproval { id, decision } => {
handlers::patch_approval(&sess, id, decision).await;
}
Op::UserInputAnswer { id, response } => {
handlers::request_user_input_response(&sess, id, response).await;
Op::UserInputAnswer {
id,
call_id,
response,
} => {
handlers::request_user_input_response(&sess, id, call_id, response).await;
}
Op::DynamicToolResponse { id, response } => {
handlers::dynamic_tool_response(&sess, id, response).await;
@@ -2621,16 +2661,10 @@ mod handlers {
use codex_protocol::request_user_input::RequestUserInputResponse;
use crate::context_manager::is_user_turn_boundary;
use crate::models_manager::collaboration_mode_presets::mask_from_instructions;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::COLLABORATION_MODE_CLOSE_TAG;
use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG;
use codex_protocol::user_input::UserInput;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
@@ -2679,11 +2713,18 @@ mod handlers {
}
let current_context = sess.new_default_turn_with_sub_id(sub_id).await;
let force_collaboration_instructions = {
let mut state = sess.state.lock().await;
let force = state.force_collaboration_instructions;
state.force_collaboration_instructions = false;
force
};
let update_items = sess.build_settings_update_items(
Some(&previous_context),
&current_context,
&previous_collaboration_mode,
next_collaboration_mode.as_ref(),
force_collaboration_instructions,
);
if !update_items.is_empty() {
sess.record_conversation_items(&current_context, &update_items)
@@ -2767,11 +2808,18 @@ mod handlers {
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
sess.seed_initial_context_if_needed(&current_context).await;
let force_collaboration_instructions = {
let mut state = sess.state.lock().await;
let force = state.force_collaboration_instructions;
state.force_collaboration_instructions = false;
force
};
let update_items = sess.build_settings_update_items(
previous_context.as_ref(),
&current_context,
&previous_collaboration_mode,
next_collaboration_mode.as_ref(),
force_collaboration_instructions,
);
if !update_items.is_empty() {
sess.record_conversation_items(&current_context, &update_items)
@@ -2878,9 +2926,11 @@ mod handlers {
pub async fn request_user_input_response(
sess: &Arc<Session>,
id: String,
call_id: Option<String>,
response: RequestUserInputResponse,
) {
sess.notify_user_input_response(&id, response).await;
sess.notify_user_input_response(&id, call_id, response)
.await;
}
pub async fn dynamic_tool_response(
@@ -3089,12 +3139,7 @@ mod handlers {
state.session_configuration.collaboration_mode = collaboration_mode;
applied = true;
}
if !applied && let Some(mask) = last_collaboration_mask(history.raw_items()) {
state.session_configuration.collaboration_mode = state
.session_configuration
.collaboration_mode
.apply_mask(&mask);
}
state.force_collaboration_instructions = !applied;
sess.recompute_token_usage(turn_context.as_ref()).await;
sess.send_event_raw_flushed(Event {
@@ -3217,33 +3262,6 @@ mod handlers {
true
}
fn last_collaboration_mask(items: &[ResponseItem]) -> Option<CollaborationModeMask> {
items.iter().rev().find_map(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return None;
};
if role != "developer" {
return None;
}
let text = content.iter().find_map(|item| {
if let ContentItem::InputText { text } = item {
Some(text.as_str())
} else {
None
}
})?;
let instructions = extract_collaboration_instructions(text)?;
mask_from_instructions(instructions)
})
}
fn extract_collaboration_instructions(text: &str) -> Option<&str> {
let start = text.find(COLLABORATION_MODE_OPEN_TAG)? + COLLABORATION_MODE_OPEN_TAG.len();
let rest = text.get(start..)?;
let end = rest.find(COLLABORATION_MODE_CLOSE_TAG)?;
rest.get(..end)
}
pub async fn review(
sess: &Arc<Session>,
config: &Arc<Config>,
@@ -4656,6 +4674,7 @@ mod tests {
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AuthMode;
use codex_protocol::models::ContentItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::ResponseItem;
use std::path::Path;
use std::time::Duration;
@@ -5059,6 +5078,69 @@ mod tests {
assert_eq!(current_mode.mode, ModeKind::Plan);
}
#[tokio::test]
async fn thread_rollback_missing_turn_context_forces_collaboration_instructions() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;
let base_mode = sess.current_collaboration_mode().await;
let collaboration_mode = CollaborationMode {
mode: ModeKind::Custom,
settings: Settings {
model: base_mode.settings.model.clone(),
reasoning_effort: base_mode.settings.reasoning_effort,
developer_instructions: Some("rollback instructions".to_string()),
},
};
{
let mut state = sess.state.lock().await;
state.session_configuration.collaboration_mode = collaboration_mode.clone();
}
let user_turn = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "turn without turn context".to_string(),
}],
end_turn: None,
}];
sess.record_into_history(&user_turn, tc.as_ref()).await;
handlers::thread_rollback(&sess, "sub-1".to_string(), 1).await;
let rollback_event = wait_for_thread_rolled_back(&rx).await;
assert_eq!(rollback_event.num_turns, 1);
let force_collaboration_instructions = {
let mut state = sess.state.lock().await;
let force = state.force_collaboration_instructions;
state.force_collaboration_instructions = false;
force
};
assert!(force_collaboration_instructions);
let current_context = sess.new_default_turn_with_sub_id("sub-2".to_string()).await;
let previous_collaboration_mode = {
let state = sess.state.lock().await;
state.session_configuration.collaboration_mode.clone()
};
let update_items = sess.build_settings_update_items(
None,
&current_context,
&previous_collaboration_mode,
Some(&previous_collaboration_mode),
force_collaboration_instructions,
);
let expected_item: ResponseItem =
DeveloperInstructions::from_collaboration_mode(&previous_collaboration_mode)
.expect("expected collaboration instructions")
.into();
assert!(update_items.contains(&expected_item));
}
#[tokio::test]
async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

View File

@@ -380,7 +380,13 @@ async fn handle_request_user_input(
cancel_token,
)
.await;
let _ = codex.submit(Op::UserInputAnswer { id, response }).await;
let _ = codex
.submit(Op::UserInputAnswer {
id,
call_id: Some(event.call_id.clone()),
response,
})
.await;
}
async fn await_user_input_with_cancel<F>(
@@ -399,7 +405,7 @@ where
answers: HashMap::new(),
};
parent_session
.notify_user_input_response(sub_id, empty.clone())
.notify_user_input_response(sub_id, None, empty.clone())
.await;
empty
}

View File

@@ -106,7 +106,8 @@ async fn should_install_mcp_dependencies(
let empty = RequestUserInputResponse {
answers: HashMap::new(),
};
sess.notify_user_input_response(sub_id, empty.clone()).await;
sess.notify_user_input_response(sub_id, None, empty.clone())
.await;
empty
}
response = response_fut => response.unwrap_or_else(|| RequestUserInputResponse {

View File

@@ -18,18 +18,6 @@ pub(super) fn builtin_collaboration_mode_presets() -> Vec<CollaborationModeMask>
]
}
pub(crate) fn mask_from_instructions(instructions: &str) -> Option<CollaborationModeMask> {
let normalized = instructions.trim();
builtin_collaboration_mode_presets()
.into_iter()
.find(|mask| {
mask.developer_instructions
.as_ref()
.and_then(|value| value.as_ref())
.is_some_and(|text| text.trim() == normalized)
})
}
#[cfg(any(test, feature = "test-support"))]
pub fn test_builtin_collaboration_mode_presets() -> Vec<CollaborationModeMask> {
builtin_collaboration_mode_presets()

View File

@@ -17,6 +17,7 @@ pub(crate) struct SessionState {
pub(crate) session_configuration: SessionConfiguration,
pub(crate) history: ContextManager,
pub(crate) turn_context_history: Vec<Option<TurnContextItem>>,
pub(crate) force_collaboration_instructions: bool,
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
pub(crate) server_reasoning_included: bool,
pub(crate) dependency_env: HashMap<String, String>,
@@ -36,6 +37,7 @@ impl SessionState {
session_configuration,
history,
turn_context_history: Vec::new(),
force_collaboration_instructions: false,
latest_rate_limits: None,
server_reasoning_included: false,
dependency_env: HashMap::new(),

View File

@@ -71,7 +71,7 @@ impl ToolHandler for RequestUserInputHandler {
)
})?;
let content = serde_json::to_string(&response).map_err(|err| {
let content = response.to_tool_output_content().map_err(|err| {
FunctionCallError::Fatal(format!(
"failed to serialize request_user_input response: {err}"
))

View File

@@ -222,6 +222,9 @@ pub enum Op {
UserInputAnswer {
/// Turn id for the in-flight request.
id: String,
/// Tool call id for the in-flight request, if available.
#[serde(skip_serializing_if = "Option::is_none")]
call_id: Option<String>,
/// User-provided answers.
response: RequestUserInputResponse,
},

View File

@@ -43,6 +43,12 @@ pub struct RequestUserInputResponse {
pub answers: HashMap<String, RequestUserInputAnswer>,
}
impl RequestUserInputResponse {
pub fn to_tool_output_content(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct RequestUserInputEvent {
/// Responses API call id for the associated tool call, if available.