Compare commits

...

3 Commits

Author SHA1 Message Date
Ahmed Ibrahim
fdf9b7566f tests 2025-10-16 17:18:01 -07:00
Ahmed Ibrahim
610cd53595 tests 2025-10-16 17:06:05 -07:00
Ahmed Ibrahim
a6b0d4b210 Use helper for aborted custom tool outputs 2025-10-16 15:41:46 -07:00
4 changed files with 691 additions and 124 deletions

View File

@@ -1057,6 +1057,80 @@ impl Session {
}
}
/// Track a freshly emitted `ResponseItem` for the active turn.
///
/// When the model stream produces an item (e.g. a tool call) we register it
/// immediately so abort paths can still flush it into history even if the
/// matching tool future has not completed yet. The returned index lets the
/// caller later patch the slot when the response arrives.
pub async fn append_processed_item(
&self,
sub_id: &str,
item: ProcessedResponseItem,
) -> Option<usize> {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
Some(ts.push_processed_item(item))
} else {
trace!(sub_id, "dropping processed item; turn not active");
None
}
}
/// Patch a previously queued `ProcessedResponseItem` with the tool
/// response.
///
/// Tool futures resolve outside of the stream loop. Once a result is
/// available we update the placeholder slot created by
/// [`append_processed_item`] so the turn state reflects the final
/// `ResponseInputItem`. If the turn has already ended (e.g. abort), we skip
/// the update—the abort handler will synthesize the appropriate output.
pub async fn update_processed_item_response(
&self,
sub_id: &str,
index: usize,
response: Option<ResponseInputItem>,
) {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
ts.set_processed_item_response(index, response);
} else {
trace!(
sub_id,
index, "dropping processed item update; turn not active"
);
}
}
pub async fn take_processed_items(&self, _sub_id: &str) -> Vec<ProcessedResponseItem> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.take_processed_items()
}
None => Vec::with_capacity(0),
}
}
pub(crate) async fn process_pending_items_after_abort(
&self,
processed_items: Vec<ProcessedResponseItem>,
is_review_mode: bool,
) {
if processed_items.is_empty() {
return;
}
let ProcessedItemsSummary { history_items, .. } =
summarize_processed_items(processed_items);
if history_items.is_empty() || is_review_mode {
return;
}
self.record_conversation_items(&history_items).await;
}
pub async fn call_tool(
&self,
server: &str,
@@ -1753,107 +1827,20 @@ pub(crate) async fn run_task(
let token_limit_reached = total_usage_tokens
.map(|tokens| (tokens as i64) >= limit)
.unwrap_or(false);
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
},
);
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
let ProcessedItemsSummary {
history_items,
responses,
} = summarize_processed_items(processed_items);
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
// Clear the pending buffer before any awaited work so abort paths cannot replay
// already-recorded items.
let _ = sess.take_processed_items(&sub_id).await;
if !history_items.is_empty() {
if is_review_mode {
review_thread_history
.extend(items_to_record_in_conversation_history.clone());
review_thread_history.extend(history_items.clone());
} else {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
sess.record_conversation_items(&history_items).await;
}
}
@@ -1882,9 +1869,7 @@ pub(crate) async fn run_task(
auto_compact_recently_attempted = false;
if responses.is_empty() {
last_agent_message = get_last_assistant_message_from_turn(
&items_to_record_in_conversation_history,
);
last_agent_message = get_last_assistant_message_from_turn(&history_items);
sess.notifier()
.notify(&UserNotification::AgentTurnComplete {
thread_id: sess.conversation_id.to_string(),
@@ -2046,12 +2031,138 @@ async fn run_turn(
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) struct ProcessedResponseItem {
pub(crate) item: ResponseItem,
pub(crate) response: Option<ResponseInputItem>,
}
struct ProcessedItemsSummary {
history_items: Vec<ResponseItem>,
responses: Vec<ResponseInputItem>,
}
fn summarize_processed_items(processed_items: Vec<ProcessedResponseItem>) -> ProcessedItemsSummary {
let mut summary = ProcessedItemsSummary {
history_items: Vec::new(),
responses: Vec::new(),
};
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
summary.history_items.push(item.clone());
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
summary.history_items.push(item.clone());
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(call_tool_result)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
summary
.history_items
.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
(
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
},
None,
) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(make_aborted_function_call_output(call_id.clone()));
}
(ResponseItem::FunctionCall { call_id, .. }, None) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(make_aborted_function_call_output(call_id.clone()));
}
(ResponseItem::CustomToolCall { call_id, .. }, None) => {
summary.history_items.push(item.clone());
summary
.history_items
.push(make_aborted_custom_tool_call_output(call_id.clone()));
}
(
ResponseItem::Reasoning {
id,
summary: reasoning_summary,
content,
encrypted_content,
},
None,
) => {
summary.history_items.push(ResponseItem::Reasoning {
id: id.clone(),
summary: reasoning_summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
summary.responses.push(response);
}
}
summary
}
#[derive(Debug)]
struct TurnRunResult {
processed_items: Vec<ProcessedResponseItem>,
@@ -2162,10 +2273,6 @@ async fn try_run_turn(
}
};
let add_completed = &mut |response_item: ProcessedResponseItem| {
output.push_back(future::ready(Ok(response_item)).boxed());
};
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
@@ -2175,13 +2282,40 @@ async fn try_run_turn(
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let response = tool_runtime.handle_tool_call(call);
let placeholder_index = sess
.append_processed_item(
sub_id,
ProcessedResponseItem {
item: item.clone(),
response: None,
},
)
.await;
let sess_for_store = Arc::clone(&sess);
let sub_id_for_store = sub_id.to_string();
output.push_back(
async move {
Ok(ProcessedResponseItem {
let response_item = response.await?;
if let Some(index) = placeholder_index {
sess_for_store
.update_processed_item_response(
&sub_id_for_store,
index,
Some(response_item.clone()),
)
.await;
}
let processed = ProcessedResponseItem {
item,
response: Some(response.await?),
})
response: Some(response_item),
};
if placeholder_index.is_none() {
let _ = sess_for_store
.append_processed_item(&sub_id_for_store, processed.clone())
.await;
}
Ok(processed)
}
.boxed(),
);
@@ -2194,7 +2328,9 @@ async fn try_run_turn(
item.clone(),
)
.await?;
add_completed(ProcessedResponseItem { item, response });
let processed = ProcessedResponseItem { item, response };
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
output.push_back(future::ready(Ok(processed)).boxed());
}
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
@@ -2211,10 +2347,12 @@ async fn try_run_turn(
success: None,
},
};
add_completed(ProcessedResponseItem {
let processed = ProcessedResponseItem {
item,
response: Some(response),
});
};
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
output.push_back(future::ready(Ok(processed)).boxed());
}
Err(FunctionCallError::RespondToModel(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
@@ -2224,10 +2362,12 @@ async fn try_run_turn(
success: None,
},
};
add_completed(ProcessedResponseItem {
let processed = ProcessedResponseItem {
item,
response: Some(response),
});
};
let _ = sess.append_processed_item(sub_id, processed.clone()).await;
output.push_back(future::ready(Ok(processed)).boxed());
}
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
@@ -2409,6 +2549,23 @@ fn convert_call_tool_result_to_function_call_output_payload(
}
}
fn make_aborted_function_call_output(call_id: String) -> ResponseItem {
ResponseItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: "aborted".to_string(),
success: Some(false),
},
}
}
fn make_aborted_custom_tool_call_output(call_id: String) -> ResponseItem {
ResponseItem::CustomToolCallOutput {
call_id,
output: "aborted".to_string(),
}
}
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
/// and records a developer message with the review output.
pub(crate) async fn exit_review_mode(
@@ -2492,6 +2649,9 @@ mod tests {
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_app_server_protocol::AuthMode;
use codex_protocol::models::ContentItem;
use codex_protocol::models::LocalShellAction;
use codex_protocol::models::LocalShellExecAction;
use codex_protocol::models::LocalShellStatus;
use codex_protocol::models::ResponseItem;
use mcp_types::ContentBlock;
@@ -2571,6 +2731,33 @@ mod tests {
assert_eq!(expected, got);
}
#[test]
fn aborted_function_call_output_marks_failure() {
let output = make_aborted_function_call_output("call-123".to_string());
match output {
ResponseItem::FunctionCallOutput { call_id, output } => {
assert_eq!(call_id, "call-123");
assert_eq!(output.content, "aborted");
assert_eq!(output.success, Some(false));
}
other => panic!("unexpected response item: {other:?}"),
}
}
#[test]
fn aborted_custom_tool_call_output_marks_failure() {
let output = make_aborted_custom_tool_call_output("custom-call".to_string());
match output {
ResponseItem::CustomToolCallOutput { call_id, output } => {
assert_eq!(call_id, "custom-call");
assert_eq!(output, "aborted");
}
other => panic!("unexpected response item: {other:?}"),
}
}
#[test]
fn model_truncation_head_tail_by_lines() {
// Build 400 short lines so line-count limit, not byte budget, triggers truncation
@@ -2983,6 +3170,219 @@ mod tests {
);
}
#[tokio::test]
async fn abort_regular_task_records_pending_tool_history() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular-history".to_string();
let input = vec![InputItem::Text {
text: "pending tool".to_string(),
}];
sess.spawn_task(
Arc::clone(&tc),
sub_id.clone(),
input,
NeverEndingTask(TaskKind::Regular),
)
.await;
let call_id = "call-regular".to_string();
sess.append_processed_item(
&sub_id,
ProcessedResponseItem {
item: ResponseItem::LocalShellCall {
id: Some("local-regular".to_string()),
call_id: Some(call_id.clone()),
status: LocalShellStatus::InProgress,
action: LocalShellAction::Exec(LocalShellExecAction {
command: vec!["curl".to_string(), "https://example.com".to_string()],
timeout_ms: Some(1_000),
working_directory: None,
env: None,
user: None,
}),
},
response: None,
},
)
.await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let evt = rx.recv().await.expect("event");
match evt.msg {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected event: {other:?}"),
}
assert!(rx.try_recv().is_err());
let history = sess.history_snapshot().await;
let call_index = history.iter().position(|item| {
matches!(
item,
ResponseItem::LocalShellCall {
call_id: Some(id),
..
} if id == &call_id
)
});
let output_index = history.iter().position(|item| {
matches!(
item,
ResponseItem::FunctionCallOutput { call_id: id, output }
if id == &call_id
&& output.content == "aborted"
&& output.success == Some(false)
)
});
assert!(call_index.is_some(), "local shell call not recorded");
assert!(output_index.is_some(), "aborted output not recorded");
assert!(
call_index.unwrap() < output_index.unwrap(),
"output should follow tool call"
);
}
#[tokio::test]
async fn abort_review_task_ignores_pending_tool_history() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-review-history".to_string();
let input = vec![InputItem::Text {
text: "pending review tool".to_string(),
}];
sess.spawn_task(
Arc::clone(&tc),
sub_id.clone(),
input,
NeverEndingTask(TaskKind::Review),
)
.await;
sess.append_processed_item(
&sub_id,
ProcessedResponseItem {
item: ResponseItem::LocalShellCall {
id: Some("local-review".to_string()),
call_id: Some("call-review".to_string()),
status: LocalShellStatus::InProgress,
action: LocalShellAction::Exec(LocalShellExecAction {
command: vec!["curl".to_string(), "https://example.com".to_string()],
timeout_ms: Some(1_000),
working_directory: None,
env: None,
user: None,
}),
},
response: None,
},
)
.await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let first = rx.recv().await.expect("first event");
match first.msg {
EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()),
other => panic!("unexpected first event: {other:?}"),
}
let second = rx.recv().await.expect("second event");
match second.msg {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected second event: {other:?}"),
}
assert!(rx.try_recv().is_err());
let history = sess.history_snapshot().await;
assert!(
!history
.iter()
.any(|item| matches!(item, ResponseItem::LocalShellCall { .. })),
"review mode should not record pending tool call in main history"
);
}
#[tokio::test]
async fn process_pending_items_after_abort_records_aborted_tool_calls() {
let (session, _) = make_session_and_context();
let call_id = "call-123".to_string();
let processed_items = vec![ProcessedResponseItem {
item: ResponseItem::LocalShellCall {
id: Some("local-1".to_string()),
call_id: Some(call_id.clone()),
status: LocalShellStatus::InProgress,
action: LocalShellAction::Exec(LocalShellExecAction {
command: vec!["curl".to_string(), "https://example.com".to_string()],
timeout_ms: Some(1_000),
working_directory: None,
env: None,
user: None,
}),
},
response: None,
}];
session
.process_pending_items_after_abort(processed_items, false)
.await;
let history = session.history_snapshot().await;
let call_index = history.iter().position(|item| {
matches!(
item,
ResponseItem::LocalShellCall {
call_id: Some(id),
..
} if id == &call_id
)
});
let output_index = history.iter().position(|item| {
matches!(
item,
ResponseItem::FunctionCallOutput { call_id: id, output }
if id == &call_id
&& output.content == "aborted"
&& output.success == Some(false)
)
});
assert!(call_index.is_some(), "local shell call not recorded");
assert!(output_index.is_some(), "aborted output not recorded");
assert!(
call_index.unwrap() < output_index.unwrap(),
"output should follow tool call"
);
}
#[tokio::test]
async fn process_pending_items_after_abort_skips_review_mode_history() {
let (session, _) = make_session_and_context();
let processed_items = vec![ProcessedResponseItem {
item: ResponseItem::LocalShellCall {
id: Some("review-1".to_string()),
call_id: Some("call-456".to_string()),
status: LocalShellStatus::InProgress,
action: LocalShellAction::Exec(LocalShellExecAction {
command: vec!["echo".to_string()],
timeout_ms: Some(1_000),
working_directory: None,
env: None,
user: None,
}),
},
response: None,
}];
session
.process_pending_items_after_abort(processed_items, true)
.await;
let history = session.history_snapshot().await;
assert!(
history.is_empty(),
"no history should be recorded in review mode"
);
}
#[tokio::test]
async fn fatal_tool_error_stops_turn_and_reports_error() {
let (session, turn_context, _rx) = make_session_and_context_with_rx();

View File

@@ -9,6 +9,7 @@ use tokio::task::AbortHandle;
use codex_protocol::models::ResponseInputItem;
use tokio::sync::oneshot;
use crate::codex::ProcessedResponseItem;
use crate::protocol::ReviewDecision;
use crate::tasks::SessionTask;
@@ -71,6 +72,7 @@ impl ActiveTurn {
pub(crate) struct TurnState {
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
pending_processed_items: Vec<ProcessedResponseItem>,
}
impl TurnState {
@@ -92,6 +94,7 @@ impl TurnState {
pub(crate) fn clear_pending(&mut self) {
self.pending_approvals.clear();
self.pending_input.clear();
self.pending_processed_items.clear();
}
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
@@ -107,6 +110,31 @@ impl TurnState {
ret
}
}
pub(crate) fn push_processed_item(&mut self, item: ProcessedResponseItem) -> usize {
self.pending_processed_items.push(item);
self.pending_processed_items.len() - 1
}
pub(crate) fn take_processed_items(&mut self) -> Vec<ProcessedResponseItem> {
if self.pending_processed_items.is_empty() {
Vec::with_capacity(0)
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut self.pending_processed_items);
ret
}
}
pub(crate) fn set_processed_item_response(
&mut self,
index: usize,
response: Option<ResponseInputItem>,
) {
if let Some(processed) = self.pending_processed_items.get_mut(index) {
processed.response = response;
}
}
}
impl ActiveTurn {

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use tracing::trace;
use crate::codex::ProcessedResponseItem;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::protocol::Event;
@@ -94,7 +95,18 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for (sub_id, task) in self.take_all_running_tasks().await {
let (pending_processed_items, tasks) = self.take_all_running_tasks().await;
if !pending_processed_items.is_empty() {
// Assume all pending items belong to the same turn; use the first task's kind to
// determine whether this was a review task.
let is_review_mode = tasks
.first()
.map(|(_, task)| task.kind == TaskKind::Review)
.unwrap_or(false);
self.process_pending_items_after_abort(pending_processed_items, is_review_mode)
.await;
}
for (sub_id, task) in tasks {
self.handle_task_abort(sub_id, task, reason.clone()).await;
}
}
@@ -125,15 +137,21 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<(String, RunningTask)> {
async fn take_all_running_tasks(
&self,
) -> (Vec<ProcessedResponseItem>, Vec<(String, RunningTask)>) {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
let pending_processed_items = {
let mut ts = at.turn_state.lock().await;
ts.take_processed_items()
};
at.clear_pending().await;
let tasks = at.drain_tasks();
tasks.into_iter().collect()
(pending_processed_items, tasks.into_iter().collect())
}
None => Vec::new(),
None => (Vec::new(), Vec::new()),
}
}

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;
use codex_core::protocol::EventMsg;
@@ -5,15 +6,18 @@ use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event_with_timeout;
use serde_json::json;
/// Integration test: spawn a longrunning shell tool via a mocked Responses SSE
/// function call, then interrupt the session and expect TurnAborted.
/// Confirms that interrupting a long-running tool emits the expected
/// `TurnAborted` event. This covers the user-facing behaviour: once the shell
/// command is interrupted we must notify the UI immediately.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_long_running_tool_emits_turn_aborted() {
let command = vec![
@@ -35,11 +39,11 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
let server = start_mock_server().await;
mount_sse_once(&server, body).await;
let codex = test_codex().build(&server).await.unwrap().codex;
let fixture = test_codex().build(&server).await.unwrap();
let codex = Arc::clone(&fixture.codex);
let wait_timeout = Duration::from_secs(5);
// Kick off a turn that triggers the function call.
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
@@ -49,7 +53,6 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
.await
.unwrap();
// Wait until the exec begins to avoid a race, then interrupt.
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
@@ -59,7 +62,6 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
codex.submit(Op::Interrupt).await.unwrap();
// Expect TurnAborted soon after.
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TurnAborted(_)),
@@ -67,3 +69,122 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
)
.await;
}
/// After an interrupt we expect the next request to the model to include both
/// the original tool call and an `"aborted"` `function_call_output`. This test
/// exercises the follow-up flow: it sends another user turn, inspects the mock
/// responses server, and ensures the model receives the synthesized abort.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_tool_records_history_entries() {
let command = vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 60".to_string(),
];
let call_id = "call-history";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-history"),
ev_function_call(call_id, "shell", &args),
ev_completed("resp-history"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-followup"),
ev_completed("resp-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex().build(&server).await.unwrap();
let codex = Arc::clone(&fixture.codex);
let wait_timeout = Duration::from_secs(5);
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "start history recording".into(),
}],
})
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
wait_timeout,
)
.await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TurnAborted(_)),
wait_timeout,
)
.await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "follow up".into(),
}],
})
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
wait_timeout,
)
.await;
let requests = response_mock.requests();
assert!(
requests.len() >= 2,
"expected at least two calls to the responses API"
);
let mut call_seen = false;
let mut abort_seen = false;
for request in requests {
let input = request.input();
for window in input.windows(2) {
let current = &window[0];
let next = &window[1];
if current.get("type").and_then(|v| v.as_str()) == Some("function_call")
&& current.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
{
call_seen = true;
if next.get("type").and_then(|v| v.as_str()) == Some("function_call_output")
&& next.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
{
let content_matches =
next.get("output").and_then(serde_json::Value::as_str) == Some("aborted");
if content_matches {
abort_seen = true;
break;
}
}
}
}
if call_seen && abort_seen {
break;
}
}
assert!(call_seen, "function call not recorded in responses payload");
assert!(
abort_seen,
"aborted function call output not recorded in responses payload"
);
}