tui: reconcile final message stream tail

This commit is contained in:
dank-openai
2026-04-23 18:15:43 -04:00
parent be5f34a898
commit d673b72ffc
6 changed files with 73 additions and 50 deletions

View File

@@ -2641,15 +2641,20 @@ impl ChatWidget {
self.request_redraw();
}
fn finalize_completed_assistant_message(&mut self, message: Option<&str>) {
// If we have a stream_controller, the finalized message payload is redundant because the
// visible content has already been accumulated through deltas.
if self.stream_controller.is_none()
&& let Some(message) = message
fn reconcile_answer_stream_final_message(&mut self, message: Option<&str>) {
if let Some(message) = message
&& !message.is_empty()
{
self.handle_streaming_delta(message.to_string());
if let Some(controller) = self.stream_controller.as_mut() {
controller.append_missing_suffix_from_final_message(message);
} else {
self.handle_streaming_delta(message.to_string());
}
}
}
fn finalize_completed_assistant_message(&mut self, message: Option<&str>) {
self.reconcile_answer_stream_final_message(message);
self.flush_answer_stream_with_separator();
self.handle_stream_finished();
self.request_redraw();
@@ -2851,6 +2856,12 @@ impl ChatWidget {
.unwrap_or_default();
self.saw_copy_source_this_turn = false;
// If a stream is currently active, finalize it.
if let Some(message) = last_agent_message.as_deref()
&& !message.is_empty()
&& let Some(controller) = self.stream_controller.as_mut()
{
controller.append_missing_suffix_from_final_message(message);
}
self.flush_answer_stream_with_separator();
if let Some(mut controller) = self.plan_stream_controller.take() {
let (cell, source) = controller.finalize();

View File

@@ -0,0 +1,5 @@
---
source: tui/src/chatwidget/tests/status_and_layout.rs
expression: combined
---
• Here is the result.

View File

@@ -1,5 +0,0 @@
---
source: tui/src/chatwidget/tests.rs
expression: combined
---

View File

@@ -2187,67 +2187,50 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
}
#[tokio::test]
async fn deltas_then_same_final_message_are_rendered_snapshot() {
async fn deltas_then_final_message_repairs_missing_tail_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
});
// Then stream answer deltas, followed by the exact same final message.
// Streamed deltas can be missing the response tail even though TurnComplete carries the full
// final assistant message.
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Here is the result.".to_string()),
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
phase: None,
memory_citation: None,
}),
});
// Snapshot the combined visible content to ensure we render as expected
// when deltas are followed by the identical final message.
let cells = drain_insert_history(&mut rx);
let combined = cells
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<String>();
assert!(
combined.contains("Here is the result."),
"missing reconciled final message tail: {combined}"
);
assert_chatwidget_snapshot!(
"deltas_then_same_final_message_are_rendered_snapshot",
"deltas_then_final_message_repairs_missing_tail_snapshot",
combined
);
}

View File

@@ -79,6 +79,23 @@ impl MarkdownStreamCollector {
self.buffer.push_str(delta);
}
/// Append the suffix that is present in the finalized message but missing from streamed
/// deltas.
pub fn append_missing_suffix_from_final_message(&mut self, final_message: &str) {
if final_message.starts_with(&self.buffer) {
let suffix = &final_message[self.buffer.len()..];
if !suffix.is_empty() {
self.push_delta(suffix);
}
} else {
tracing::debug!(
streamed_len = self.buffer.len(),
final_len = final_message.len(),
"final assistant message did not extend streamed deltas"
);
}
}
/// Commit newly completed raw markdown source up to the last newline.
///
/// This returns only source that has not been returned by a previous commit. Calling it after a

View File

@@ -71,6 +71,12 @@ impl StreamCore {
false
}
fn append_missing_suffix_from_final_message(&mut self, final_message: &str) {
self.state
.collector
.append_missing_suffix_from_final_message(final_message);
}
fn finalize_remaining(&mut self) -> Vec<Line<'static>> {
let remainder_source = self.state.collector.finalize_and_drain_source();
if !remainder_source.is_empty() {
@@ -242,6 +248,12 @@ impl StreamController {
self.core.push_delta(delta)
}
/// Reconcile the streamed deltas with the finalized assistant message.
pub(crate) fn append_missing_suffix_from_final_message(&mut self, final_message: &str) {
self.core
.append_missing_suffix_from_final_message(final_message);
}
/// Finish the stream and return the final transient cell plus accumulated markdown source.
///
/// The source is `None` only when the stream never accumulated content. Callers that discard the