Compare commits

...

6 Commits

Author SHA1 Message Date
Eric Traut
4c903ca29c codex: address PR review feedback (#16803)
Preserve buffered assistant text across non-message item completions so delayed assistant items can still replay early deltas.
2026-04-04 12:12:32 -07:00
Eric Traut
dfb4e002eb codex: address PR review feedback (#16803)
Match section breaks when reconciling completed reasoning snapshots against streamed deltas.
2026-04-04 12:03:25 -07:00
Eric Traut
533dbd5b69 codex: fix CI failure on PR #16803
Add the required argument-comment annotation to the new reasoning-summary test helper call.
2026-04-04 11:59:06 -07:00
Eric Traut
02d2c8231c codex: address PR review feedback (#16803)
Backfill a missing reasoning-summary prefix from the completed item snapshot when late suffix deltas streamed normally after earlier orphan deltas were dropped.
2026-04-04 11:52:19 -07:00
Eric Traut
1c141e5c62 Document orphan stream delta handling 2026-04-04 11:42:30 -07:00
Eric Traut
03a23b57b9 Fix reasoning summaries and orphan stream deltas
Addresses #16801
Problem: Responses streams can emit assistant or reasoning deltas before an active item exists, which can panic the CLI and omit completed reasoning summaries from the live TUI.
Solution: Buffer pre-item assistant text deltas, drop orphan reasoning deltas safely, and backfill completed reasoning summaries when no live deltas were buffered.
2026-04-04 11:27:12 -07:00
8 changed files with 495 additions and 30 deletions

View File

@@ -7374,6 +7374,9 @@ async fn try_run_sampling_request(
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
// Some streams send output text deltas before the assistant message item is announced.
// Hold those early bytes so we can replay them once `output_item.added` arrives.
let mut pending_output_text_delta = String::new();
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
@@ -7413,6 +7416,8 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let completed_assistant_message =
matches!(&item, ResponseItem::Message { role, .. } if role == "assistant");
let previously_active_item = active_item.take();
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
@@ -7470,6 +7475,17 @@ async fn try_run_sampling_request(
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
if completed_assistant_message && !pending_output_text_delta.is_empty() {
// If an assistant message completed while this buffer is still non-empty,
// those bytes could not be matched to that message item. Clear them so they
// don't leak into a later assistant item, but keep the buffer across
// reasoning/tool completions so a delayed message item can still claim it.
warn!(
buffered_len = pending_output_text_delta.len(),
"dropping buffered output text deltas after item completion"
);
pending_output_text_delta.clear();
}
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
@@ -7495,26 +7511,46 @@ async fn try_run_sampling_request(
.await
{
let mut turn_item = turn_item;
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
&& let Some(raw_text) = raw_assistant_output_text_from_item(&item)
{
let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_))
.then(|| raw_assistant_output_text_from_item(&item))
.flatten();
if matches!(turn_item, TurnItem::AgentMessage(_)) {
let item_id = turn_item.id();
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
if let Some(raw_text) = raw_output_text.as_deref() {
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
}
seeded_emit = plan_mode.then_some((item_id.clone(), seeded));
}
if !pending_output_text_delta.is_empty() {
let mut pending_delta = std::mem::take(&mut pending_output_text_delta);
if let Some(raw_text) = raw_output_text.as_deref() {
// The item payload may already contain all or part of the text we
// buffered from pre-item deltas. Trim the overlap so replayed
// deltas don't duplicate visible output.
if pending_delta.starts_with(raw_text) {
pending_delta.drain(..raw_text.len());
} else if raw_text.starts_with(&pending_delta) {
pending_delta.clear();
}
}
if !pending_delta.is_empty() {
let parsed = assistant_message_stream_parsers
.parse_delta(&item_id, &pending_delta);
pending_emit = Some((item_id, parsed));
}
}
seeded_parsed = plan_mode.then_some(seeded);
seeded_item_id = Some(item_id);
}
if let Some(state) = plan_mode_state.as_mut()
&& matches!(turn_item, TurnItem::AgentMessage(_))
@@ -7526,16 +7562,22 @@ async fn try_run_sampling_request(
} else {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
if let (Some(state), Some(item_id), Some(parsed)) = (
plan_mode_state.as_mut(),
seeded_item_id.as_deref(),
seeded_parsed,
) {
if let Some((item_id, parsed)) = seeded_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
Some(state),
item_id,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
}
if let Some((item_id, parsed)) = pending_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
@@ -7612,7 +7654,12 @@ async fn try_run_sampling_request(
.await;
}
} else {
error_or_panic("OutputTextDelta without active item".to_string());
// This event should normally follow `output_item.added`, but tolerate the
// reversed ordering by buffering text until the item metadata arrives.
if pending_output_text_delta.is_empty() {
warn!("buffering OutputTextDelta without active item");
}
pending_output_text_delta.push_str(&delta);
}
}
ResponseEvent::ReasoningSummaryDelta {
@@ -7630,7 +7677,12 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
} else {
error_or_panic("ReasoningSummaryDelta without active item".to_string());
// Without an active reasoning item there is nowhere safe to attach this delta.
// Drop the orphan event and rely on the eventual completed item snapshot.
warn!(
summary_index,
"dropping ReasoningSummaryDelta without active item"
);
}
}
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
@@ -7642,7 +7694,12 @@ async fn try_run_sampling_request(
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
// Keep section-break handling consistent with summary deltas: a break without
// an active reasoning item is orphaned stream state, not a fatal parser error.
warn!(
summary_index,
"dropping ReasoningSummaryPartAdded without active item"
);
}
}
ResponseEvent::ReasoningContentDelta {
@@ -7660,7 +7717,12 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;
} else {
error_or_panic("ReasoningRawContentDelta without active item".to_string());
// Raw reasoning deltas can also arrive before their item in malformed or
// reordered streams. Preserve liveness by dropping the orphan event.
warn!(
content_index,
"dropping ReasoningRawContentDelta without active item"
);
}
}
}

View File

@@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
})
}
pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"summary_index": summary_index,
})
}
pub fn ev_reasoning_text_delta(delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_text.delta",

View File

@@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_reasoning_summary_part_added;
use core_test_support::responses::ev_reasoning_summary_text_delta;
use core_test_support::responses::ev_reasoning_text_delta;
use core_test_support::responses::ev_response_created;
@@ -1061,6 +1062,186 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> {
    // Regression test: a stream may emit `output_text.delta` BEFORE the
    // `output_item.added` event that announces the assistant message. The early
    // "Hello " delta must be buffered and replayed once "msg-1" is announced,
    // so the user sees the full text instead of the CLI panicking or dropping it.
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let TestCodex {
        codex,
        session_configured,
        ..
    } = test_codex().build(&server).await?;
    // Event order is the point of this test: delta, then item-added, then the
    // second delta, then the completed assistant message snapshot.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_output_text_delta("Hello "),
        ev_message_item_added("msg-1", ""),
        ev_output_text_delta("world"),
        ev_assistant_message("msg-1", "Hello world"),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "hello".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: std::env::current_dir()?,
            approval_policy: codex_protocol::protocol::AskForApproval::Never,
            approvals_reviewer: None,
            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
            model: session_configured.model.clone(),
            effort: None,
            summary: None,
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Collect every streamed assistant delta plus the final completed item
    // until the turn finishes.
    let mut agent_deltas = Vec::new();
    let mut completed = None;
    loop {
        match wait_for_event(&codex, |_| true).await {
            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
            EventMsg::ItemCompleted(ItemCompletedEvent {
                item: TurnItem::AgentMessage(item),
                ..
            }) => completed = Some(item),
            EventMsg::TurnComplete(_) => break,
            _ => {}
        }
    }
    // Replayed buffered delta + live delta must reconstruct the full message
    // exactly once (no dropped prefix, no duplication).
    assert_eq!(agent_deltas.concat(), "Hello world");
    let completed_text: String = completed
        .expect("assistant item completion should be emitted")
        .content
        .iter()
        .map(|entry| match entry {
            AgentMessageContent::Text { text } => text.as_str(),
        })
        .collect();
    assert_eq!(completed_text, "Hello world");
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn buffered_output_text_survives_intervening_reasoning_item_done() -> anyhow::Result<()> {
    // Like the basic buffering test, but a reasoning item completes BETWEEN the
    // orphan text delta and the assistant message announcement. The buffered
    // "Hello " must survive that non-message item completion and still be
    // replayed when "msg-1" arrives.
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let TestCodex {
        codex,
        session_configured,
        ..
    } = test_codex().build(&server).await?;
    // Note the reasoning item completing between the early delta and the
    // message item — the buffer must not be cleared by it.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_output_text_delta("Hello "),
        ev_reasoning_item("reasoning-1", &["thinking"], &[]),
        ev_message_item_added("msg-1", ""),
        ev_output_text_delta("world"),
        ev_assistant_message("msg-1", "Hello world"),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "hello".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: std::env::current_dir()?,
            approval_policy: codex_protocol::protocol::AskForApproval::Never,
            approvals_reviewer: None,
            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
            model: session_configured.model.clone(),
            effort: None,
            summary: None,
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Gather streamed deltas and the completed assistant item until turn end.
    let mut agent_deltas = Vec::new();
    let mut completed = None;
    loop {
        match wait_for_event(&codex, |_| true).await {
            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
            EventMsg::ItemCompleted(ItemCompletedEvent {
                item: TurnItem::AgentMessage(item),
                ..
            }) => completed = Some(item),
            EventMsg::TurnComplete(_) => break,
            _ => {}
        }
    }
    // Both the delta stream and the completed item must contain the full text.
    assert_eq!(agent_deltas.concat(), "Hello world");
    let completed_text: String = completed
        .expect("assistant item completion should be emitted")
        .content
        .iter()
        .map(|entry| match entry {
            AgentMessageContent::Text { text } => text.as_str(),
        })
        .collect();
    assert_eq!(completed_text, "Hello world");
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item()
-> anyhow::Result<()> {
    // Reasoning summary events (part-added + text delta) arrive before any
    // reasoning item is active. Those orphans are dropped, and the completed
    // reasoning item snapshot must still carry the full summary text.
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let TestCodex { codex, .. } = test_codex().build(&server).await?;
    // No `item.added` precedes the summary events — both are orphans.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_reasoning_summary_part_added(/*summary_index*/ 0),
        ev_reasoning_summary_text_delta("Summary only"),
        ev_reasoning_item("reasoning-1", &["Summary only"], &[]),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "think".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    // The turn must not panic; the completed reasoning item is the source of
    // truth for the summary that the orphan deltas could not attach to.
    let completed = wait_for_event_match(&codex, |ev| match ev {
        EventMsg::ItemCompleted(ItemCompletedEvent {
            item: TurnItem::Reasoning(item),
            ..
        }) => Some(item.clone()),
        _ => None,
    })
    .await;
    assert_eq!(completed.summary_text, vec!["Summary only".to_string()]);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -6016,7 +6016,16 @@ impl ChatWidget {
ThreadItem::Reasoning {
summary, content, ..
} => {
if from_replay {
let mut completed_reasoning = summary.join("\n\n");
if self.config.show_raw_agent_reasoning {
if !completed_reasoning.is_empty() && !content.is_empty() {
completed_reasoning.push_str("\n\n");
}
completed_reasoning.push_str(&content.join("\n\n"));
}
let streamed_reasoning =
format!("{}{}", self.full_reasoning_buffer, self.reasoning_buffer);
if from_replay || streamed_reasoning.trim().is_empty() {
for delta in summary {
self.on_agent_reasoning_delta(delta);
}
@@ -6025,6 +6034,17 @@ impl ChatWidget {
self.on_agent_reasoning_delta(delta);
}
}
} else if !completed_reasoning.is_empty()
&& completed_reasoning != streamed_reasoning
&& let Some(missing_prefix) = completed_reasoning
.strip_suffix(&streamed_reasoning)
.filter(|missing_prefix| !missing_prefix.is_empty())
{
// If early summary deltas were dropped before the item became active, prepend
// the missing prefix from the completed item snapshot and keep the streamed
// suffix so we still avoid double-rendering.
self.full_reasoning_buffer =
format!("{missing_prefix}{}", self.full_reasoning_buffer);
}
self.on_agent_reasoning_final();
}

View File

@@ -0,0 +1,7 @@
---
source: tui/src/chatwidget/tests/history_replay.rs
expression: rendered
---
• prefix middle
tail

View File

@@ -0,0 +1,5 @@
---
source: tui/src/chatwidget/tests/history_replay.rs
expression: rendered
---
• prefix suffix

View File

@@ -0,0 +1,5 @@
---
source: tui/src/chatwidget/tests/history_replay.rs
expression: rendered
---
• Summary only

View File

@@ -1,4 +1,5 @@
use super::*;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use pretty_assertions::assert_eq;
#[tokio::test]
@@ -725,6 +726,183 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() {
assert_eq!(rendered.matches("Summary only").count(), 1);
}
#[tokio::test]
async fn live_reasoning_item_completed_renders_summary_without_prior_delta() {
    // A live (non-replay) reasoning ItemCompleted with NO preceding summary
    // deltas must still render its summary text, backfilled from the snapshot.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
    chat.show_welcome_banner = false;
    // Start a live turn so subsequent item notifications render incrementally.
    chat.handle_server_notification(
        ServerNotification::TurnStarted(TurnStartedNotification {
            thread_id: "thread-1".to_string(),
            turn: AppServerTurn {
                id: "turn-1".to_string(),
                items: Vec::new(),
                status: AppServerTurnStatus::InProgress,
                error: None,
            },
        }),
        /*replay_kind*/ None,
    );
    // Discard any history cells produced by turn start itself.
    let _ = drain_insert_history(&mut rx);
    // Complete the reasoning item without ever streaming a delta for it.
    chat.handle_server_notification(
        ServerNotification::ItemCompleted(ItemCompletedNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item: AppServerThreadItem::Reasoning {
                id: "reasoning-1".to_string(),
                summary: vec!["Summary only".to_string()],
                content: Vec::new(),
            },
        }),
        /*replay_kind*/ None,
    );
    // The completion alone must have inserted a rendered history cell.
    let rendered = match rx.try_recv() {
        Ok(AppEvent::InsertHistoryCell(cell)) => {
            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
        }
        other => panic!("expected InsertHistoryCell, got {other:?}"),
    };
    assert!(rendered.contains("Summary only"));
    assert_chatwidget_snapshot!(
        "live_reasoning_item_completed_renders_summary_without_prior_delta",
        rendered
    );
}
#[tokio::test]
async fn live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix() {
    // Only the tail of the summary ("suffix") streamed live; the completed item
    // snapshot holds the full "prefix suffix". The widget must prepend the
    // missing prefix without rendering the streamed suffix twice.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
    chat.show_welcome_banner = false;
    chat.handle_server_notification(
        ServerNotification::TurnStarted(TurnStartedNotification {
            thread_id: "thread-1".to_string(),
            turn: AppServerTurn {
                id: "turn-1".to_string(),
                items: Vec::new(),
                status: AppServerTurnStatus::InProgress,
                error: None,
            },
        }),
        /*replay_kind*/ None,
    );
    let _ = drain_insert_history(&mut rx);
    // Stream only the suffix — as if earlier prefix deltas were dropped as
    // orphans before the item became active.
    chat.handle_server_notification(
        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item_id: "reasoning-1".to_string(),
            delta: "suffix".to_string(),
            summary_index: 0,
        }),
        /*replay_kind*/ None,
    );
    // Complete with the full text so the snapshot supplies the missing prefix.
    chat.handle_server_notification(
        ServerNotification::ItemCompleted(ItemCompletedNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item: AppServerThreadItem::Reasoning {
                id: "reasoning-1".to_string(),
                summary: vec!["prefix suffix".to_string()],
                content: Vec::new(),
            },
        }),
        /*replay_kind*/ None,
    );
    let rendered = match rx.try_recv() {
        Ok(AppEvent::InsertHistoryCell(cell)) => {
            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
        }
        other => panic!("expected InsertHistoryCell, got {other:?}"),
    };
    // Exactly one occurrence: prefix was backfilled, suffix not duplicated.
    assert_eq!(rendered.matches("prefix suffix").count(), 1);
    assert_chatwidget_snapshot!(
        "live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix",
        rendered
    );
}
#[tokio::test]
async fn live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks() {
    // Multi-section variant of prefix backfill: "middle" streams in section 0,
    // a section break occurs, then "tail" streams in section 1. The completed
    // snapshot ["prefix middle", "tail"] must supply the missing "prefix"
    // while the streamed text — including the section break — renders once.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
    chat.show_welcome_banner = false;
    chat.handle_server_notification(
        ServerNotification::TurnStarted(TurnStartedNotification {
            thread_id: "thread-1".to_string(),
            turn: AppServerTurn {
                id: "turn-1".to_string(),
                items: Vec::new(),
                status: AppServerTurnStatus::InProgress,
                error: None,
            },
        }),
        /*replay_kind*/ None,
    );
    let _ = drain_insert_history(&mut rx);
    // Section 0: only "middle" streams (the "prefix " delta was lost).
    chat.handle_server_notification(
        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item_id: "reasoning-1".to_string(),
            delta: "middle".to_string(),
            summary_index: 0,
        }),
        /*replay_kind*/ None,
    );
    // Section break between summary index 0 and 1.
    chat.handle_server_notification(
        ServerNotification::ReasoningSummaryPartAdded(ReasoningSummaryPartAddedNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item_id: "reasoning-1".to_string(),
            summary_index: 1,
        }),
        /*replay_kind*/ None,
    );
    // Section 1 streams normally.
    chat.handle_server_notification(
        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item_id: "reasoning-1".to_string(),
            delta: "tail".to_string(),
            summary_index: 1,
        }),
        /*replay_kind*/ None,
    );
    // Complete with the full two-section snapshot.
    chat.handle_server_notification(
        ServerNotification::ItemCompleted(ItemCompletedNotification {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item: AppServerThreadItem::Reasoning {
                id: "reasoning-1".to_string(),
                summary: vec!["prefix middle".to_string(), "tail".to_string()],
                content: Vec::new(),
            },
        }),
        /*replay_kind*/ None,
    );
    let rendered = match rx.try_recv() {
        Ok(AppEvent::InsertHistoryCell(cell)) => {
            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
        }
        other => panic!("expected InsertHistoryCell, got {other:?}"),
    };
    // Backfilled prefix appears exactly once; both sections are present.
    assert_eq!(rendered.matches("prefix middle").count(), 1);
    assert!(rendered.contains("tail"));
    assert_chatwidget_snapshot!(
        "live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks",
        rendered
    );
}
#[tokio::test]
async fn replayed_turn_started_does_not_mark_task_running() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;