This commit is contained in:
Ahmed Ibrahim
2025-10-22 15:39:34 -07:00
parent 273819aaae
commit 3bf76f79f0
3 changed files with 180 additions and 127 deletions

View File

@@ -1608,99 +1608,8 @@ pub(crate) async fn run_task(
let token_limit_reached = total_usage_tokens
.map(|tokens| tokens >= limit)
.unwrap_or(false);
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
},
);
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
let (responses, items_to_record_in_conversation_history) =
process_items(processed_items);
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
if is_review_mode {
@@ -1711,7 +1620,6 @@ pub(crate) async fn run_task(
.await;
}
}
if token_limit_reached {
if auto_compact_recently_attempted {
let limit_str = limit.to_string();
@@ -1749,7 +1657,11 @@ pub(crate) async fn run_task(
}
continue;
}
Err(CodexErr::TurnAborted) => {
Err(CodexErr::TurnAborted { processed_items }) => {
let (_responses, items_to_record_in_conversation_history) =
process_items(processed_items);
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
// Aborted turn is reported via a different event.
break;
}
@@ -1784,6 +1696,94 @@ pub(crate) async fn run_task(
last_agent_message
}
fn process_items(
processed_items: Vec<ProcessedResponseItem>,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(call_tool_result)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
(responses, items_to_record_in_conversation_history)
}
/// Parse the review output; when not valid JSON, build a structured
/// fallback that carries the plain text as the overall explanation.
///
@@ -1850,7 +1850,9 @@ async fn run_turn(
.await
{
Ok(output) => return Ok(output),
Err(CodexErr::TurnAborted) => return Err(CodexErr::TurnAborted),
Err(CodexErr::TurnAborted { processed_items }) => {
return Err(CodexErr::TurnAborted { processed_items });
}
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e @ CodexErr::Fatal(_)) => return Err(e),
@@ -1954,7 +1956,15 @@ async fn try_run_turn(
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().or_cancel(&cancellation_token).await?;
let event = match stream.next().or_cancel(&cancellation_token).await {
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => {
info!("Turn aborted");
let processed_items =
finalize_turn(&sess, &turn_context, &turn_diff_tracker, output).await?;
return Err(CodexErr::TurnAborted { processed_items });
}
};
let event = match event {
Some(res) => res?,
@@ -1978,7 +1988,8 @@ async fn try_run_turn(
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let response = tool_runtime.handle_tool_call(call);
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
output.push_back(
async move {
@@ -2060,27 +2071,12 @@ async fn try_run_turn(
} => {
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
.await;
let processed_items = output
.try_collect()
.or_cancel(&cancellation_token)
.await??;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
sess.send_event(&turn_context, msg).await;
}
let result = TurnRunResult {
let processed_items =
finalize_turn(&sess, &turn_context, &turn_diff_tracker, output).await?;
return Ok(TurnRunResult {
processed_items,
total_token_usage: token_usage.clone(),
};
return Ok(result);
});
}
ResponseEvent::OutputTextDelta(delta) => {
// In review child threads, suppress assistant text deltas; the
@@ -2113,6 +2109,26 @@ async fn try_run_turn(
}
}
async fn finalize_turn<'a>(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
turn_diff_tracker: &SharedTurnDiffTracker,
output: FuturesOrdered<BoxFuture<'a, CodexResult<ProcessedResponseItem>>>,
) -> CodexResult<Vec<ProcessedResponseItem>> {
let processed_items = output.try_collect().await?;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
sess.send_event(turn_context, msg).await;
}
Ok(processed_items)
}
async fn handle_non_tool_response_item(
sess: &Session,
turn_context: Arc<TurnContext>,

View File

@@ -1,3 +1,4 @@
use crate::codex::ProcessedResponseItem;
use crate::exec::ExecToolCallOutput;
use crate::token_data::KnownPlan;
use crate::token_data::PlanType;
@@ -54,7 +55,9 @@ pub enum SandboxErr {
#[derive(Error, Debug)]
pub enum CodexErr {
#[error("turn aborted")]
TurnAborted,
TurnAborted {
processed_items: Vec<ProcessedResponseItem>,
},
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
/// handshake has succeeded but **before** it finished emitting `response.completed`.
@@ -158,7 +161,9 @@ pub enum CodexErr {
impl From<CancelErr> for CodexErr {
fn from(_: CancelErr) -> Self {
CodexErr::TurnAborted
CodexErr::TurnAborted {
processed_items: Vec::new(),
}
}
}

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::either::Either;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use crate::codex::Session;
@@ -9,8 +10,10 @@ use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::function_tool::FunctionCallError;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolPayload;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
pub(crate) struct ToolCallRuntime {
@@ -40,6 +43,7 @@ impl ToolCallRuntime {
pub(crate) fn handle_tool_call(
&self,
call: ToolCall,
cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = Result<ResponseInputItem, CodexErr>> {
let supports_parallel = self.router.tool_supports_parallel(&call.tool_name);
@@ -48,18 +52,24 @@ impl ToolCallRuntime {
let turn = Arc::clone(&self.turn_context);
let tracker = Arc::clone(&self.tracker);
let lock = Arc::clone(&self.parallel_execution);
let aborted_response = Self::aborted_response(&call);
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
AbortOnDropHandle::new(tokio::spawn(async move {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
tokio::select! {
_ = cancellation_token.cancelled() => Ok(aborted_response),
res = async {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
router
.dispatch_tool_call(session, turn, tracker, call)
.await
router
.dispatch_tool_call(session, turn, tracker, call)
.await
} => res,
}
}));
async move {
@@ -74,3 +84,25 @@ impl ToolCallRuntime {
}
}
}
impl ToolCallRuntime {
fn aborted_response(call: &ToolCall) -> ResponseInputItem {
match &call.payload {
ToolPayload::Custom { .. } => ResponseInputItem::CustomToolCallOutput {
call_id: call.call_id.clone(),
output: "aborted".to_string(),
},
ToolPayload::Mcp { .. } => ResponseInputItem::McpToolCallOutput {
call_id: call.call_id.clone(),
result: Err("aborted".to_string()),
},
_ => ResponseInputItem::FunctionCallOutput {
call_id: call.call_id.clone(),
output: FunctionCallOutputPayload {
content: "aborted".to_string(),
success: None,
},
},
}
}
}