This commit is contained in:
Ahmed Ibrahim
2025-10-23 07:52:05 -07:00
parent d830ec9678
commit bb26652be1
6 changed files with 172 additions and 145 deletions

View File

@@ -855,7 +855,7 @@ impl Session {
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_into_history(items).await;
self.persist_rollout_response_items(items).await;
}
@@ -1608,13 +1608,14 @@ pub(crate) async fn run_task(
let token_limit_reached = total_usage_tokens
.map(|tokens| tokens >= limit)
.unwrap_or(false);
let (responses, items_to_record_in_conversation_history) = process_items(
processed_items,
is_review_mode,
&mut review_thread_history,
&sess,
)
.await;
let (responses, items_to_record_in_conversation_history) =
crate::response_processing::process_items(
processed_items,
is_review_mode,
&mut review_thread_history,
&sess,
)
.await;
if token_limit_reached {
if auto_compact_recently_attempted {
@@ -1656,7 +1657,7 @@ pub(crate) async fn run_task(
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
let _ = process_items(
let _ = crate::response_processing::process_items(
processed_items,
is_review_mode,
&mut review_thread_history,
@@ -1697,107 +1698,6 @@ pub(crate) async fn run_task(
last_agent_message
}
async fn process_items(
processed_items: Vec<ProcessedResponseItem>,
is_review_mode: bool,
review_thread_history: &mut ConversationHistory,
sess: &Session,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(call_tool_result)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
if is_review_mode {
review_thread_history.record_items(items_to_record_in_conversation_history.iter());
} else {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
}
(responses, items_to_record_in_conversation_history)
}
/// Parse the review output; when not valid JSON, build a structured
/// fallback that carries the plain text as the overall explanation.
///
@@ -2193,7 +2093,7 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
}
})
}
fn convert_call_tool_result_to_function_call_output_payload(
pub(crate) fn convert_call_tool_result_to_function_call_output_payload(
call_tool_result: &CallToolResult,
) -> FunctionCallOutputPayload {
let CallToolResult {

View File

@@ -54,6 +54,7 @@ pub enum SandboxErr {
#[derive(Error, Debug)]
pub enum CodexErr {
// todo(aibrahim): git rid of this error carrying the dangling artifacts
#[error("turn aborted")]
TurnAborted {
dangling_artifacts: Vec<ProcessedResponseItem>,

View File

@@ -36,6 +36,7 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
mod response_processing;
pub mod sandboxing;
pub mod token_data;
mod truncate;

View File

@@ -0,0 +1,112 @@
use crate::codex::Session;
use crate::conversation_history::ConversationHistory;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use tracing::warn;
/// Process streamed `ResponseItem`s from the model into the pair of:
/// - items we should record in conversation history; and
/// - `ResponseInputItem`s to send back to the model on the next turn.
pub(crate) async fn process_items(
processed_items: Vec<crate::codex::ProcessedResponseItem>,
is_review_mode: bool,
review_thread_history: &mut ConversationHistory,
sess: &Session,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
crate::codex::convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
if is_review_mode {
review_thread_history.record_items(items_to_record_in_conversation_history.iter());
} else {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
}
(responses, items_to_record_in_conversation_history)
}

View File

@@ -35,6 +35,22 @@ impl ResponseMock {
pub fn requests(&self) -> Vec<ResponsesRequest> {
self.requests.lock().unwrap().clone()
}
/// Returns true if any captured request contains a `function_call` with the
/// provided `call_id`.
pub fn saw_function_call(&self, call_id: &str) -> bool {
self.requests()
.iter()
.any(|req| req.has_function_call(call_id))
}
/// Returns the `output` string for a matching `function_call_output` with
/// the provided `call_id`, searching across all captured requests.
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
self.requests()
.iter()
.find_map(|req| req.function_call_output_text(call_id))
}
}
#[derive(Debug, Clone)]
@@ -70,6 +86,27 @@ impl ResponsesRequest {
.unwrap_or_else(|| panic!("function call output {call_id} item not found in request"))
}
/// Returns true if this request's `input` contains a `function_call` with
/// the specified `call_id`.
pub fn has_function_call(&self, call_id: &str) -> bool {
self.input().iter().any(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})
}
/// If present, returns the `output` string of the `function_call_output`
/// entry matching `call_id` in this request's `input`.
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
let item = self.input().iter().find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})?;
item.get("output")
.and_then(Value::as_str)
.map(str::to_string)
}
pub fn header(&self, name: &str) -> Option<String> {
self.0
.headers

View File

@@ -105,7 +105,7 @@ async fn interrupt_tool_records_history_entries() {
let fixture = test_codex().build(&server).await.unwrap();
let codex = Arc::clone(&fixture.codex);
let wait_timeout = Duration::from_secs(5);
let wait_timeout = Duration::from_secs(0.1);
codex
.submit(Op::UserInput {
@@ -150,42 +150,18 @@ async fn interrupt_tool_records_history_entries() {
let requests = response_mock.requests();
assert!(
requests.len() >= 2,
"expected at least two calls to the responses API"
requests.len() == 2,
"expected two calls to the responses API, got {}",
requests.len()
);
let mut call_seen = false;
let mut abort_seen = false;
for request in requests {
let input = request.input();
for window in input.windows(2) {
let current = &window[0];
let next = &window[1];
if current.get("type").and_then(|v| v.as_str()) == Some("function_call")
&& current.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
{
call_seen = true;
if next.get("type").and_then(|v| v.as_str()) == Some("function_call_output")
&& next.get("call_id").and_then(|v| v.as_str()) == Some(call_id)
{
let content_matches =
next.get("output").and_then(serde_json::Value::as_str) == Some("aborted");
if content_matches {
abort_seen = true;
break;
}
}
}
}
if call_seen && abort_seen {
break;
}
}
assert!(call_seen, "function call not recorded in responses payload");
assert!(
abort_seen,
response_mock.saw_function_call(call_id),
"function call not recorded in responses payload"
);
assert_eq!(
response_mock.function_call_output_text(call_id).as_deref(),
Some("aborted"),
"aborted function call output not recorded in responses payload"
);
}