Inline response recording and remove process_items indirection (#7310)

- Inline response recording during streaming: `run_turn` now records
items as they arrive instead of building a `ProcessedResponseItem` list
and post‑processing via `process_items`.
- Simplify turn handling: `handle_output_item_done` returns the
follow‑up signal + optional tool future; `needs_follow_up` is set only
there, and in‑flight tool futures are drained once at the end (errors
logged, no extra state writes).
- Flattened stream loop: removed `process_items` indirection and the
extra output queue
- - Tests: relaxed `tool_parallelism::tool_results_grouped` to allow any
completion order while still requiring matching call/output IDs.
This commit is contained in:
Ahmed Ibrahim
2025-12-04 12:17:54 -08:00
committed by GitHub
parent 7dfc3a4dc7
commit 6e6338aa87
6 changed files with 288 additions and 212 deletions

View File

@@ -13,11 +13,12 @@ use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::features::Feature;
use crate::features::Features;
use crate::function_tool::FunctionCallError;
use crate::openai_models::models_manager::ModelsManager;
use crate::parse_command::parse_command;
use crate::parse_turn_item;
use crate::response_processing::process_items;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::terminal;
use crate::truncate::TruncationPolicy;
use crate::user_notification::UserNotifier;
@@ -131,7 +132,6 @@ use codex_execpolicy::Policy as ExecPolicy;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
@@ -827,7 +827,7 @@ impl Session {
}
}
async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
pub(crate) async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
self.send_event(
turn_context,
EventMsg::ItemStarted(ItemStartedEvent {
@@ -839,7 +839,11 @@ impl Session {
.await;
}
async fn emit_turn_item_completed(&self, turn_context: &TurnContext, item: TurnItem) {
pub(crate) async fn emit_turn_item_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
) {
self.send_event(
turn_context,
EventMsg::ItemCompleted(ItemCompletedEvent {
@@ -2060,15 +2064,16 @@ pub(crate) async fn run_task(
.await
{
Ok(turn_output) => {
let processed_items = turn_output;
let TurnRunResult {
needs_follow_up,
last_agent_message: turn_last_agent_message,
} = turn_output;
let limit = turn_context
.client
.get_auto_compact_token_limit()
.unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= limit;
let (responses, items_to_record_in_conversation_history) =
process_items(processed_items, &sess, &turn_context).await;
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached {
@@ -2081,10 +2086,8 @@ pub(crate) async fn run_task(
continue;
}
if responses.is_empty() {
last_agent_message = get_last_assistant_message_from_turn(
&items_to_record_in_conversation_history,
);
if !needs_follow_up {
last_agent_message = turn_last_agent_message;
sess.notifier()
.notify(&UserNotification::AgentTurnComplete {
thread_id: sess.conversation_id.to_string(),
@@ -2097,10 +2100,7 @@ pub(crate) async fn run_task(
}
continue;
}
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
let _ = process_items(processed_items, &sess, &turn_context).await;
Err(CodexErr::TurnAborted) => {
// Aborted turn is reported via a different event.
break;
}
@@ -2130,7 +2130,7 @@ async fn run_turn(
turn_diff_tracker: SharedTurnDiffTracker,
input: Vec<ResponseItem>,
cancellation_token: CancellationToken,
) -> CodexResult<Vec<ProcessedResponseItem>> {
) -> CodexResult<TurnRunResult> {
let mcp_tools = sess
.services
.mcp_connection_manager
@@ -2184,13 +2184,10 @@ async fn run_turn(
)
.await
{
// todo(aibrahim): map special cases and ? on other errors
Ok(output) => return Ok(output),
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
Err(CodexErr::TurnAborted) => {
return Err(CodexErr::TurnAborted);
}
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
@@ -2243,14 +2240,29 @@ async fn run_turn(
}
}
/// When the model is prompted, it returns a stream of events. Some of these
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
pub struct ProcessedResponseItem {
pub item: ResponseItem,
pub response: Option<ResponseInputItem>,
struct TurnRunResult {
needs_follow_up: bool,
last_agent_message: Option<String>,
}
async fn drain_in_flight(
in_flight: &mut FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>>,
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
while let Some(res) = in_flight.next().await {
match res {
Ok(response_input) => {
sess.record_conversation_items(&turn_context, &[response_input.into()])
.await;
}
Err(err) => {
error_or_panic(format!("in-flight tool future failed during drain: {err}"));
}
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
@@ -2261,7 +2273,7 @@ async fn try_run_turn(
turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt,
cancellation_token: CancellationToken,
) -> CodexResult<Vec<ProcessedResponseItem>> {
) -> CodexResult<TurnRunResult> {
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
@@ -2285,114 +2297,47 @@ async fn try_run_turn(
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
);
let mut output: FuturesOrdered<BoxFuture<CodexResult<ProcessedResponseItem>>> =
let mut in_flight: FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>> =
FuturesOrdered::new();
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let outcome: CodexResult<TurnRunResult> = loop {
let event = match stream.next().or_cancel(&cancellation_token).await {
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => {
let processed_items = output.try_collect().await?;
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
}
Err(codex_async_utils::CancelErr::Cancelled) => break Err(CodexErr::TurnAborted),
};
let event = match event {
Some(res) => res?,
None => {
return Err(CodexErr::Stream(
break Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
}
};
let add_completed = &mut |response_item: ProcessedResponseItem| {
output.push_back(future::ready(Ok(response_item)).boxed());
};
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let previously_active_item = active_item.take();
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await {
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let mut ctx = HandleOutputCtx {
sess: sess.clone(),
turn_context: turn_context.clone(),
tool_runtime: tool_runtime.clone(),
cancellation_token: cancellation_token.child_token(),
};
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
output.push_back(
async move {
Ok(ProcessedResponseItem {
item,
response: Some(response.await?),
})
}
.boxed(),
);
}
Ok(None) => {
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
if previously_active_item.is_none() {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
sess.emit_turn_item_completed(&turn_context, turn_item)
.await;
}
add_completed(ProcessedResponseItem {
item,
response: None,
});
}
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
turn_context
.client
.get_otel_event_manager()
.log_tool_failed("local_shell", msg);
error!(msg);
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: msg.to_string(),
..Default::default()
},
};
add_completed(ProcessedResponseItem {
item,
response: Some(response),
});
}
Err(FunctionCallError::RespondToModel(message))
| Err(FunctionCallError::Denied(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: message,
..Default::default()
},
};
add_completed(ProcessedResponseItem {
item,
response: Some(response),
});
}
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
}
let output_result =
handle_output_item_done(&mut ctx, item, previously_active_item).await?;
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
if let Some(agent_message) = output_result.last_agent_message {
last_agent_message = Some(agent_message);
}
needs_follow_up |= output_result.needs_follow_up;
}
ResponseEvent::OutputItemAdded(item) => {
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
@@ -2413,7 +2358,6 @@ async fn try_run_turn(
} => {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
let processed_items = output.try_collect().await?;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
@@ -2423,7 +2367,10 @@ async fn try_run_turn(
sess.send_event(&turn_context, msg).await;
}
return Ok(processed_items);
break Ok(TurnRunResult {
needs_follow_up,
last_agent_message,
});
}
ResponseEvent::OutputTextDelta(delta) => {
// In review child threads, suppress assistant text deltas; the
@@ -2490,22 +2437,11 @@ async fn try_run_turn(
}
}
}
}
}
};
async fn handle_non_tool_response_item(item: &ResponseItem) -> Option<TurnItem> {
debug!(?item, "Output item");
drain_in_flight(&mut in_flight, sess, turn_context).await?;
match item {
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => parse_turn_item(item),
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected tool output from stream");
None
}
_ => None,
}
outcome
}
pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
@@ -2540,8 +2476,10 @@ mod tests {
use crate::config::ConfigOverrides;
use crate::config::ConfigToml;
use crate::exec::ExecToolCallOutput;
use crate::function_tool::FunctionCallError;
use crate::shell::default_user_shell;
use crate::tools::format_exec_output_str;
use codex_protocol::models::FunctionCallOutputPayload;
use crate::protocol::CompactedItem;
use crate::protocol::CreditsSnapshot;