From 03a23b57b9cdb7d23a547b0df30ee37d28908b2c Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Sat, 4 Apr 2026 11:27:12 -0700 Subject: [PATCH] Fix reasoning summaries and orphan stream deltas Addresses #16801 Problem: Responses streams can emit assistant or reasoning deltas before an active item exists, which can panic the CLI and omit completed reasoning summaries from the live TUI. Solution: Buffer pre-item assistant text deltas, drop orphan reasoning deltas safely, and backfill completed reasoning summaries when no live deltas were buffered. --- codex-rs/core/src/codex.rs | 101 +++++++++++----- codex-rs/core/tests/common/responses.rs | 7 ++ codex-rs/core/tests/suite/items.rs | 111 ++++++++++++++++++ codex-rs/tui/src/chatwidget.rs | 5 +- ...d_renders_summary_without_prior_delta.snap | 5 + .../src/chatwidget/tests/history_replay.rs | 45 +++++++ 6 files changed, 244 insertions(+), 30 deletions(-) create mode 100644 codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5044202174..202bd139db 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7374,6 +7374,7 @@ async fn try_run_sampling_request( let mut should_emit_turn_diff = false; let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan; let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode); + let mut pending_output_text_delta = String::new(); let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); let receiving_span = trace_span!("receiving_stream"); let outcome: CodexResult = loop { @@ -7470,6 +7471,13 @@ async fn try_run_sampling_request( let output_result = handle_output_item_done(&mut ctx, item, previously_active_item) .instrument(handle_responses) .await?; + if !pending_output_text_delta.is_empty() { + warn!( + buffered_len = pending_output_text_delta.len(), + "dropping buffered output text deltas after item completion" + ); + pending_output_text_delta.clear(); + } if let Some(tool_future) = output_result.tool_future { in_flight.push_back(tool_future); } @@ -7495,26 +7503,43 @@ async fn try_run_sampling_request( .await { let mut turn_item = turn_item; - let mut seeded_parsed: Option = None; - let mut seeded_item_id: Option = None; - if matches!(turn_item, TurnItem::AgentMessage(_)) - && let Some(raw_text) = raw_assistant_output_text_from_item(&item) - { + let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None; + let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None; + let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_)) + .then(|| raw_assistant_output_text_from_item(&item)) + .flatten(); + if matches!(turn_item, TurnItem::AgentMessage(_)) { let item_id = turn_item.id(); - let mut seeded = - assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text); - if let TurnItem::AgentMessage(agent_message) = &mut turn_item { - agent_message.content = - vec![codex_protocol::items::AgentMessageContent::Text { - text: if plan_mode { - String::new() - } else { - std::mem::take(&mut seeded.visible_text) - }, - }]; + if let Some(raw_text) = raw_output_text.as_deref() { + let mut seeded = + assistant_message_stream_parsers.seed_item_text(&item_id, raw_text); + if let TurnItem::AgentMessage(agent_message) = &mut turn_item { + agent_message.content = + vec![codex_protocol::items::AgentMessageContent::Text { + text: if plan_mode { + String::new() + } else { + std::mem::take(&mut seeded.visible_text) + }, + }]; + } + seeded_emit = plan_mode.then_some((item_id.clone(), seeded)); + } + if !pending_output_text_delta.is_empty() { + let mut pending_delta = std::mem::take(&mut pending_output_text_delta); + if let Some(raw_text) = raw_output_text.as_deref() { + if pending_delta.starts_with(raw_text) { + pending_delta.drain(..raw_text.len()); + } else if raw_text.starts_with(&pending_delta) { + pending_delta.clear(); + } + } + if !pending_delta.is_empty() { + let parsed = assistant_message_stream_parsers + .parse_delta(&item_id, &pending_delta); + pending_emit = Some((item_id, parsed)); + } } - seeded_parsed = plan_mode.then_some(seeded); - seeded_item_id = Some(item_id); } if let Some(state) = plan_mode_state.as_mut() && matches!(turn_item, TurnItem::AgentMessage(_)) @@ -7526,16 +7551,22 @@ async fn try_run_sampling_request( } else { sess.emit_turn_item_started(&turn_context, &turn_item).await; } - if let (Some(state), Some(item_id), Some(parsed)) = ( - plan_mode_state.as_mut(), - seeded_item_id.as_deref(), - seeded_parsed, - ) { + if let Some((item_id, parsed)) = seeded_emit { emit_streamed_assistant_text_delta( &sess, &turn_context, - Some(state), - item_id, + plan_mode_state.as_mut(), + &item_id, + parsed, + ) + .await; + } + if let Some((item_id, parsed)) = pending_emit { + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &item_id, parsed, ) .await; @@ -7612,7 +7643,10 @@ async fn try_run_sampling_request( .await; } } else { - error_or_panic("OutputTextDelta without active item".to_string()); + if pending_output_text_delta.is_empty() { + warn!("buffering OutputTextDelta without active item"); + } + pending_output_text_delta.push_str(&delta); } } ResponseEvent::ReasoningSummaryDelta { @@ -7630,7 +7664,10 @@ async fn try_run_sampling_request( sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) .await; } else { - error_or_panic("ReasoningSummaryDelta without active item".to_string()); + warn!( + summary_index, + "dropping ReasoningSummaryDelta without active item" + ); } } ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { @@ -7642,7 +7679,10 @@ async fn try_run_sampling_request( }); sess.send_event(&turn_context, event).await; } else { - error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); + warn!( + summary_index, + "dropping ReasoningSummaryPartAdded without active item" + ); } } ResponseEvent::ReasoningContentDelta { @@ -7660,7 +7700,10 @@ async fn try_run_sampling_request( sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) .await; } else { - error_or_panic("ReasoningRawContentDelta without active item".to_string()); + warn!( + content_index, + "dropping ReasoningRawContentDelta without active item" + ); } } } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 2e2155ebdd..cf286b4b71 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value { }) } +pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value { + serde_json::json!({ + "type": "response.reasoning_summary_part.added", + "summary_index": summary_index, + }) +} + pub fn ev_reasoning_text_delta(delta: &str) -> Value { serde_json::json!({ "type": "response.reasoning_text.delta", diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index f949cd4b97..9131e546bb 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added; use core_test_support::responses::ev_output_text_delta; use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_reasoning_item_added; +use core_test_support::responses::ev_reasoning_summary_part_added; use core_test_support::responses::ev_reasoning_summary_text_delta; use core_test_support::responses::ev_reasoning_text_delta; use core_test_support::responses::ev_response_created; @@ -1061,6 +1062,116 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_output_text_delta("Hello "), + ev_message_item_added("msg-1", ""), + ev_output_text_delta("world"), + ev_assistant_message("msg-1", "Hello world"), + ev_completed("resp-1"), + ]); + mount_sse_once(&server, stream).await; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_protocol::protocol::AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + let mut agent_deltas = Vec::new(); + let mut completed = None; + loop { + match wait_for_event(&codex, |_| true).await { + EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta), + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => completed = Some(item), + EventMsg::TurnComplete(_) => break, + _ => {} + } + } + + assert_eq!(agent_deltas.concat(), "Hello world"); + let completed_text: String = completed + .expect("assistant item completion should be emitted") + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + assert_eq!(completed_text, "Hello world"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item() +-> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_reasoning_summary_part_added(0), + ev_reasoning_summary_text_delta("Summary only"), + ev_reasoning_item("reasoning-1", &["Summary only"], &[]), + ev_completed("resp-1"), + ]); + mount_sse_once(&server, stream).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "think".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(completed.summary_text, vec!["Summary only".to_string()]); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index b0ddb631bd..e533d47671 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -6016,7 +6016,10 @@ impl ChatWidget { ThreadItem::Reasoning { summary, content, .. } => { - if from_replay { + let should_backfill_reasoning = from_replay + || (self.reasoning_buffer.trim().is_empty() + && self.full_reasoning_buffer.trim().is_empty()); + if should_backfill_reasoning { for delta in summary { self.on_agent_reasoning_delta(delta); } diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap new file mode 100644 index 0000000000..b62a32a1fc --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap @@ -0,0 +1,5 @@ +--- +source: tui/src/chatwidget/tests/history_replay.rs +expression: rendered +--- +• Summary only diff --git a/codex-rs/tui/src/chatwidget/tests/history_replay.rs b/codex-rs/tui/src/chatwidget/tests/history_replay.rs index 58993d67e8..f5df21f82f 100644 --- a/codex-rs/tui/src/chatwidget/tests/history_replay.rs +++ b/codex-rs/tui/src/chatwidget/tests/history_replay.rs @@ -725,6 +725,51 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() { assert_eq!(rendered.matches("Summary only").count(), 1); } +#[tokio::test] +async fn live_reasoning_item_completed_renders_summary_without_prior_delta() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; + chat.show_welcome_banner = false; + + chat.handle_server_notification( + ServerNotification::TurnStarted(TurnStartedNotification { + thread_id: "thread-1".to_string(), + turn: AppServerTurn { + id: "turn-1".to_string(), + items: Vec::new(), + status: AppServerTurnStatus::InProgress, + error: None, + }, + }), + /*replay_kind*/ None, + ); + let _ = drain_insert_history(&mut rx); + + chat.handle_server_notification( + ServerNotification::ItemCompleted(ItemCompletedNotification { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item: AppServerThreadItem::Reasoning { + id: "reasoning-1".to_string(), + summary: vec!["Summary only".to_string()], + content: Vec::new(), + }, + }), + /*replay_kind*/ None, + ); + + let rendered = match rx.try_recv() { + Ok(AppEvent::InsertHistoryCell(cell)) => { + lines_to_single_string(&cell.transcript_lines(/*width*/ 80)) + } + other => panic!("expected InsertHistoryCell, got {other:?}"), + }; + assert!(rendered.contains("Summary only")); + assert_chatwidget_snapshot!( + "live_reasoning_item_completed_renders_summary_without_prior_delta", + rendered + ); +} + #[tokio::test] async fn replayed_turn_started_does_not_mark_task_running() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;