Fix rollback lock and propagate user input call_id

This commit is contained in:
Charles Cunningham
2026-01-31 11:50:33 -08:00
parent a057ee0d21
commit 37ef9a1904
4 changed files with 36 additions and 21 deletions

View File

@@ -294,17 +294,25 @@ pub(crate) async fn apply_bespoke_event_handling(
}),
})
.collect();
let call_id = request.call_id.clone();
let params = ToolRequestUserInputParams {
thread_id: conversation_id.to_string(),
turn_id: request.turn_id,
item_id: request.call_id,
item_id: call_id.clone(),
questions,
};
let rx = outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(params))
.await;
let response_call_id = call_id.clone();
tokio::spawn(async move {
on_request_user_input_response(event_turn_id, rx, conversation).await;
on_request_user_input_response(
event_turn_id,
response_call_id,
rx,
conversation,
)
.await;
});
} else {
error!(
@@ -317,6 +325,7 @@ pub(crate) async fn apply_bespoke_event_handling(
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
call_id: Some(request.call_id.clone()),
response: empty,
})
.await
@@ -1483,6 +1492,7 @@ async fn on_exec_approval_response(
async fn on_request_user_input_response(
event_turn_id: String,
call_id: String,
receiver: oneshot::Receiver<JsonValue>,
conversation: Arc<CodexThread>,
) {
@@ -1497,6 +1507,7 @@ async fn on_request_user_input_response(
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
call_id: Some(call_id.clone()),
response: empty,
})
.await
@@ -1532,6 +1543,7 @@ async fn on_request_user_input_response(
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
call_id: Some(call_id),
response,
})
.await

View File

@@ -3143,27 +3143,29 @@ mod handlers {
// Replace with the raw items. We don't want to replace with a normalized
// version of the history.
let user_turns = Self::user_turn_count(history.raw_items());
let user_turns = Session::user_turn_count(history.raw_items());
sess.replace_history(history.raw_items().to_vec()).await;
let mut state = sess.state.lock().await;
let mut updated_turn_context_history = existing_turn_context_history;
let truncated_len = updated_turn_context_history
.len()
.saturating_sub(num_turns as usize);
updated_turn_context_history.truncate(truncated_len);
if updated_turn_context_history.len() < user_turns {
updated_turn_context_history.resize_with(user_turns, || None);
}
state.set_turn_context_history(updated_turn_context_history);
let mut applied = false;
if state.turn_context_history.len() == user_turns
&& let Some(turn_context) = state.last_turn_context()
&& let Some(collaboration_mode) = turn_context.collaboration_mode.clone()
{
state.session_configuration.collaboration_mode = collaboration_mode;
applied = true;
let mut state = sess.state.lock().await;
let mut updated_turn_context_history = existing_turn_context_history;
let truncated_len = updated_turn_context_history
.len()
.saturating_sub(num_turns as usize);
updated_turn_context_history.truncate(truncated_len);
if updated_turn_context_history.len() < user_turns {
updated_turn_context_history.resize_with(user_turns, || None);
}
state.set_turn_context_history(updated_turn_context_history);
let mut applied = false;
if state.turn_context_history.len() == user_turns
&& let Some(turn_context) = state.last_turn_context()
&& let Some(collaboration_mode) = turn_context.collaboration_mode.clone()
{
state.session_configuration.collaboration_mode = collaboration_mode;
applied = true;
}
state.force_collaboration_instructions = !applied;
}
state.force_collaboration_instructions = !applied;
sess.recompute_token_usage(turn_context.as_ref()).await;
sess.send_event_raw_flushed(Event {

View File

@@ -165,6 +165,7 @@ async fn request_user_input_round_trip_resolves_pending() -> anyhow::Result<()>
codex
.submit(Op::UserInputAnswer {
id: request.turn_id.clone(),
call_id: Some(request.call_id.clone()),
response,
})
.await?;

View File

@@ -69,7 +69,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
- `Op::UserInput` Legacy form of user input
- `Op::Interrupt` Interrupts a running turn
- `Op::ExecApproval` Approve or deny code execution
- `Op::UserInputAnswer` Provide answers for a `request_user_input` tool call
- `Op::UserInputAnswer` Provide answers for a `request_user_input` tool call (optionally include `call_id`)
- `Op::ListSkills` Request skills for one or more cwd values (optionally `force_reload`)
- `Op::UserTurn` and `Op::OverrideTurnContext` accept an optional `personality` override that updates the models communication style
- `EventMsg`