fix(tui): reflow streamed markdown tables from source

This commit is contained in:
Felipe Coury
2026-05-01 15:50:24 -03:00
parent 06c3dcd95f
commit 7857672284
4 changed files with 230 additions and 19 deletions

View File

@@ -5,21 +5,24 @@
//! the stored cells as source, clears the Codex-owned terminal history, and re-emits the transcript
//! for the new terminal size.
//!
//! Streaming output is the fragile part of this lifecycle. Active streams first appear as transient
//! stream cells, then consolidate into source-backed finalized cells. Resize work that happens
//! before consolidation is marked as stream-time work so consolidation can force one final rebuild
//! from the finalized source.
//! Streaming output is the fragile part of this lifecycle. Active streams first appear as stream
//! cells that may carry the latest fully-emitted stable markdown source, then consolidate into
//! source-backed finalized cells. Resize work that happens before consolidation uses that stable
//! source when available and is still marked as stream-time work so consolidation can force one
//! final rebuild from the finalized source.
//!
//! The row cap is enforced while rendering from `HistoryCell` source, not after writing to the
//! terminal. Initial resume replay uses the same display-line buffering contract so large sessions
//! do not write more retained rows than resize replay would later be willing to rebuild.
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use codex_features::Feature;
use color_eyre::eyre::Result;
use ratatui::prelude::Stylize;
use ratatui::text::Line;
use super::App;
@@ -27,6 +30,7 @@ use super::InitialHistoryReplayBuffer;
use crate::history_cell;
use crate::history_cell::HistoryCell;
use crate::insert_history::HistoryLineWrapPolicy;
use crate::render::line_utils::prefix_lines;
use crate::transcript_reflow::TRANSCRIPT_REFLOW_DEBOUNCE;
use crate::tui;
@@ -35,6 +39,12 @@ struct ReflowCellDisplay {
is_stream_continuation: bool,
}
struct AgentMessageStreamSource {
source: String,
cwd: PathBuf,
is_first_line: bool,
}
/// Rendered transcript lines ready to be replayed into terminal scrollback.
///
/// This is intentionally line-oriented rather than cell-oriented because the terminal only accepts
@@ -431,7 +441,23 @@ impl App {
let row_cap = self.resize_reflow_max_rows();
let mut cell_displays = VecDeque::new();
let mut rendered_rows = 0usize;
let mut start = self.transcript_cells.len();
let stream_run_start =
trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells);
let stream_source = self
.trailing_agent_message_stream_source(stream_run_start, self.transcript_cells.len());
let mut start = stream_source
.as_ref()
.map(|_| stream_run_start)
.unwrap_or(self.transcript_cells.len());
if let Some(stream_source) = stream_source {
let lines = render_agent_message_stream_source(&stream_source, width);
rendered_rows += lines.len();
cell_displays.push_front(ReflowCellDisplay {
lines,
is_stream_continuation: !stream_source.is_first_line,
});
}
while start > 0 {
start -= 1;
@@ -486,6 +512,37 @@ impl App {
}
}
fn trailing_agent_message_stream_source(
&self,
start: usize,
end: usize,
) -> Option<AgentMessageStreamSource> {
if start == end {
return None;
}
let first_agent_cell = self.transcript_cells[start]
.as_any()
.downcast_ref::<history_cell::AgentMessageCell>()?;
let is_first_line = first_agent_cell.is_first_line();
let mut latest_source = None;
for cell in &self.transcript_cells[start..end] {
let agent_cell = cell
.as_any()
.downcast_ref::<history_cell::AgentMessageCell>()?;
if let Some((source, cwd)) = agent_cell.markdown_source() {
latest_source = Some(AgentMessageStreamSource {
source: source.to_string(),
cwd: cwd.to_path_buf(),
is_first_line,
});
}
}
latest_source
}
/// Return whether current transcript state should be treated as stream-time resize state.
///
/// The active stream controllers cover normal streaming. The trailing-cell checks cover the
@@ -500,3 +557,38 @@ impl App {
< self.transcript_cells.len()
}
}
fn render_agent_message_stream_source(
stream_source: &AgentMessageStreamSource,
width: u16,
) -> Vec<Line<'static>> {
let Some(wrap_width) = crate::width::usable_content_width_u16(width, /*reserved_cols*/ 2)
else {
return prefix_lines(
vec![Line::default()],
if stream_source.is_first_line {
"".dim()
} else {
" ".into()
},
" ".into(),
);
};
let mut lines = Vec::new();
crate::markdown::append_markdown(
&stream_source.source,
Some(wrap_width),
Some(stream_source.cwd.as_path()),
&mut lines,
);
prefix_lines(
lines,
if stream_source.is_first_line {
"".dim()
} else {
" ".into()
},
" ".into(),
)
}

View File

@@ -4019,9 +4019,13 @@ fn rendered_line_text(line: &Line<'static>) -> String {
.collect()
}
fn markdown_table_source() -> &'static str {
"| Area | Result |\n| --- | --- |\n| Streaming resize | This cell contains enough prose to wrap differently across terminal widths while staying in table form. |\n| Scrollback preservation | SENTINEL_TABLE_VALUE_WITH_LONG_UNBREAKABLE_TOKEN |\n"
}
fn markdown_table_cell() -> Arc<dyn HistoryCell> {
Arc::new(AgentMarkdownCell::new(
"| Area | Result |\n| --- | --- |\n| Streaming resize | This cell contains enough prose to wrap differently across terminal widths while staying in table form. |\n| Scrollback preservation | SENTINEL_TABLE_VALUE_WITH_LONG_UNBREAKABLE_TOKEN |\n".to_string(),
markdown_table_source().to_string(),
&std::env::temp_dir(),
)) as Arc<dyn HistoryCell>
}
@@ -4152,6 +4156,53 @@ async fn table_resize_lifecycle_reflow_preserves_scrollback_and_rerenders_width(
);
}
#[tokio::test]
async fn table_resize_lifecycle_stream_reflow_uses_markdown_source_not_transient_table_rows() {
let (mut app, _rx, _op_rx) = make_test_app_with_channels().await;
enable_terminal_resize_reflow(&mut app);
app.config.terminal_resize_reflow.max_rows = TerminalResizeReflowMaxRows::Disabled;
let cwd = std::env::temp_dir();
let source = markdown_table_source();
let wide_body_width =
crate::width::usable_content_width_u16(/*total_width*/ 96, /*reserved_cols*/ 2)
.expect("wide terminal width should leave markdown body room");
let mut controller =
crate::streaming::controller::StreamController::new(Some(wide_body_width), cwd.as_path());
assert!(controller.push(&format!(
"{source}\nTail paragraph keeps the preceding table stable.\n"
)));
loop {
let (cell, idle) = controller.on_commit_tick();
if let Some(cell) = cell {
app.transcript_cells.push(cell.into());
}
if idle {
break;
}
}
assert!(
app.transcript_cells.len() > 1,
"stream should emit stable table lines over multiple transient cells",
);
let reflowed_from_transient = app.render_transcript_lines_for_reflow(/*width*/ 44);
let source_backed = AgentMarkdownCell::new(source.to_string(), cwd.as_path());
let expected_source_backed = source_backed.display_lines(/*width*/ 44);
assert_eq!(
reflowed_from_transient
.lines
.iter()
.map(rendered_line_text)
.collect::<Vec<_>>(),
expected_source_backed
.iter()
.map(rendered_line_text)
.collect::<Vec<_>>(),
"resize reflow during streaming must rebuild tables from markdown source, not re-wrap stale transient table rows",
);
}
#[tokio::test]
async fn initial_replay_buffer_keeps_recent_rows_when_row_cap_present() {
let (mut app, _rx, _op_rx) = make_test_app_with_channels().await;

View File

@@ -539,6 +539,16 @@ impl HistoryCell for ReasoningSummaryCell {
pub(crate) struct AgentMessageCell {
lines: Vec<Line<'static>>,
is_first_line: bool,
/// Stable markdown source that had fully drained by the time this transient stream cell was
/// emitted. Resize reflow uses the latest snapshot in a trailing stream run to rebuild tables
/// from source instead of wrapping already-rendered table borders as plain text.
markdown_source: Option<AgentMessageMarkdownSource>,
}
#[derive(Debug)]
struct AgentMessageMarkdownSource {
source: String,
cwd: PathBuf,
}
impl AgentMessageCell {
@@ -546,8 +556,35 @@ impl AgentMessageCell {
Self {
lines,
is_first_line,
markdown_source: None,
}
}
pub(crate) fn new_with_markdown_source(
lines: Vec<Line<'static>>,
is_first_line: bool,
source: String,
cwd: &Path,
) -> Self {
Self {
lines,
is_first_line,
markdown_source: Some(AgentMessageMarkdownSource {
source,
cwd: cwd.to_path_buf(),
}),
}
}
pub(crate) fn markdown_source(&self) -> Option<(&str, &Path)> {
self.markdown_source
.as_ref()
.map(|source| (source.source.as_str(), source.cwd.as_path()))
}
pub(crate) fn is_first_line(&self) -> bool {
self.is_first_line
}
}
impl HistoryCell for AgentMessageCell {

View File

@@ -7,8 +7,10 @@
//! returns the accumulated source to the app for consolidation.
//!
//! Width changes are handled by re-rendering from source and rebuilding only the not-yet-emitted
//! queue. Already emitted rows stay emitted until the app-level transcript reflow rebuilds the full
//! scrollback from finalized cells.
//! queue. Already emitted rows stay emitted, but the last emitted stable stream cell carries a
//! source snapshot so app-level transcript reflow can rebuild from markdown instead of re-wrapping
//! stale table rows. Finalization still consolidates the stream into one finalized source-backed
//! cell.
use crate::history_cell::HistoryCell;
use crate::history_cell::HistoryRenderMode;
@@ -107,6 +109,14 @@ impl StreamCore {
step
}
fn emitted_stable_source(&self) -> Option<&str> {
if self.stable_source_end > 0 && self.emitted_len >= self.rendered_lines.len() {
Some(&self.raw_source[..self.stable_source_end])
} else {
None
}
}
fn queued_lines(&self) -> usize {
self.state.queued_len()
}
@@ -261,8 +271,10 @@ impl StreamCore {
///
/// The controller emits transient `AgentMessageCell`s for live display and returns raw markdown
/// source on `finalize` so the app can replace those transient cells with a source-backed
/// `AgentMarkdownCell`. Callers should use `set_width` on terminal resize; rebuilding the queue
/// from already emitted cells would duplicate output instead of preserving the stream position.
/// `AgentMarkdownCell`. Cells emitted after a stable prefix is fully drained carry that stable
/// source as a resize-reflow repair hint. Callers should use `set_width` on terminal resize;
/// rebuilding the queue from already emitted cells would duplicate output instead of preserving the
/// stream position.
pub(crate) struct StreamController {
core: StreamCore,
header_emitted: bool,
@@ -316,14 +328,15 @@ impl StreamController {
}
let source = std::mem::take(&mut self.core.raw_source);
let out = self.emit(remaining);
let out = self.emit(remaining, Some(source.clone()));
self.core.reset();
(out, Some(source))
}
pub(crate) fn on_commit_tick(&mut self) -> (Option<Box<dyn HistoryCell>>, bool) {
let step = self.core.tick();
(self.emit(step), self.core.is_idle())
let source = self.core.emitted_stable_source().map(str::to_string);
(self.emit(step, source), self.core.is_idle())
}
pub(crate) fn on_commit_tick_batch(
@@ -331,7 +344,8 @@ impl StreamController {
max_lines: usize,
) -> (Option<Box<dyn HistoryCell>>, bool) {
let step = self.core.tick_batch(max_lines);
(self.emit(step), self.core.is_idle())
let source = self.core.emitted_stable_source().map(str::to_string);
(self.emit(step, source), self.core.is_idle())
}
pub(crate) fn queued_lines(&self) -> usize {
@@ -354,15 +368,32 @@ impl StreamController {
self.core.set_render_mode(render_mode);
}
fn emit(&mut self, lines: Vec<Line<'static>>) -> Option<Box<dyn HistoryCell>> {
fn emit(
&mut self,
lines: Vec<Line<'static>>,
markdown_source: Option<String>,
) -> Option<Box<dyn HistoryCell>> {
if lines.is_empty() {
return None;
}
Some(Box::new(history_cell::AgentMessageCell::new(lines, {
let header_emitted = self.header_emitted;
self.header_emitted = true;
!header_emitted
})))
let header_emitted = self.header_emitted;
self.header_emitted = true;
let is_first_line = !header_emitted;
if let Some(source) = markdown_source {
Some(Box::new(
history_cell::AgentMessageCell::new_with_markdown_source(
lines,
is_first_line,
source,
self.core.cwd.as_path(),
),
))
} else {
Some(Box::new(history_cell::AgentMessageCell::new(
lines,
is_first_line,
)))
}
}
}