Fix reasoning summaries and orphan stream deltas

Addresses #16801
Problem: Responses streams can emit assistant or reasoning deltas before an active item exists, which can panic the CLI and omit completed reasoning summaries from the live TUI.
Solution: Buffer pre-item assistant text deltas, drop orphan reasoning deltas safely, and backfill completed reasoning summaries when no live deltas were buffered.
This commit is contained in:
Eric Traut
2026-04-04 11:27:12 -07:00
parent cca36c5681
commit 03a23b57b9
6 changed files with 244 additions and 30 deletions

View File

@@ -7374,6 +7374,7 @@ async fn try_run_sampling_request(
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
let mut pending_output_text_delta = String::new();
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
@@ -7470,6 +7471,13 @@ async fn try_run_sampling_request(
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
if !pending_output_text_delta.is_empty() {
warn!(
buffered_len = pending_output_text_delta.len(),
"dropping buffered output text deltas after item completion"
);
pending_output_text_delta.clear();
}
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
@@ -7495,26 +7503,43 @@ async fn try_run_sampling_request(
.await
{
let mut turn_item = turn_item;
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
&& let Some(raw_text) = raw_assistant_output_text_from_item(&item)
{
let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_))
.then(|| raw_assistant_output_text_from_item(&item))
.flatten();
if matches!(turn_item, TurnItem::AgentMessage(_)) {
let item_id = turn_item.id();
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
if let Some(raw_text) = raw_output_text.as_deref() {
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
}
seeded_emit = plan_mode.then_some((item_id.clone(), seeded));
}
if !pending_output_text_delta.is_empty() {
let mut pending_delta = std::mem::take(&mut pending_output_text_delta);
if let Some(raw_text) = raw_output_text.as_deref() {
if pending_delta.starts_with(raw_text) {
pending_delta.drain(..raw_text.len());
} else if raw_text.starts_with(&pending_delta) {
pending_delta.clear();
}
}
if !pending_delta.is_empty() {
let parsed = assistant_message_stream_parsers
.parse_delta(&item_id, &pending_delta);
pending_emit = Some((item_id, parsed));
}
}
seeded_parsed = plan_mode.then_some(seeded);
seeded_item_id = Some(item_id);
}
if let Some(state) = plan_mode_state.as_mut()
&& matches!(turn_item, TurnItem::AgentMessage(_))
@@ -7526,16 +7551,22 @@ async fn try_run_sampling_request(
} else {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
if let (Some(state), Some(item_id), Some(parsed)) = (
plan_mode_state.as_mut(),
seeded_item_id.as_deref(),
seeded_parsed,
) {
if let Some((item_id, parsed)) = seeded_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
Some(state),
item_id,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
}
if let Some((item_id, parsed)) = pending_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
@@ -7612,7 +7643,10 @@ async fn try_run_sampling_request(
.await;
}
} else {
error_or_panic("OutputTextDelta without active item".to_string());
if pending_output_text_delta.is_empty() {
warn!("buffering OutputTextDelta without active item");
}
pending_output_text_delta.push_str(&delta);
}
}
ResponseEvent::ReasoningSummaryDelta {
@@ -7630,7 +7664,10 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
} else {
error_or_panic("ReasoningSummaryDelta without active item".to_string());
warn!(
summary_index,
"dropping ReasoningSummaryDelta without active item"
);
}
}
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
@@ -7642,7 +7679,10 @@ async fn try_run_sampling_request(
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
warn!(
summary_index,
"dropping ReasoningSummaryPartAdded without active item"
);
}
}
ResponseEvent::ReasoningContentDelta {
@@ -7660,7 +7700,10 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;
} else {
error_or_panic("ReasoningRawContentDelta without active item".to_string());
warn!(
content_index,
"dropping ReasoningRawContentDelta without active item"
);
}
}
}

View File

@@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
})
}
pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"summary_index": summary_index,
})
}
pub fn ev_reasoning_text_delta(delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_text.delta",

View File

@@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_reasoning_summary_part_added;
use core_test_support::responses::ev_reasoning_summary_text_delta;
use core_test_support::responses::ev_reasoning_text_delta;
use core_test_support::responses::ev_response_created;
@@ -1061,6 +1062,116 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let TestCodex {
codex,
session_configured,
..
} = test_codex().build(&server).await?;
let stream = sse(vec![
ev_response_created("resp-1"),
ev_output_text_delta("Hello "),
ev_message_item_added("msg-1", ""),
ev_output_text_delta("world"),
ev_assistant_message("msg-1", "Hello world"),
ev_completed("resp-1"),
]);
mount_sse_once(&server, stream).await;
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: std::env::current_dir()?,
approval_policy: codex_protocol::protocol::AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
model: session_configured.model.clone(),
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let mut agent_deltas = Vec::new();
let mut completed = None;
loop {
match wait_for_event(&codex, |_| true).await {
EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::AgentMessage(item),
..
}) => completed = Some(item),
EventMsg::TurnComplete(_) => break,
_ => {}
}
}
assert_eq!(agent_deltas.concat(), "Hello world");
let completed_text: String = completed
.expect("assistant item completion should be emitted")
.content
.iter()
.map(|entry| match entry {
AgentMessageContent::Text { text } => text.as_str(),
})
.collect();
assert_eq!(completed_text, "Hello world");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item()
-> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let TestCodex { codex, .. } = test_codex().build(&server).await?;
let stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_summary_part_added(0),
ev_reasoning_summary_text_delta("Summary only"),
ev_reasoning_item("reasoning-1", &["Summary only"], &[]),
ev_completed("resp-1"),
]);
mount_sse_once(&server, stream).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "think".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let completed = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::Reasoning(item),
..
}) => Some(item.clone()),
_ => None,
})
.await;
assert_eq!(completed.summary_text, vec!["Summary only".to_string()]);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -6016,7 +6016,10 @@ impl ChatWidget {
ThreadItem::Reasoning {
summary, content, ..
} => {
if from_replay {
let should_backfill_reasoning = from_replay
|| (self.reasoning_buffer.trim().is_empty()
&& self.full_reasoning_buffer.trim().is_empty());
if should_backfill_reasoning {
for delta in summary {
self.on_agent_reasoning_delta(delta);
}

View File

@@ -0,0 +1,5 @@
---
source: tui/src/chatwidget/tests/history_replay.rs
expression: rendered
---
• Summary only

View File

@@ -725,6 +725,51 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() {
assert_eq!(rendered.matches("Summary only").count(), 1);
}
#[tokio::test]
async fn live_reasoning_item_completed_renders_summary_without_prior_delta() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.show_welcome_banner = false;
chat.handle_server_notification(
ServerNotification::TurnStarted(TurnStartedNotification {
thread_id: "thread-1".to_string(),
turn: AppServerTurn {
id: "turn-1".to_string(),
items: Vec::new(),
status: AppServerTurnStatus::InProgress,
error: None,
},
}),
/*replay_kind*/ None,
);
let _ = drain_insert_history(&mut rx);
chat.handle_server_notification(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: AppServerThreadItem::Reasoning {
id: "reasoning-1".to_string(),
summary: vec!["Summary only".to_string()],
content: Vec::new(),
},
}),
/*replay_kind*/ None,
);
let rendered = match rx.try_recv() {
Ok(AppEvent::InsertHistoryCell(cell)) => {
lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
}
other => panic!("expected InsertHistoryCell, got {other:?}"),
};
assert!(rendered.contains("Summary only"));
assert_chatwidget_snapshot!(
"live_reasoning_item_completed_renders_summary_without_prior_delta",
rendered
);
}
#[tokio::test]
async fn replayed_turn_started_does_not_mark_task_running() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;