diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index eaba0aea57..82699c9d4f 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -2641,15 +2641,20 @@ impl ChatWidget { self.request_redraw(); } - fn finalize_completed_assistant_message(&mut self, message: Option<&str>) { - // If we have a stream_controller, the finalized message payload is redundant because the - // visible content has already been accumulated through deltas. - if self.stream_controller.is_none() - && let Some(message) = message + fn reconcile_answer_stream_final_message(&mut self, message: Option<&str>) { + if let Some(message) = message && !message.is_empty() { - self.handle_streaming_delta(message.to_string()); + if let Some(controller) = self.stream_controller.as_mut() { + controller.append_missing_suffix_from_final_message(message); + } else { + self.handle_streaming_delta(message.to_string()); + } } + } + + fn finalize_completed_assistant_message(&mut self, message: Option<&str>) { + self.reconcile_answer_stream_final_message(message); self.flush_answer_stream_with_separator(); self.handle_stream_finished(); self.request_redraw(); @@ -2851,6 +2856,12 @@ impl ChatWidget { .unwrap_or_default(); self.saw_copy_source_this_turn = false; // If a stream is currently active, finalize it. + if let Some(message) = last_agent_message.as_deref() + && !message.is_empty() + && let Some(controller) = self.stream_controller.as_mut() + { + controller.append_missing_suffix_from_final_message(message); + } self.flush_answer_stream_with_separator(); if let Some(mut controller) = self.plan_stream_controller.take() { let (cell, source) = controller.finalize(); diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_final_message_repairs_missing_tail_snapshot.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_final_message_repairs_missing_tail_snapshot.snap new file mode 100644 index 0000000000..4d916a33cc --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_final_message_repairs_missing_tail_snapshot.snap @@ -0,0 +1,5 @@ +--- +source: tui/src/chatwidget/tests/status_and_layout.rs +expression: combined +--- +• Here is the result. diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_same_final_message_are_rendered_snapshot.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_same_final_message_are_rendered_snapshot.snap deleted file mode 100644 index e353cbef2c..0000000000 --- a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__deltas_then_same_final_message_are_rendered_snapshot.snap +++ /dev/null @@ -1,5 +0,0 @@ ---- -source: tui/src/chatwidget/tests.rs -expression: combined ---- - diff --git a/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs b/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs index 12bb0dcb6f..e1098b1f62 100644 --- a/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs +++ b/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs @@ -2187,67 +2187,50 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() { } #[tokio::test] -async fn deltas_then_same_final_message_are_rendered_snapshot() { +async fn deltas_then_final_message_repairs_missing_tail_snapshot() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; - // Stream some reasoning deltas first. chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { - delta: "I will ".into(), - }), - }); - chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { - delta: "first analyze the ".into(), - }), - }); - chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { - delta: "request.".into(), - }), - }); - chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentReasoning(AgentReasoningEvent { - text: "request.".into(), + id: "turn-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + started_at: None, + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, }), }); - // Then stream answer deltas, followed by the exact same final message. + // Streamed deltas can be missing the response tail even though TurnComplete carries the full + // final assistant message. chat.handle_codex_event(Event { id: "s1".into(), msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: "Here is the ".into(), }), }); + chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { - delta: "result.".into(), + id: "turn-1".into(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), + last_agent_message: Some("Here is the result.".to_string()), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, }), }); - chat.handle_codex_event(Event { - id: "s1".into(), - msg: EventMsg::AgentMessage(AgentMessageEvent { - message: "Here is the result.".into(), - phase: None, - memory_citation: None, - }), - }); - - // Snapshot the combined visible content to ensure we render as expected - // when deltas are followed by the identical final message. let cells = drain_insert_history(&mut rx); let combined = cells .iter() .map(|lines| lines_to_single_string(lines)) .collect::(); + assert!( + combined.contains("Here is the result."), + "missing reconciled final message tail: {combined}" + ); assert_chatwidget_snapshot!( - "deltas_then_same_final_message_are_rendered_snapshot", + "deltas_then_final_message_repairs_missing_tail_snapshot", combined ); } diff --git a/codex-rs/tui/src/markdown_stream.rs b/codex-rs/tui/src/markdown_stream.rs index 311ea202c4..bf09a9612b 100644 --- a/codex-rs/tui/src/markdown_stream.rs +++ b/codex-rs/tui/src/markdown_stream.rs @@ -79,6 +79,23 @@ impl MarkdownStreamCollector { self.buffer.push_str(delta); } + /// Append the suffix that is present in the finalized message but missing from streamed + /// deltas. + pub fn append_missing_suffix_from_final_message(&mut self, final_message: &str) { + if final_message.starts_with(&self.buffer) { + let suffix = &final_message[self.buffer.len()..]; + if !suffix.is_empty() { + self.push_delta(suffix); + } + } else { + tracing::debug!( + streamed_len = self.buffer.len(), + final_len = final_message.len(), + "final assistant message did not extend streamed deltas" + ); + } + } + /// Commit newly completed raw markdown source up to the last newline. /// /// This returns only source that has not been returned by a previous commit. Calling it after a diff --git a/codex-rs/tui/src/streaming/controller.rs b/codex-rs/tui/src/streaming/controller.rs index 2def4ae8bb..bffe1be7a1 100644 --- a/codex-rs/tui/src/streaming/controller.rs +++ b/codex-rs/tui/src/streaming/controller.rs @@ -71,6 +71,12 @@ impl StreamCore { false } + fn append_missing_suffix_from_final_message(&mut self, final_message: &str) { + self.state + .collector + .append_missing_suffix_from_final_message(final_message); + } + fn finalize_remaining(&mut self) -> Vec> { let remainder_source = self.state.collector.finalize_and_drain_source(); if !remainder_source.is_empty() { @@ -242,6 +248,12 @@ impl StreamController { self.core.push_delta(delta) } + /// Reconcile the streamed deltas with the finalized assistant message. + pub(crate) fn append_missing_suffix_from_final_message(&mut self, final_message: &str) { + self.core + .append_missing_suffix_from_final_message(final_message); + } + /// Finish the stream and return the final transient cell plus accumulated markdown source. /// /// The source is `None` only when the stream never accumulated content. Callers that discard the