mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
3 Commits
main
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdf9b7566f | ||
|
|
610cd53595 | ||
|
|
a6b0d4b210 |
@@ -1057,6 +1057,80 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
/// Track a freshly emitted `ResponseItem` for the active turn.
|
||||
///
|
||||
/// When the model stream produces an item (e.g. a tool call) we register it
|
||||
/// immediately so abort paths can still flush it into history even if the
|
||||
/// matching tool future has not completed yet. The returned index lets the
|
||||
/// caller later patch the slot when the response arrives.
|
||||
pub async fn append_processed_item(
|
||||
&self,
|
||||
sub_id: &str,
|
||||
item: ProcessedResponseItem,
|
||||
) -> Option<usize> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
if let Some(at) = active.as_mut() {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
Some(ts.push_processed_item(item))
|
||||
} else {
|
||||
trace!(sub_id, "dropping processed item; turn not active");
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Patch a previously queued `ProcessedResponseItem` with the tool
|
||||
/// response.
|
||||
///
|
||||
/// Tool futures resolve outside of the stream loop. Once a result is
|
||||
/// available we update the placeholder slot created by
|
||||
/// [`append_processed_item`] so the turn state reflects the final
|
||||
/// `ResponseInputItem`. If the turn has already ended (e.g. abort), we skip
|
||||
/// the update—the abort handler will synthesize the appropriate output.
|
||||
pub async fn update_processed_item_response(
|
||||
&self,
|
||||
sub_id: &str,
|
||||
index: usize,
|
||||
response: Option<ResponseInputItem>,
|
||||
) {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
if let Some(at) = active.as_mut() {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.set_processed_item_response(index, response);
|
||||
} else {
|
||||
trace!(
|
||||
sub_id,
|
||||
index, "dropping processed item update; turn not active"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn take_processed_items(&self, _sub_id: &str) -> Vec<ProcessedResponseItem> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.take_processed_items()
|
||||
}
|
||||
None => Vec::with_capacity(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn process_pending_items_after_abort(
|
||||
&self,
|
||||
processed_items: Vec<ProcessedResponseItem>,
|
||||
is_review_mode: bool,
|
||||
) {
|
||||
if processed_items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let ProcessedItemsSummary { history_items, .. } =
|
||||
summarize_processed_items(processed_items);
|
||||
if history_items.is_empty() || is_review_mode {
|
||||
return;
|
||||
}
|
||||
self.record_conversation_items(&history_items).await;
|
||||
}
|
||||
|
||||
pub async fn call_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -1753,107 +1827,20 @@ pub(crate) async fn run_task(
|
||||
let token_limit_reached = total_usage_tokens
|
||||
.map(|tokens| (tokens as i64) >= limit)
|
||||
.unwrap_or(false);
|
||||
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
|
||||
let mut responses = Vec::<ResponseInputItem>::new();
|
||||
for processed_response_item in processed_items {
|
||||
let ProcessedResponseItem { item, response } = processed_response_item;
|
||||
match (&item, &response) {
|
||||
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
|
||||
// If the model returned a message, we need to record it.
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::CustomToolCall { .. },
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
let output = match result {
|
||||
Ok(call_tool_result) => {
|
||||
convert_call_tool_result_to_function_call_output_payload(
|
||||
call_tool_result,
|
||||
)
|
||||
}
|
||||
Err(err) => FunctionCallOutputPayload {
|
||||
content: err.clone(),
|
||||
success: Some(false),
|
||||
},
|
||||
};
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output,
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
warn!("Unexpected response item: {item:?} with response: {response:?}");
|
||||
}
|
||||
};
|
||||
if let Some(response) = response {
|
||||
responses.push(response);
|
||||
}
|
||||
}
|
||||
let ProcessedItemsSummary {
|
||||
history_items,
|
||||
responses,
|
||||
} = summarize_processed_items(processed_items);
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
// Clear the pending buffer before any awaited work so abort paths cannot replay
|
||||
// already-recorded items.
|
||||
let _ = sess.take_processed_items(&sub_id).await;
|
||||
|
||||
if !history_items.is_empty() {
|
||||
if is_review_mode {
|
||||
review_thread_history
|
||||
.extend(items_to_record_in_conversation_history.clone());
|
||||
review_thread_history.extend(history_items.clone());
|
||||
} else {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
sess.record_conversation_items(&history_items).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1882,9 +1869,7 @@ pub(crate) async fn run_task(
|
||||
auto_compact_recently_attempted = false;
|
||||
|
||||
if responses.is_empty() {
|
||||
last_agent_message = get_last_assistant_message_from_turn(
|
||||
&items_to_record_in_conversation_history,
|
||||
);
|
||||
last_agent_message = get_last_assistant_message_from_turn(&history_items);
|
||||
sess.notifier()
|
||||
.notify(&UserNotification::AgentTurnComplete {
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
@@ -2046,12 +2031,138 @@ async fn run_turn(
|
||||
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
|
||||
/// "handled" such that it produces a `ResponseInputItem` that needs to be
|
||||
/// sent back to the model on the next turn.
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ProcessedResponseItem {
|
||||
pub(crate) item: ResponseItem,
|
||||
pub(crate) response: Option<ResponseInputItem>,
|
||||
}
|
||||
|
||||
struct ProcessedItemsSummary {
|
||||
history_items: Vec<ResponseItem>,
|
||||
responses: Vec<ResponseInputItem>,
|
||||
}
|
||||
|
||||
fn summarize_processed_items(processed_items: Vec<ProcessedResponseItem>) -> ProcessedItemsSummary {
|
||||
let mut summary = ProcessedItemsSummary {
|
||||
history_items: Vec::new(),
|
||||
responses: Vec::new(),
|
||||
};
|
||||
|
||||
for processed_response_item in processed_items {
|
||||
let ProcessedResponseItem { item, response } = processed_response_item;
|
||||
match (&item, &response) {
|
||||
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
|
||||
summary.history_items.push(item.clone());
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::CustomToolCall { .. },
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
|
||||
) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
|
||||
) => {
|
||||
summary.history_items.push(item.clone());
|
||||
let output = match result {
|
||||
Ok(call_tool_result) => {
|
||||
convert_call_tool_result_to_function_call_output_payload(call_tool_result)
|
||||
}
|
||||
Err(err) => FunctionCallOutputPayload {
|
||||
content: err.clone(),
|
||||
success: Some(false),
|
||||
},
|
||||
};
|
||||
summary
|
||||
.history_items
|
||||
.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output,
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(make_aborted_function_call_output(call_id.clone()));
|
||||
}
|
||||
(ResponseItem::FunctionCall { call_id, .. }, None) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(make_aborted_function_call_output(call_id.clone()));
|
||||
}
|
||||
(ResponseItem::CustomToolCall { call_id, .. }, None) => {
|
||||
summary.history_items.push(item.clone());
|
||||
summary
|
||||
.history_items
|
||||
.push(make_aborted_custom_tool_call_output(call_id.clone()));
|
||||
}
|
||||
(
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary: reasoning_summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
summary.history_items.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: reasoning_summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
warn!("Unexpected response item: {item:?} with response: {response:?}");
|
||||
}
|
||||
};
|
||||
if let Some(response) = response {
|
||||
summary.responses.push(response);
|
||||
}
|
||||
}
|
||||
|
||||
summary
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TurnRunResult {
|
||||
processed_items: Vec<ProcessedResponseItem>,
|
||||
@@ -2162,10 +2273,6 @@ async fn try_run_turn(
|
||||
}
|
||||
};
|
||||
|
||||
let add_completed = &mut |response_item: ProcessedResponseItem| {
|
||||
output.push_back(future::ready(Ok(response_item)).boxed());
|
||||
};
|
||||
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
@@ -2175,13 +2282,40 @@ async fn try_run_turn(
|
||||
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
|
||||
|
||||
let response = tool_runtime.handle_tool_call(call);
|
||||
let placeholder_index = sess
|
||||
.append_processed_item(
|
||||
sub_id,
|
||||
ProcessedResponseItem {
|
||||
item: item.clone(),
|
||||
response: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let sess_for_store = Arc::clone(&sess);
|
||||
let sub_id_for_store = sub_id.to_string();
|
||||
output.push_back(
|
||||
async move {
|
||||
Ok(ProcessedResponseItem {
|
||||
let response_item = response.await?;
|
||||
if let Some(index) = placeholder_index {
|
||||
sess_for_store
|
||||
.update_processed_item_response(
|
||||
&sub_id_for_store,
|
||||
index,
|
||||
Some(response_item.clone()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let processed = ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response.await?),
|
||||
})
|
||||
response: Some(response_item),
|
||||
};
|
||||
if placeholder_index.is_none() {
|
||||
let _ = sess_for_store
|
||||
.append_processed_item(&sub_id_for_store, processed.clone())
|
||||
.await;
|
||||
}
|
||||
Ok(processed)
|
||||
}
|
||||
.boxed(),
|
||||
);
|
||||
@@ -2194,7 +2328,9 @@ async fn try_run_turn(
|
||||
item.clone(),
|
||||
)
|
||||
.await?;
|
||||
add_completed(ProcessedResponseItem { item, response });
|
||||
let processed = ProcessedResponseItem { item, response };
|
||||
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
|
||||
output.push_back(future::ready(Ok(processed)).boxed());
|
||||
}
|
||||
Err(FunctionCallError::MissingLocalShellCallId) => {
|
||||
let msg = "LocalShellCall without call_id or id";
|
||||
@@ -2211,10 +2347,12 @@ async fn try_run_turn(
|
||||
success: None,
|
||||
},
|
||||
};
|
||||
add_completed(ProcessedResponseItem {
|
||||
let processed = ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
});
|
||||
};
|
||||
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
|
||||
output.push_back(future::ready(Ok(processed)).boxed());
|
||||
}
|
||||
Err(FunctionCallError::RespondToModel(message)) => {
|
||||
let response = ResponseInputItem::FunctionCallOutput {
|
||||
@@ -2224,10 +2362,12 @@ async fn try_run_turn(
|
||||
success: None,
|
||||
},
|
||||
};
|
||||
add_completed(ProcessedResponseItem {
|
||||
let processed = ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
});
|
||||
};
|
||||
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
|
||||
output.push_back(future::ready(Ok(processed)).boxed());
|
||||
}
|
||||
Err(FunctionCallError::Fatal(message)) => {
|
||||
return Err(CodexErr::Fatal(message));
|
||||
@@ -2409,6 +2549,23 @@ fn convert_call_tool_result_to_function_call_output_payload(
|
||||
}
|
||||
}
|
||||
|
||||
fn make_aborted_function_call_output(call_id: String) -> ResponseItem {
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id,
|
||||
output: FunctionCallOutputPayload {
|
||||
content: "aborted".to_string(),
|
||||
success: Some(false),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn make_aborted_custom_tool_call_output(call_id: String) -> ResponseItem {
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id,
|
||||
output: "aborted".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
|
||||
/// and records a developer message with the review output.
|
||||
pub(crate) async fn exit_review_mode(
|
||||
@@ -2492,6 +2649,9 @@ mod tests {
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::LocalShellAction;
|
||||
use codex_protocol::models::LocalShellExecAction;
|
||||
use codex_protocol::models::LocalShellStatus;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
use mcp_types::ContentBlock;
|
||||
@@ -2571,6 +2731,33 @@ mod tests {
|
||||
assert_eq!(expected, got);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn aborted_function_call_output_marks_failure() {
|
||||
let output = make_aborted_function_call_output("call-123".to_string());
|
||||
|
||||
match output {
|
||||
ResponseItem::FunctionCallOutput { call_id, output } => {
|
||||
assert_eq!(call_id, "call-123");
|
||||
assert_eq!(output.content, "aborted");
|
||||
assert_eq!(output.success, Some(false));
|
||||
}
|
||||
other => panic!("unexpected response item: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn aborted_custom_tool_call_output_marks_failure() {
|
||||
let output = make_aborted_custom_tool_call_output("custom-call".to_string());
|
||||
|
||||
match output {
|
||||
ResponseItem::CustomToolCallOutput { call_id, output } => {
|
||||
assert_eq!(call_id, "custom-call");
|
||||
assert_eq!(output, "aborted");
|
||||
}
|
||||
other => panic!("unexpected response item: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn model_truncation_head_tail_by_lines() {
|
||||
// Build 400 short lines so line-count limit, not byte budget, triggers truncation
|
||||
@@ -2983,6 +3170,219 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn abort_regular_task_records_pending_tool_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx();
|
||||
let sub_id = "sub-regular-history".to_string();
|
||||
let input = vec![InputItem::Text {
|
||||
text: "pending tool".to_string(),
|
||||
}];
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
sub_id.clone(),
|
||||
input,
|
||||
NeverEndingTask(TaskKind::Regular),
|
||||
)
|
||||
.await;
|
||||
|
||||
let call_id = "call-regular".to_string();
|
||||
sess.append_processed_item(
|
||||
&sub_id,
|
||||
ProcessedResponseItem {
|
||||
item: ResponseItem::LocalShellCall {
|
||||
id: Some("local-regular".to_string()),
|
||||
call_id: Some(call_id.clone()),
|
||||
status: LocalShellStatus::InProgress,
|
||||
action: LocalShellAction::Exec(LocalShellExecAction {
|
||||
command: vec!["curl".to_string(), "https://example.com".to_string()],
|
||||
timeout_ms: Some(1_000),
|
||||
working_directory: None,
|
||||
env: None,
|
||||
user: None,
|
||||
}),
|
||||
},
|
||||
response: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
|
||||
let evt = rx.recv().await.expect("event");
|
||||
match evt.msg {
|
||||
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
assert!(rx.try_recv().is_err());
|
||||
|
||||
let history = sess.history_snapshot().await;
|
||||
let call_index = history.iter().position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(id),
|
||||
..
|
||||
} if id == &call_id
|
||||
)
|
||||
});
|
||||
let output_index = history.iter().position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::FunctionCallOutput { call_id: id, output }
|
||||
if id == &call_id
|
||||
&& output.content == "aborted"
|
||||
&& output.success == Some(false)
|
||||
)
|
||||
});
|
||||
|
||||
assert!(call_index.is_some(), "local shell call not recorded");
|
||||
assert!(output_index.is_some(), "aborted output not recorded");
|
||||
assert!(
|
||||
call_index.unwrap() < output_index.unwrap(),
|
||||
"output should follow tool call"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn abort_review_task_ignores_pending_tool_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx();
|
||||
let sub_id = "sub-review-history".to_string();
|
||||
let input = vec![InputItem::Text {
|
||||
text: "pending review tool".to_string(),
|
||||
}];
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
sub_id.clone(),
|
||||
input,
|
||||
NeverEndingTask(TaskKind::Review),
|
||||
)
|
||||
.await;
|
||||
|
||||
sess.append_processed_item(
|
||||
&sub_id,
|
||||
ProcessedResponseItem {
|
||||
item: ResponseItem::LocalShellCall {
|
||||
id: Some("local-review".to_string()),
|
||||
call_id: Some("call-review".to_string()),
|
||||
status: LocalShellStatus::InProgress,
|
||||
action: LocalShellAction::Exec(LocalShellExecAction {
|
||||
command: vec!["curl".to_string(), "https://example.com".to_string()],
|
||||
timeout_ms: Some(1_000),
|
||||
working_directory: None,
|
||||
env: None,
|
||||
user: None,
|
||||
}),
|
||||
},
|
||||
response: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
|
||||
let first = rx.recv().await.expect("first event");
|
||||
match first.msg {
|
||||
EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()),
|
||||
other => panic!("unexpected first event: {other:?}"),
|
||||
}
|
||||
let second = rx.recv().await.expect("second event");
|
||||
match second.msg {
|
||||
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
|
||||
other => panic!("unexpected second event: {other:?}"),
|
||||
}
|
||||
assert!(rx.try_recv().is_err());
|
||||
|
||||
let history = sess.history_snapshot().await;
|
||||
assert!(
|
||||
!history
|
||||
.iter()
|
||||
.any(|item| matches!(item, ResponseItem::LocalShellCall { .. })),
|
||||
"review mode should not record pending tool call in main history"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_pending_items_after_abort_records_aborted_tool_calls() {
|
||||
let (session, _) = make_session_and_context();
|
||||
let call_id = "call-123".to_string();
|
||||
let processed_items = vec![ProcessedResponseItem {
|
||||
item: ResponseItem::LocalShellCall {
|
||||
id: Some("local-1".to_string()),
|
||||
call_id: Some(call_id.clone()),
|
||||
status: LocalShellStatus::InProgress,
|
||||
action: LocalShellAction::Exec(LocalShellExecAction {
|
||||
command: vec!["curl".to_string(), "https://example.com".to_string()],
|
||||
timeout_ms: Some(1_000),
|
||||
working_directory: None,
|
||||
env: None,
|
||||
user: None,
|
||||
}),
|
||||
},
|
||||
response: None,
|
||||
}];
|
||||
|
||||
session
|
||||
.process_pending_items_after_abort(processed_items, false)
|
||||
.await;
|
||||
|
||||
let history = session.history_snapshot().await;
|
||||
let call_index = history.iter().position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(id),
|
||||
..
|
||||
} if id == &call_id
|
||||
)
|
||||
});
|
||||
let output_index = history.iter().position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::FunctionCallOutput { call_id: id, output }
|
||||
if id == &call_id
|
||||
&& output.content == "aborted"
|
||||
&& output.success == Some(false)
|
||||
)
|
||||
});
|
||||
|
||||
assert!(call_index.is_some(), "local shell call not recorded");
|
||||
assert!(output_index.is_some(), "aborted output not recorded");
|
||||
assert!(
|
||||
call_index.unwrap() < output_index.unwrap(),
|
||||
"output should follow tool call"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_pending_items_after_abort_skips_review_mode_history() {
|
||||
let (session, _) = make_session_and_context();
|
||||
let processed_items = vec![ProcessedResponseItem {
|
||||
item: ResponseItem::LocalShellCall {
|
||||
id: Some("review-1".to_string()),
|
||||
call_id: Some("call-456".to_string()),
|
||||
status: LocalShellStatus::InProgress,
|
||||
action: LocalShellAction::Exec(LocalShellExecAction {
|
||||
command: vec!["echo".to_string()],
|
||||
timeout_ms: Some(1_000),
|
||||
working_directory: None,
|
||||
env: None,
|
||||
user: None,
|
||||
}),
|
||||
},
|
||||
response: None,
|
||||
}];
|
||||
|
||||
session
|
||||
.process_pending_items_after_abort(processed_items, true)
|
||||
.await;
|
||||
|
||||
let history = session.history_snapshot().await;
|
||||
assert!(
|
||||
history.is_empty(),
|
||||
"no history should be recorded in review mode"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fatal_tool_error_stops_turn_and_reports_error() {
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx();
|
||||
|
||||
@@ -9,6 +9,7 @@ use tokio::task::AbortHandle;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::codex::ProcessedResponseItem;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::tasks::SessionTask;
|
||||
|
||||
@@ -71,6 +72,7 @@ impl ActiveTurn {
|
||||
pub(crate) struct TurnState {
|
||||
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||
pending_input: Vec<ResponseInputItem>,
|
||||
pending_processed_items: Vec<ProcessedResponseItem>,
|
||||
}
|
||||
|
||||
impl TurnState {
|
||||
@@ -92,6 +94,7 @@ impl TurnState {
|
||||
pub(crate) fn clear_pending(&mut self) {
|
||||
self.pending_approvals.clear();
|
||||
self.pending_input.clear();
|
||||
self.pending_processed_items.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||
@@ -107,6 +110,31 @@ impl TurnState {
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push_processed_item(&mut self, item: ProcessedResponseItem) -> usize {
|
||||
self.pending_processed_items.push(item);
|
||||
self.pending_processed_items.len() - 1
|
||||
}
|
||||
|
||||
pub(crate) fn take_processed_items(&mut self) -> Vec<ProcessedResponseItem> {
|
||||
if self.pending_processed_items.is_empty() {
|
||||
Vec::with_capacity(0)
|
||||
} else {
|
||||
let mut ret = Vec::new();
|
||||
std::mem::swap(&mut ret, &mut self.pending_processed_items);
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_processed_item_response(
|
||||
&mut self,
|
||||
index: usize,
|
||||
response: Option<ResponseInputItem>,
|
||||
) {
|
||||
if let Some(processed) = self.pending_processed_items.get_mut(index) {
|
||||
processed.response = response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::codex::ProcessedResponseItem;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::protocol::Event;
|
||||
@@ -94,7 +95,18 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
for (sub_id, task) in self.take_all_running_tasks().await {
|
||||
let (pending_processed_items, tasks) = self.take_all_running_tasks().await;
|
||||
if !pending_processed_items.is_empty() {
|
||||
// Assume all pending items belong to the same turn; use the first task's kind to
|
||||
// determine whether this was a review task.
|
||||
let is_review_mode = tasks
|
||||
.first()
|
||||
.map(|(_, task)| task.kind == TaskKind::Review)
|
||||
.unwrap_or(false);
|
||||
self.process_pending_items_after_abort(pending_processed_items, is_review_mode)
|
||||
.await;
|
||||
}
|
||||
for (sub_id, task) in tasks {
|
||||
self.handle_task_abort(sub_id, task, reason.clone()).await;
|
||||
}
|
||||
}
|
||||
@@ -125,15 +137,21 @@ impl Session {
|
||||
*active = Some(turn);
|
||||
}
|
||||
|
||||
async fn take_all_running_tasks(&self) -> Vec<(String, RunningTask)> {
|
||||
async fn take_all_running_tasks(
|
||||
&self,
|
||||
) -> (Vec<ProcessedResponseItem>, Vec<(String, RunningTask)>) {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
let pending_processed_items = {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.take_processed_items()
|
||||
};
|
||||
at.clear_pending().await;
|
||||
let tasks = at.drain_tasks();
|
||||
tasks.into_iter().collect()
|
||||
(pending_processed_items, tasks.into_iter().collect())
|
||||
}
|
||||
None => Vec::new(),
|
||||
None => (Vec::new(), Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_core::protocol::EventMsg;
|
||||
@@ -5,15 +6,18 @@ use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use serde_json::json;
|
||||
|
||||
/// Integration test: spawn a long‑running shell tool via a mocked Responses SSE
|
||||
/// function call, then interrupt the session and expect TurnAborted.
|
||||
/// Confirms that interrupting a long-running tool emits the expected
|
||||
/// `TurnAborted` event. This covers the user-facing behaviour: once the shell
|
||||
/// command is interrupted we must notify the UI immediately.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
let command = vec![
|
||||
@@ -35,11 +39,11 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_once(&server, body).await;
|
||||
|
||||
let codex = test_codex().build(&server).await.unwrap().codex;
|
||||
let fixture = test_codex().build(&server).await.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
let wait_timeout = Duration::from_secs(5);
|
||||
|
||||
// Kick off a turn that triggers the function call.
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
@@ -49,7 +53,6 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Wait until the exec begins to avoid a race, then interrupt.
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
|
||||
@@ -59,7 +62,6 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
// Expect TurnAborted soon after.
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::TurnAborted(_)),
|
||||
@@ -67,3 +69,122 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// After an interrupt we expect the next request to the model to include both
|
||||
/// the original tool call and an `"aborted"` `function_call_output`. This test
|
||||
/// exercises the follow-up flow: it sends another user turn, inspects the mock
|
||||
/// responses server, and ensures the model receives the synthesized abort.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_tool_records_history_entries() {
|
||||
let command = vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 60".to_string(),
|
||||
];
|
||||
let call_id = "call-history";
|
||||
|
||||
let args = json!({
|
||||
"command": command,
|
||||
"timeout_ms": 60_000
|
||||
})
|
||||
.to_string();
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-history"),
|
||||
ev_function_call(call_id, "shell", &args),
|
||||
ev_completed("resp-history"),
|
||||
]);
|
||||
let follow_up_body = sse(vec![
|
||||
ev_response_created("resp-followup"),
|
||||
ev_completed("resp-followup"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
|
||||
|
||||
let fixture = test_codex().build(&server).await.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
let wait_timeout = Duration::from_secs(5);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: "start history recording".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::TurnAborted(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: "follow up".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::TaskComplete(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert!(
|
||||
requests.len() >= 2,
|
||||
"expected at least two calls to the responses API"
|
||||
);
|
||||
|
||||
let mut call_seen = false;
|
||||
let mut abort_seen = false;
|
||||
|
||||
for request in requests {
|
||||
let input = request.input();
|
||||
for window in input.windows(2) {
|
||||
let current = &window[0];
|
||||
let next = &window[1];
|
||||
if current.get("type").and_then(|v| v.as_str()) == Some("function_call")
|
||||
&& current.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
|
||||
{
|
||||
call_seen = true;
|
||||
if next.get("type").and_then(|v| v.as_str()) == Some("function_call_output")
|
||||
&& next.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
|
||||
{
|
||||
let content_matches =
|
||||
next.get("output").and_then(serde_json::Value::as_str) == Some("aborted");
|
||||
if content_matches {
|
||||
abort_seen = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if call_seen && abort_seen {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(call_seen, "function call not recorded in responses payload");
|
||||
assert!(
|
||||
abort_seen,
|
||||
"aborted function call output not recorded in responses payload"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user