Compare commits

...

1 Commits

Author SHA1 Message Date
Felipe Coury
63c9a22e19 feat(tui): source-back assistant streaming 2026-04-29 15:13:43 -03:00
10 changed files with 229 additions and 258 deletions

View File

@@ -206,30 +206,6 @@ impl App {
AppEvent::EndInitialHistoryReplayBuffer => {
self.finish_initial_history_replay_buffer(tui);
}
AppEvent::ConsolidateAgentMessage { source, cwd } => {
if !self.terminal_resize_reflow_enabled() {
self.transcript_reflow.clear();
return Ok(AppRunControl::Continue);
}
let end = self.transcript_cells.len();
let start =
trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells);
if start < end {
let consolidated: Arc<dyn HistoryCell> =
Arc::new(history_cell::AgentMarkdownCell::new(source, &cwd));
self.transcript_cells
.splice(start..end, std::iter::once(consolidated.clone()));
if let Some(Overlay::Transcript(t)) = &mut self.overlay {
t.consolidate_cells(start..end, consolidated.clone());
tui.frame_requester().schedule_frame();
}
self.maybe_finish_stream_reflow(tui)?;
} else {
self.maybe_finish_stream_reflow(tui)?;
}
}
AppEvent::ConsolidateProposedPlan(source) => {
if !self.terminal_resize_reflow_enabled() {
self.transcript_reflow.clear();

View File

@@ -355,9 +355,8 @@ impl App {
self.transcript_reflow.clear_pending_reflow();
// Track that a reflow happened during an active stream or while trailing
// unconsolidated AgentMessageCells are still pending consolidation so
// ConsolidateAgentMessage can schedule a follow-up reflow.
// Track that a reflow happened during a stream with transient cells so stream
// consolidation can schedule a follow-up reflow.
let reflow_ran_during_stream =
!self.transcript_cells.is_empty() && self.should_mark_reflow_as_stream_time();
@@ -468,14 +467,11 @@ impl App {
/// Return whether current transcript state should be treated as stream-time resize state.
///
/// The active stream controllers cover normal streaming. The trailing-cell checks cover the
/// narrow window after a controller has stopped but before the app has processed the
/// consolidation event that replaces transient stream cells with source-backed cells.
/// The plan stream controller covers normal plan streaming. The trailing-cell check covers the
/// narrow window after a plan controller has stopped but before the app has processed the
/// consolidation event that replaces transient plan stream cells with source-backed cells.
pub(super) fn should_mark_reflow_as_stream_time(&self) -> bool {
self.chat_widget.has_active_agent_stream()
|| self.chat_widget.has_active_plan_stream()
|| trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells)
< self.transcript_cells.len()
self.chat_widget.has_active_plan_stream()
|| trailing_run_start::<history_cell::ProposedPlanStreamCell>(&self.transcript_cells)
< self.transcript_cells.len()
}

View File

@@ -14,6 +14,7 @@ use crate::chatwidget::tests::make_chatwidget_manual_with_sender;
use crate::chatwidget::tests::set_chatgpt_auth;
use crate::chatwidget::tests::set_fast_mode_test_catalog;
use crate::file_search::FileSearchManager;
use crate::history_cell::AgentMarkdownCell;
use crate::history_cell::AgentMessageCell;
use crate::history_cell::HistoryCell;
use crate::history_cell::PlainHistoryCell;
@@ -68,14 +69,19 @@ use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile;
use codex_protocol::models::FileSystemPermissions;
use codex_protocol::models::NetworkPermissions;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AgentMessageDeltaEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::NetworkApprovalContext;
use codex_protocol::protocol::NetworkApprovalProtocol;
use codex_protocol::protocol::RolloutItem;
@@ -3961,6 +3967,69 @@ async fn height_shrink_schedules_resize_reflow() {
assert!(app.transcript_reflow.has_pending_reflow());
}
#[tokio::test]
async fn finalized_assistant_stream_is_source_backed_for_resize_reflow() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
enable_terminal_resize_reflow(&mut app);
app.chat_widget.handle_codex_event(Event {
id: "delta-1".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "draft text".to_string(),
}),
});
assert!(
app_event_rx.try_recv().is_err(),
"assistant deltas should stay in the active cell until completion"
);
app.chat_widget.handle_codex_event(Event {
id: "item-1".to_string(),
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: ThreadId::new(),
turn_id: "turn-1".to_string(),
item: TurnItem::AgentMessage(AgentMessageItem {
id: "assistant-1".to_string(),
content: vec![AgentMessageContent::Text {
text: "final source backed assistant text that wraps differently".to_string(),
}],
phase: None,
memory_citation: None,
}),
}),
});
let cell = match app_event_rx
.try_recv()
.expect("expected finalized assistant history")
{
AppEvent::InsertHistoryCell(cell) => cell,
other => panic!("expected InsertHistoryCell, got {other:?}"),
};
let cell: Arc<dyn HistoryCell> = cell.into();
assert!(
cell.as_any().is::<AgentMarkdownCell>(),
"finalized assistant stream should insert a source-backed cell"
);
app.transcript_cells.push(cell);
let wide = app.render_transcript_lines_for_reflow(/*width*/ 100);
let narrow = app.render_transcript_lines_for_reflow(/*width*/ 28);
assert!(
narrow.lines.len() > wide.lines.len(),
"source-backed transcript cell should re-render at resize width"
);
let narrow_text = narrow
.lines
.iter()
.map(rendered_line_text)
.collect::<Vec<_>>()
.join("\n");
let normalized_narrow_text = narrow_text.split_whitespace().collect::<Vec<_>>().join(" ");
assert!(normalized_narrow_text.contains("final source backed assistant"));
assert!(!normalized_narrow_text.contains("draft text"));
}
fn test_turn(turn_id: &str, status: TurnStatus, items: Vec<ThreadItem>) -> Turn {
Turn {
id: turn_id.to_string(),

View File

@@ -427,19 +427,6 @@ pub(crate) enum AppEvent {
/// Finish buffering initial resume replay after all replay events have been queued.
EndInitialHistoryReplayBuffer,
/// Replace the contiguous run of streaming `AgentMessageCell`s at the end of
/// the transcript with a single `AgentMarkdownCell` that stores the raw
/// markdown source and re-renders from it on resize.
///
/// Emitted by `ChatWidget::flush_answer_stream_with_separator` after stream
/// finalization. The `App` handler walks backward through `transcript_cells`
/// to find the `AgentMessageCell` run and splices in the consolidated cell.
/// The `cwd` keeps local file-link display stable across the final re-render.
ConsolidateAgentMessage {
source: String,
cwd: PathBuf,
},
/// Replace the contiguous run of streaming `ProposedPlanStreamCell`s at the
/// end of the transcript with a single source-backed `ProposedPlanCell`.
///

View File

@@ -412,7 +412,7 @@ use crate::streaming::chunking::AdaptiveChunkingPolicy;
use crate::streaming::commit_tick::CommitTickScope;
use crate::streaming::commit_tick::run_commit_tick;
use crate::streaming::controller::PlanStreamController;
use crate::streaming::controller::StreamController;
use crate::streaming::controller::SourceBackedStreamController;
use chrono::Local;
use codex_file_search::FileMatch;
@@ -833,8 +833,8 @@ pub(crate) struct ChatWidget {
rate_limit_switch_prompt: RateLimitSwitchPromptState,
add_credits_nudge_email_in_flight: Option<AddCreditsNudgeCreditType>,
adaptive_chunking: AdaptiveChunkingPolicy,
// Stream lifecycle controller
stream_controller: Option<StreamController>,
// Source-backed lifecycle controller for assistant output.
stream_controller: Option<SourceBackedStreamController>,
// Stream lifecycle controller for proposed plan output.
plan_stream_controller: Option<PlanStreamController>,
/// Holds the platform clipboard lease so copied text remains available while supported.
@@ -2038,32 +2038,18 @@ impl ChatWidget {
}
fn flush_answer_stream_with_separator(&mut self) {
let had_stream_controller = self.stream_controller.is_some();
if let Some(mut controller) = self.stream_controller.take() {
let (cell, source) = controller.finalize();
if let Some(cell) = cell {
if let Some(controller) = self.stream_controller.take() {
self.active_cell = None;
self.bump_active_cell_revision();
if let Some(cell) = controller.finalize() {
self.add_boxed_history(cell);
}
// Consolidate the run of streaming AgentMessageCells into a single AgentMarkdownCell
// that can re-render from source on resize.
if let Some(source) = source {
self.app_event_tx.send(AppEvent::ConsolidateAgentMessage {
source,
cwd: self.config.cwd.to_path_buf(),
});
}
}
self.adaptive_chunking.reset();
if had_stream_controller && self.stream_controllers_idle() {
self.app_event_tx.send(AppEvent::StopCommitAnimation);
}
}
fn stream_controllers_idle(&self) -> bool {
self.stream_controller
.as_ref()
.map(|controller| controller.queued_lines() == 0)
.unwrap_or(true)
self.stream_controller.is_none()
&& self
.plan_stream_controller
.as_ref()
@@ -2594,13 +2580,14 @@ impl ChatWidget {
}
fn finalize_completed_assistant_message(&mut self, message: Option<&str>) {
// If we have a stream_controller, the finalized message payload is redundant because the
// visible content has already been accumulated through deltas.
if self.stream_controller.is_none()
&& let Some(message) = message
&& !message.is_empty()
{
self.handle_streaming_delta(message.to_string());
if let Some(message) = message.filter(|message| !message.is_empty()) {
if let Some(controller) = self.stream_controller.as_mut() {
controller.set_markdown(message.to_string());
self.active_cell = Some(Box::new(controller.active_cell()));
self.bump_active_cell_revision();
} else {
self.handle_streaming_delta(message.to_string());
}
}
self.flush_answer_stream_with_separator();
self.handle_stream_finished();
@@ -4757,7 +4744,6 @@ impl ChatWidget {
let now = Instant::now();
let outcome = run_commit_tick(
&mut self.adaptive_chunking,
self.stream_controller.as_mut(),
self.plan_stream_controller.as_mut(),
scope,
now,
@@ -4810,11 +4796,11 @@ impl ChatWidget {
#[inline]
fn handle_streaming_delta(&mut self, delta: String) {
// Before streaming agent content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
if self.stream_controller.is_none() {
// Before streaming agent content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
// If the previous turn inserted non-stream history (exec output, patch status, MCP
// calls), render a separator before starting the next streamed assistant message.
if self.needs_final_message_separator && self.had_work_activity {
@@ -4826,17 +4812,14 @@ impl ChatWidget {
// Reset the flag even if we don't show separator (no work was done)
self.needs_final_message_separator = false;
}
self.stream_controller = Some(StreamController::new(
self.current_stream_width(/*reserved_cols*/ 2),
&self.config.cwd,
));
self.stream_controller = Some(SourceBackedStreamController::new(&self.config.cwd));
}
if let Some(controller) = self.stream_controller.as_mut()
&& controller.push(&delta)
{
self.app_event_tx.send(AppEvent::StartCommitAnimation);
self.run_catch_up_commit_tick();
if let Some(controller) = self.stream_controller.as_mut() {
controller.push(&delta);
self.active_cell = Some(Box::new(controller.active_cell()));
self.bump_active_cell_revision();
}
self.bottom_pane.hide_status_indicator();
self.request_redraw();
}
@@ -11365,11 +11348,7 @@ impl ChatWidget {
pub(crate) fn on_terminal_resize(&mut self, width: u16) {
let had_rendered_width = self.last_rendered_width.get().is_some();
self.last_rendered_width.set(Some(width as usize));
let stream_width = self.current_stream_width(/*reserved_cols*/ 2);
let plan_stream_width = self.current_stream_width(/*reserved_cols*/ 4);
if let Some(controller) = self.stream_controller.as_mut() {
controller.set_width(stream_width);
}
if let Some(controller) = self.plan_stream_controller.as_mut() {
controller.set_width(plan_stream_width);
}
@@ -11378,11 +11357,6 @@ impl ChatWidget {
}
}
/// Whether an agent message stream is active (not a plan stream).
pub(crate) fn has_active_agent_stream(&self) -> bool {
self.stream_controller.is_some()
}
/// Whether a proposed-plan stream is active.
pub(crate) fn has_active_plan_stream(&self) -> bool {
self.plan_stream_controller.is_some()
@@ -11538,9 +11512,6 @@ impl ChatWidget {
if matches!(op.view(), crate::app_command::AppCommandView::Interrupt)
&& self.agent_turn_running
{
if let Some(controller) = self.stream_controller.as_mut() {
controller.clear_queue();
}
if let Some(controller) = self.plan_stream_controller.as_mut() {
controller.clear_queue();
}

View File

@@ -2987,6 +2987,56 @@ printf 'fenced within fenced\n'
);
}
#[tokio::test]
async fn assistant_delta_updates_source_backed_active_cell_without_history_insert() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.handle_codex_event(Event {
id: "delta-1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "This is a long streamed paragraph that should reflow from markdown source while it is still active.".to_string(),
}),
});
assert!(
drain_insert_history(&mut rx).is_empty(),
"streaming deltas should render through the active cell until completion"
);
let narrow = chat
.active_cell_transcript_lines(/*width*/ 28)
.expect("active stream should have transcript lines");
let wide = chat
.active_cell_transcript_lines(/*width*/ 100)
.expect("active stream should have transcript lines");
assert!(
narrow.len() > wide.len(),
"active stream should re-render from source at the requested width"
);
}
#[tokio::test]
async fn completed_assistant_item_replaces_streamed_draft_before_history_flush() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.handle_codex_event(Event {
id: "delta-1".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "draft text".to_string(),
}),
});
complete_assistant_message(
&mut chat,
"msg-1",
"final **markdown** text",
/*phase*/ None,
);
let rendered = lines_to_single_string(&drain_insert_history(&mut rx).concat());
assert!(rendered.contains("final markdown text"));
assert!(!rendered.contains("draft text"));
assert!(chat.active_cell_transcript_lines(/*width*/ 80).is_none());
}
#[tokio::test]
async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

View File

@@ -455,11 +455,13 @@ impl HistoryCell for ReasoningSummaryCell {
}
#[derive(Debug)]
#[cfg(test)]
pub(crate) struct AgentMessageCell {
lines: Vec<Line<'static>>,
is_first_line: bool,
}
#[cfg(test)]
impl AgentMessageCell {
pub(crate) fn new(lines: Vec<Line<'static>>, is_first_line: bool) -> Self {
Self {
@@ -469,6 +471,7 @@ impl AgentMessageCell {
}
}
#[cfg(test)]
impl HistoryCell for AgentMessageCell {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
adaptive_wrap_lines(
@@ -490,10 +493,8 @@ impl HistoryCell for AgentMessageCell {
/// A consolidated agent message cell that stores raw markdown source and re-renders from it.
///
/// After a stream finalizes, the `ConsolidateAgentMessage` handler in `App`
/// replaces the contiguous run of `AgentMessageCell`s with a single
/// `AgentMarkdownCell`. On terminal resize, `display_lines(width)` re-renders
/// from source via `append_markdown`.
/// The chat widget uses this both while assistant output is live and after the stream finalizes.
/// On terminal resize, `display_lines(width)` re-renders from source via `append_markdown`.
///
/// The cell snapshots `cwd` at construction so local file-link display remains aligned with the
/// session that produced the message. Reusing the current process cwd during reflow would make old
@@ -5156,80 +5157,4 @@ mod tests {
"word_wrap_lines should not alter lines that already fit within width"
);
}
/// Simulate the consolidation backward-walk logic from `App::handle_event`
/// to verify it correctly identifies and replaces `AgentMessageCell` runs.
#[test]
fn consolidation_walker_replaces_agent_message_cells() {
use std::sync::Arc;
// Build a transcript with: [UserCell, AgentMsg(head), AgentMsg(cont), AgentMsg(cont)]
let user = Arc::new(UserHistoryCell {
message: "hello".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
remote_image_urls: Vec::new(),
}) as Arc<dyn HistoryCell>;
let head = Arc::new(AgentMessageCell::new(
vec![Line::from("line 1")],
/*is_first_line*/ true,
)) as Arc<dyn HistoryCell>;
let cont1 = Arc::new(AgentMessageCell::new(
vec![Line::from("line 2")],
/*is_first_line*/ false,
)) as Arc<dyn HistoryCell>;
let cont2 = Arc::new(AgentMessageCell::new(
vec![Line::from("line 3")],
/*is_first_line*/ false,
)) as Arc<dyn HistoryCell>;
let mut transcript_cells: Vec<Arc<dyn HistoryCell>> =
vec![user.clone(), head, cont1, cont2];
// Run the same consolidation logic as the handler.
let source = "line 1\nline 2\nline 3\n".to_string();
let end = transcript_cells.len();
let mut start = end;
while start > 0
&& transcript_cells[start - 1].is_stream_continuation()
&& transcript_cells[start - 1]
.as_any()
.is::<AgentMessageCell>()
{
start -= 1;
}
if start > 0
&& transcript_cells[start - 1]
.as_any()
.is::<AgentMessageCell>()
&& !transcript_cells[start - 1].is_stream_continuation()
{
start -= 1;
}
assert_eq!(
start, 1,
"should find all 3 agent cells starting at index 1"
);
assert_eq!(end, 4);
// Splice.
let consolidated: Arc<dyn HistoryCell> =
Arc::new(AgentMarkdownCell::new(source, &test_cwd()));
transcript_cells.splice(start..end, std::iter::once(consolidated));
assert_eq!(transcript_cells.len(), 2, "should be [user, consolidated]");
// Verify first cell is still the user cell.
assert!(
transcript_cells[0].as_any().is::<UserHistoryCell>(),
"first cell should be UserHistoryCell"
);
// Verify second cell is AgentMarkdownCell.
assert!(
transcript_cells[1].as_any().is::<AgentMarkdownCell>(),
"second cell should be AgentMarkdownCell"
);
}
}

View File

@@ -555,11 +555,10 @@ impl TranscriptOverlay {
/// Replace a range of committed cells with a single consolidated cell.
///
/// Mirrors the splice performed on `App::transcript_cells` during
/// `ConsolidateAgentMessage` so the Ctrl+T overlay stays in sync with the
/// main transcript. The range is clamped defensively: cells may have been
/// inserted after the overlay opened, leaving it with fewer entries than
/// the main transcript.
/// Mirrors source-backed stream consolidation on `App::transcript_cells` so the Ctrl+T overlay
/// stays in sync with the main transcript. The range is clamped defensively: cells may have
/// been inserted after the overlay opened, leaving it with fewer entries than the main
/// transcript.
pub(crate) fn consolidate_cells(
&mut self,
range: std::ops::Range<usize>,

View File

@@ -1,7 +1,7 @@
//! Orchestrates commit-tick drains across streaming controllers.
//! Orchestrates commit-tick drains for queue-backed plan streaming.
//!
//! This module bridges queue-based chunking policy (`chunking`) with the concrete stream
//! controllers (`controller`). Callers provide the current controllers and tick scope; the module
//! controller (`controller`). Callers provide the current controller and tick scope; the module
//! computes queue pressure, selects a drain plan, applies it, and returns emitted history cells.
//!
//! The module preserves ordering by draining only from controller queue heads. It does not schedule
@@ -24,7 +24,6 @@ use super::chunking::ChunkingMode;
use super::chunking::DrainPlan;
use super::chunking::QueueSnapshot;
use super::controller::PlanStreamController;
use super::controller::StreamController;
/// Describes whether a commit tick may run in all modes or only in catch-up mode.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -59,53 +58,38 @@ impl Default for CommitTickOutput {
}
}
/// Runs one commit tick against the provided stream controllers.
/// Runs one commit tick against the provided plan stream controller.
///
/// This function collects a [`QueueSnapshot`], asks [`AdaptiveChunkingPolicy`] for a
/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to both controllers.
/// If callers pass stale controller references (for example, references not tied to the
/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to the controller.
/// If callers pass a stale controller reference (for example, one not tied to the
/// current turn), queue age can be misread and the policy may stay in catch-up longer
/// than expected.
pub(crate) fn run_commit_tick(
policy: &mut AdaptiveChunkingPolicy,
stream_controller: Option<&mut StreamController>,
plan_stream_controller: Option<&mut PlanStreamController>,
scope: CommitTickScope,
now: Instant,
) -> CommitTickOutput {
let snapshot = stream_queue_snapshot(
stream_controller.as_deref(),
plan_stream_controller.as_deref(),
now,
);
let snapshot = stream_queue_snapshot(plan_stream_controller.as_deref(), now);
let decision = resolve_chunking_plan(policy, snapshot, now);
if scope == CommitTickScope::CatchUpOnly && decision.mode != ChunkingMode::CatchUp {
return CommitTickOutput::default();
}
apply_commit_tick_plan(
decision.drain_plan,
stream_controller,
plan_stream_controller,
)
apply_commit_tick_plan(decision.drain_plan, plan_stream_controller)
}
/// Builds the combined queue-pressure snapshot consumed by chunking policy.
///
/// The snapshot sums queue depth across controllers and keeps the maximum oldest age
/// so policy decisions reflect the most delayed queued line currently visible.
/// Policy decisions reflect the most delayed queued plan line currently visible.
fn stream_queue_snapshot(
stream_controller: Option<&StreamController>,
plan_stream_controller: Option<&PlanStreamController>,
now: Instant,
) -> QueueSnapshot {
let mut queued_lines = 0usize;
let mut oldest_age: Option<Duration> = None;
if let Some(controller) = stream_controller {
queued_lines += controller.queued_lines();
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
}
if let Some(controller) = plan_stream_controller {
queued_lines += controller.queued_lines();
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
@@ -141,25 +125,16 @@ fn resolve_chunking_plan(
decision
}
/// Applies a [`DrainPlan`] to all available stream controllers.
/// Applies a [`DrainPlan`] to the available plan stream controller.
///
/// The returned [`CommitTickOutput`] reports emitted cells and whether all
/// present controllers are idle after draining.
/// The returned [`CommitTickOutput`] reports emitted cells and whether the
/// present controller is idle after draining.
fn apply_commit_tick_plan(
drain_plan: DrainPlan,
stream_controller: Option<&mut StreamController>,
plan_stream_controller: Option<&mut PlanStreamController>,
) -> CommitTickOutput {
let mut output = CommitTickOutput::default();
if let Some(controller) = stream_controller {
output.has_controller = true;
let (cell, is_idle) = drain_stream_controller(controller, drain_plan);
if let Some(cell) = cell {
output.cells.push(cell);
}
output.all_idle &= is_idle;
}
if let Some(controller) = plan_stream_controller {
output.has_controller = true;
let (cell, is_idle) = drain_plan_stream_controller(controller, drain_plan);
@@ -172,25 +147,10 @@ fn apply_commit_tick_plan(
output
}
/// Applies one drain step to the main stream controller.
///
/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to
/// multi-line drain (including instant catch-up when policy requests the full
/// queued backlog).
fn drain_stream_controller(
controller: &mut StreamController,
drain_plan: DrainPlan,
) -> (Option<Box<dyn HistoryCell>>, bool) {
match drain_plan {
DrainPlan::Single => controller.on_commit_tick(),
DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines),
}
}
/// Applies one drain step to the plan stream controller.
///
/// This mirrors [`drain_stream_controller`] so both controller types follow the
/// same chunking policy decisions.
/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to multi-line drain
/// including instant catch-up when policy requests the full queued backlog.
fn drain_plan_stream_controller(
controller: &mut PlanStreamController,
drain_plan: DrainPlan,

View File

@@ -210,23 +210,24 @@ impl StreamCore {
}
}
/// Controls newline-gated streaming for assistant messages.
/// Test-only newline-gated streaming controller for transient assistant cells.
///
/// The controller emits transient `AgentMessageCell`s for live display and returns raw markdown
/// source on `finalize` so the app can replace those transient cells with a source-backed
/// `AgentMarkdownCell`. Callers should use `set_width` on terminal resize; rebuilding the queue
/// from already emitted cells would duplicate output instead of preserving the stream position.
/// Production assistant streaming is source-backed. This controller remains as coverage for the
/// lower-level line streamer behavior that the plan stream still shares.
#[cfg(test)]
pub(crate) struct StreamController {
core: StreamCore,
header_emitted: bool,
}
#[cfg(test)]
impl StreamController {
/// Create a stream controller that renders markdown relative to the given width and cwd.
///
/// `width` is the content width available to markdown rendering, not necessarily the full
/// terminal width. Passing a stale width after resize will keep queued live output wrapped for
/// the old viewport until app-level reflow repairs the finalized transcript.
#[cfg(test)]
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
Self {
core: StreamCore::new(width, cwd),
@@ -238,6 +239,7 @@ impl StreamController {
///
/// Deltas are committed only through newline boundaries. A `false` return can still mean source
/// was buffered; it only means no newly renderable complete line is ready for live emission.
#[cfg(test)]
pub(crate) fn push(&mut self, delta: &str) -> bool {
self.core.push_delta(delta)
}
@@ -247,6 +249,7 @@ impl StreamController {
/// The source is `None` only when the stream never accumulated content. Callers that discard the
/// returned source cannot later consolidate the transcript into a width-sensitive finalized
/// cell.
#[cfg(test)]
pub(crate) fn finalize(&mut self) -> (Option<Box<dyn HistoryCell>>, Option<String>) {
let remaining = self.core.finalize_remaining();
if self.core.raw_source.is_empty() {
@@ -277,14 +280,7 @@ impl StreamController {
self.core.queued_lines()
}
pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
self.core.oldest_queued_age(now)
}
pub(crate) fn clear_queue(&mut self) {
self.core.clear_queue();
}
#[cfg(test)]
pub(crate) fn set_width(&mut self, width: Option<usize>) {
self.core.set_width(width);
}
@@ -301,11 +297,53 @@ impl StreamController {
}
}
/// Keeps assistant output source-backed while the message is still streaming.
///
/// This controller does not emit transient history cells. The chat widget renders the current
/// source through an active `AgentMarkdownCell`, then commits the same source-backed cell once the
/// stream finishes.
pub(crate) struct SourceBackedStreamController {
markdown_source: String,
cwd: PathBuf,
}
impl SourceBackedStreamController {
pub(crate) fn new(cwd: &Path) -> Self {
Self {
markdown_source: String::new(),
cwd: cwd.to_path_buf(),
}
}
pub(crate) fn push(&mut self, delta: &str) {
self.markdown_source.push_str(delta);
}
pub(crate) fn set_markdown(&mut self, markdown_source: String) {
self.markdown_source = markdown_source;
}
pub(crate) fn active_cell(&self) -> history_cell::AgentMarkdownCell {
history_cell::AgentMarkdownCell::new(self.markdown_source.clone(), self.cwd.as_path())
}
pub(crate) fn finalize(self) -> Option<Box<dyn HistoryCell>> {
if self.markdown_source.trim().is_empty() {
return None;
}
Some(Box::new(history_cell::AgentMarkdownCell::new(
self.markdown_source,
self.cwd.as_path(),
)))
}
}
/// Controls newline-gated streaming for proposed plan markdown.
///
/// This follows the same source-retention contract as `StreamController`, but wraps emitted lines
/// in the proposed-plan header, padding, and style. Finalization must return source for
/// `ProposedPlanCell`; otherwise a resized finalized plan would keep the transient stream shape.
/// This follows the same source-retention contract as the lower-level stream core, but wraps
/// emitted lines in the proposed-plan header, padding, and style. Finalization must return source
/// for `ProposedPlanCell`; otherwise a resized finalized plan would keep the transient stream
/// shape.
pub(crate) struct PlanStreamController {
core: StreamCore,
header_emitted: bool,
@@ -315,8 +353,8 @@ pub(crate) struct PlanStreamController {
impl PlanStreamController {
/// Create a proposed-plan stream controller that renders markdown relative to the given cwd.
///
/// The width has the same meaning as in `StreamController`: it is the markdown body width, and
/// callers must update it when the terminal width changes.
/// The width is the markdown body width, and callers must update it when the terminal width
/// changes.
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
Self {
core: StreamCore::new(width, cwd),