Files
codex/codex-rs/core/src/stream_events_utils.rs
2026-03-03 13:10:12 -08:00

265 lines
9.6 KiB
Rust

use std::pin::Pin;
use std::sync::Arc;
use codex_protocol::config_types::ModeKind;
use codex_protocol::items::TurnItem;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::error::Result;
use crate::function_tool::FunctionCallError;
use crate::parse_turn_item;
use crate::proposed_plan_parser::strip_proposed_plan_blocks;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_taint::TaintEffect;
use codex_taint::TaintLabel;
use codex_taint::TaintSource;
use futures::Future;
use tracing::debug;
use tracing::instrument;
/// A boxed, sendable future representing an in-flight tool call.
///
/// Resolves to the `ResponseInputItem` produced by the tool execution,
/// which the caller feeds back to the model on the next turn.
pub(crate) type InFlightFuture<'f> =
Pin<Box<dyn Future<Output = Result<ResponseInputItem>> + Send + 'f>>;
/// Outcome of processing one completed stream item in
/// `handle_output_item_done`.
#[derive(Default)]
pub(crate) struct OutputItemResult {
/// Combined assistant message text, when the item was an assistant message.
pub last_agent_message: Option<String>,
/// True when the model must be invoked again (tool call issued, or an
/// error response was pushed into the transcript).
pub needs_follow_up: bool,
/// Queued tool execution; present only when the item was a tool call.
/// The caller is responsible for driving this future.
pub tool_future: Option<InFlightFuture<'static>>,
}
/// Bundle of per-turn state threaded through `handle_output_item_done`.
pub(crate) struct HandleOutputCtx {
/// The active session; used to record history and emit turn-item events.
pub sess: Arc<Session>,
/// Context for the current turn (collaboration mode, otel, sub id, ...).
pub turn_context: Arc<TurnContext>,
/// Runtime that executes tool calls; cloned into each queued tool future.
pub tool_runtime: ToolCallRuntime,
/// Parent token; each tool call gets a child token derived from it.
pub cancellation_token: CancellationToken,
}
/// Handle a completed output item from the model stream, recording it and
/// queuing any tool execution future.
///
/// Items are recorded into conversation history immediately — before any
/// tool future is awaited — so history and rollout stay in sync even if the
/// turn is later cancelled. Tool calls are not executed here; the future is
/// returned in `OutputItemResult::tool_future` for the caller to drive.
///
/// # Errors
/// Returns `CodexErr::Fatal` when building the tool call reports
/// `FunctionCallError::Fatal`.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn handle_output_item_done(
ctx: &mut HandleOutputCtx,
item: ResponseItem,
previously_active_item: Option<TurnItem>,
) -> Result<OutputItemResult> {
let mut output = OutputItemResult::default();
// Plan mode strips proposed-plan blocks from assistant text below.
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!(
thread_id = %ctx.sess.conversation_id,
"ToolCall: {} {}",
call.tool_name,
payload_preview
);
// Record before execution so the call is in history even if cancelled.
ctx.sess
.record_conversation_items(&ctx.turn_context, std::slice::from_ref(&item))
.await;
// Child token: cancelling the parent cancels this tool call too.
let cancellation_token = ctx.cancellation_token.child_token();
let tool_future: InFlightFuture<'static> = Box::pin(
ctx.tool_runtime
.clone()
.handle_tool_call(call, cancellation_token),
);
output.needs_follow_up = true;
output.tool_future = Some(tool_future);
}
// No tool call: convert messages/reasoning into turn items and mark them as complete.
Ok(None) => {
if matches!(item, ResponseItem::WebSearchCall { .. }) {
// Web search output is external content; mark the turn as tainted.
ctx.sess
.apply_taint_effect(
&ctx.turn_context.sub_id,
TaintEffect::Mark {
label: TaintLabel::ExternalContent,
source: TaintSource::WebSearch,
},
)
.await;
}
if let Some(turn_item) = handle_non_tool_response_item(&item, plan_mode).await {
// Only emit "started" if this item was not already active.
if previously_active_item.is_none() {
ctx.sess
.emit_turn_item_started(&ctx.turn_context, &turn_item)
.await;
}
ctx.sess
.emit_turn_item_completed(&ctx.turn_context, turn_item)
.await;
}
ctx.sess
.record_conversation_items(&ctx.turn_context, std::slice::from_ref(&item))
.await;
let last_agent_message = last_assistant_message_from_item(&item, plan_mode);
output.last_agent_message = last_agent_message;
}
// Guardrail: the model issued a LocalShellCall without an id; surface the error back into history.
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
ctx.turn_context
.otel_manager
.log_tool_failed("local_shell", msg);
tracing::error!(msg);
// Empty call_id: there is no id to echo back by construction.
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
body: FunctionCallOutputBody::Text(msg.to_string()),
..Default::default()
},
};
ctx.sess
.record_conversation_items(&ctx.turn_context, std::slice::from_ref(&item))
.await;
if let Some(response_item) = response_input_to_response_item(&response) {
ctx.sess
.record_conversation_items(
&ctx.turn_context,
std::slice::from_ref(&response_item),
)
.await;
}
output.needs_follow_up = true;
}
// The tool request should be answered directly (or was denied); push that response into the transcript.
Err(FunctionCallError::RespondToModel(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
body: FunctionCallOutputBody::Text(message),
..Default::default()
},
};
ctx.sess
.record_conversation_items(&ctx.turn_context, std::slice::from_ref(&item))
.await;
if let Some(response_item) = response_input_to_response_item(&response) {
ctx.sess
.record_conversation_items(
&ctx.turn_context,
std::slice::from_ref(&response_item),
)
.await;
}
output.needs_follow_up = true;
}
// A fatal error occurred; surface it back into history.
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
}
}
Ok(output)
}
/// Convert a non-tool stream item (message, reasoning, or web-search call)
/// into a `TurnItem`, if it parses as one.
///
/// In plan mode, proposed-plan blocks are stripped from agent message text
/// before the item is returned. Tool outputs and any other item kinds yield
/// `None`.
pub(crate) async fn handle_non_tool_response_item(
    item: &ResponseItem,
    plan_mode: bool,
) -> Option<TurnItem> {
    debug!(?item, "Output item");

    // Tool outputs should never arrive on this path; log and ignore.
    if matches!(
        item,
        ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. }
    ) {
        debug!("unexpected tool output from stream");
        return None;
    }

    // Only messages, reasoning, and web-search calls become turn items.
    if !matches!(
        item,
        ResponseItem::Message { .. }
            | ResponseItem::Reasoning { .. }
            | ResponseItem::WebSearchCall { .. }
    ) {
        return None;
    }

    let mut turn_item = parse_turn_item(item)?;
    if plan_mode {
        if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
            // Flatten the message text so plan blocks can be stripped in one pass.
            let mut combined = String::new();
            for entry in &agent_message.content {
                match entry {
                    codex_protocol::items::AgentMessageContent::Text { text } => {
                        combined.push_str(text);
                    }
                }
            }
            let stripped = strip_proposed_plan_blocks(&combined);
            agent_message.content =
                vec![codex_protocol::items::AgentMessageContent::Text { text: stripped }];
        }
    }
    Some(turn_item)
}
/// Extract the combined assistant message text from `item`, if it is an
/// assistant-role message with non-empty output text.
///
/// In plan mode, proposed-plan blocks are stripped first; if only
/// whitespace remains afterwards, `None` is returned.
pub(crate) fn last_assistant_message_from_item(
    item: &ResponseItem,
    plan_mode: bool,
) -> Option<String> {
    let ResponseItem::Message { role, content, .. } = item else {
        return None;
    };
    if role != "assistant" {
        return None;
    }

    // Concatenate only the output-text segments; other content kinds are skipped.
    let mut combined = String::new();
    for ci in content {
        if let codex_protocol::models::ContentItem::OutputText { text } = ci {
            combined.push_str(text);
        }
    }
    if combined.is_empty() {
        return None;
    }

    if !plan_mode {
        return Some(combined);
    }
    let stripped = strip_proposed_plan_blocks(&combined);
    if stripped.trim().is_empty() {
        None
    } else {
        Some(stripped)
    }
}
pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Option<ResponseItem> {
match input {
ResponseInputItem::FunctionCallOutput { call_id, output } => {
Some(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
})
}
ResponseInputItem::CustomToolCallOutput { call_id, output } => {
Some(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
})
}
ResponseInputItem::McpToolCallOutput { call_id, result } => {
let output = match result {
Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result),
Err(err) => FunctionCallOutputPayload {
body: FunctionCallOutputBody::Text(err.clone()),
success: Some(false),
},
};
Some(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
})
}
_ => None,
}
}