Compare commits

...

1 Commits

Author SHA1 Message Date
Felipe Coury
92bd059f42 Render assistant streams from markdown source 2026-04-23 16:47:59 -03:00
4 changed files with 164 additions and 35 deletions

View File

@@ -392,7 +392,7 @@ use crate::streaming::chunking::AdaptiveChunkingPolicy;
use crate::streaming::commit_tick::CommitTickScope;
use crate::streaming::commit_tick::run_commit_tick;
use crate::streaming::controller::PlanStreamController;
use crate::streaming::controller::StreamController;
use crate::streaming::controller::SourceBackedStreamController;
use chrono::Local;
use codex_file_search::FileMatch;
@@ -813,8 +813,8 @@ pub(crate) struct ChatWidget {
rate_limit_switch_prompt: RateLimitSwitchPromptState,
add_credits_nudge_email_in_flight: Option<AddCreditsNudgeCreditType>,
adaptive_chunking: AdaptiveChunkingPolicy,
// Stream lifecycle controller
stream_controller: Option<StreamController>,
// Source-backed lifecycle controller for assistant output.
stream_controller: Option<SourceBackedStreamController>,
// Stream lifecycle controller for proposed plan output.
plan_stream_controller: Option<PlanStreamController>,
/// Holds the platform clipboard lease so copied text remains available while supported.
@@ -1833,19 +1833,18 @@ impl ChatWidget {
}
fn flush_answer_stream_with_separator(&mut self) {
if let Some(mut controller) = self.stream_controller.take()
&& let Some(cell) = controller.finalize()
{
self.add_boxed_history(cell);
if let Some(controller) = self.stream_controller.take() {
self.active_cell = None;
self.bump_active_cell_revision();
if let Some(cell) = controller.finalize() {
self.add_boxed_history(cell);
}
}
self.adaptive_chunking.reset();
}
fn stream_controllers_idle(&self) -> bool {
self.stream_controller
.as_ref()
.map(|controller| controller.queued_lines() == 0)
.unwrap_or(true)
self.stream_controller.is_none()
&& self
.plan_stream_controller
.as_ref()
@@ -2366,13 +2365,14 @@ impl ChatWidget {
}
fn finalize_completed_assistant_message(&mut self, message: Option<&str>) {
// If we have a stream_controller, the finalized message payload is redundant because the
// visible content has already been accumulated through deltas.
if self.stream_controller.is_none()
&& let Some(message) = message
&& !message.is_empty()
{
self.handle_streaming_delta(message.to_string());
if let Some(message) = message.filter(|message| !message.is_empty()) {
if let Some(controller) = self.stream_controller.as_mut() {
controller.set_markdown(message.to_string());
self.active_cell = Some(Box::new(controller.active_cell()));
self.bump_active_cell_revision();
} else {
self.handle_streaming_delta(message.to_string());
}
}
self.flush_answer_stream_with_separator();
self.handle_stream_finished();
@@ -4627,7 +4627,7 @@ impl ChatWidget {
let now = Instant::now();
let outcome = run_commit_tick(
&mut self.adaptive_chunking,
self.stream_controller.as_mut(),
/*stream_controller*/ None,
self.plan_stream_controller.as_mut(),
scope,
now,
@@ -4680,11 +4680,11 @@ impl ChatWidget {
#[inline]
fn handle_streaming_delta(&mut self, delta: String) {
// Before streaming agent content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
if self.stream_controller.is_none() {
// Before streaming agent content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
// If the previous turn inserted non-stream history (exec output, patch status, MCP
// calls), render a separator before starting the next streamed assistant message.
if self.needs_final_message_separator && self.had_work_activity {
@@ -4703,16 +4703,12 @@ impl ChatWidget {
// Reset the flag even if we don't show separator (no work was done)
self.needs_final_message_separator = false;
}
self.stream_controller = Some(StreamController::new(
self.last_rendered_width.get().map(|w| w.saturating_sub(2)),
&self.config.cwd,
));
self.stream_controller = Some(SourceBackedStreamController::new(&self.config.cwd));
}
if let Some(controller) = self.stream_controller.as_mut()
&& controller.push(&delta)
{
self.app_event_tx.send(AppEvent::StartCommitAnimation);
self.run_catch_up_commit_tick();
if let Some(controller) = self.stream_controller.as_mut() {
controller.push(&delta);
self.active_cell = Some(Box::new(controller.active_cell()));
self.bump_active_cell_revision();
}
self.request_redraw();
}

View File

@@ -2665,6 +2665,56 @@ printf 'fenced within fenced\n'
);
}
#[tokio::test]
async fn assistant_delta_updates_source_backed_active_cell_without_history_insert() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.handle_codex_event(Event {
id: "delta-1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "This is a long streamed paragraph that should reflow from markdown source while it is still active.".to_string(),
}),
});
assert!(
drain_insert_history(&mut rx).is_empty(),
"streaming deltas should render through the active cell until completion"
);
let narrow = chat
.active_cell_transcript_lines(/*width*/ 28)
.expect("active stream should have transcript lines");
let wide = chat
.active_cell_transcript_lines(/*width*/ 100)
.expect("active stream should have transcript lines");
assert!(
narrow.len() > wide.len(),
"active stream should re-render from source at the requested width"
);
}
#[tokio::test]
async fn completed_assistant_item_replaces_streamed_draft_before_history_flush() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.handle_codex_event(Event {
id: "delta-1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "draft text".to_string(),
}),
});
complete_assistant_message(
&mut chat,
"msg-1",
"final **markdown** text",
/*phase*/ None,
);
let rendered = lines_to_single_string(&drain_insert_history(&mut rx).concat());
assert!(rendered.contains("final markdown text"));
assert!(!rendered.contains("draft text"));
assert!(chat.active_cell_transcript_lines(/*width*/ 80).is_none());
}
#[tokio::test]
async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

View File

@@ -486,6 +486,51 @@ impl HistoryCell for AgentMessageCell {
}
}
#[derive(Debug, Clone)]
pub(crate) struct AgentMarkdownCell {
markdown: String,
cwd: PathBuf,
}
impl AgentMarkdownCell {
pub(crate) fn new(markdown: String, cwd: &Path) -> Self {
Self {
markdown,
cwd: cwd.to_path_buf(),
}
}
fn lines(&self, width: u16) -> Vec<Line<'static>> {
if self.markdown.is_empty() {
return Vec::new();
}
const AGENT_MARKDOWN_INDENT_WIDTH: usize = 2;
let mut lines = Vec::new();
append_markdown(
&self.markdown,
Some(
(width as usize)
.saturating_sub(AGENT_MARKDOWN_INDENT_WIDTH)
.max(1),
),
Some(self.cwd.as_path()),
&mut lines,
);
adaptive_wrap_lines(
&lines,
RtOptions::new(width as usize)
.initial_indent("".dim().into())
.subsequent_indent(" ".into()),
)
}
}
impl HistoryCell for AgentMarkdownCell {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
self.lines(width)
}
}
#[derive(Debug)]
pub(crate) struct PlainHistoryCell {
lines: Vec<Line<'static>>,

View File

@@ -5,6 +5,7 @@ use crate::style::proposed_plan_style;
use ratatui::prelude::Stylize;
use ratatui::text::Line;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
@@ -14,7 +15,6 @@ use super::StreamState;
/// commit animation across streams.
pub(crate) struct StreamController {
state: StreamState,
finishing_after_drain: bool,
header_emitted: bool,
}
@@ -23,15 +23,16 @@ impl StreamController {
///
/// The controller snapshots the path into stream state so later commit ticks and finalization
/// render against the same session cwd that was active when streaming started.
#[cfg(test)]
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
Self {
state: StreamState::new(width, cwd),
finishing_after_drain: false,
header_emitted: false,
}
}
/// Push a delta; if it contains a newline, commit completed lines and start animation.
#[cfg(test)]
pub(crate) fn push(&mut self, delta: &str) -> bool {
let state = &mut self.state;
if !delta.is_empty() {
@@ -49,6 +50,7 @@ impl StreamController {
}
/// Finalize the active stream. Drain and emit now.
#[cfg(test)]
pub(crate) fn finalize(&mut self) -> Option<Box<dyn HistoryCell>> {
// Finalize collector first.
let remaining = {
@@ -68,7 +70,6 @@ impl StreamController {
// Cleanup
self.state.clear();
self.finishing_after_drain = false;
self.emit(out_lines)
}
@@ -112,6 +113,43 @@ impl StreamController {
}
}
/// Controller that keeps assistant streaming output source-backed while it is live.
pub(crate) struct SourceBackedStreamController {
markdown: String,
cwd: PathBuf,
}
impl SourceBackedStreamController {
pub(crate) fn new(cwd: &Path) -> Self {
Self {
markdown: String::new(),
cwd: cwd.to_path_buf(),
}
}
pub(crate) fn push(&mut self, delta: &str) {
self.markdown.push_str(delta);
}
pub(crate) fn set_markdown(&mut self, markdown: String) {
self.markdown = markdown;
}
pub(crate) fn active_cell(&self) -> history_cell::AgentMarkdownCell {
history_cell::AgentMarkdownCell::new(self.markdown.clone(), self.cwd.as_path())
}
pub(crate) fn finalize(self) -> Option<Box<dyn HistoryCell>> {
if self.markdown.trim().is_empty() {
return None;
}
Some(Box::new(history_cell::AgentMarkdownCell::new(
self.markdown,
self.cwd.as_path(),
)))
}
}
/// Controller that streams proposed plan markdown into a styled plan block.
pub(crate) struct PlanStreamController {
state: StreamState,