//! Streaming primitives used by the TUI transcript pipeline. //! //! `StreamState` owns newline-gated markdown collection and a FIFO queue of committed render lines. //! Higher-level modules build on top of this state: //! - `controller` adapts queued lines into `HistoryCell` emission rules for message and plan streams. //! - `chunking` computes adaptive drain plans from queue pressure. //! - `commit_tick` binds policy decisions to concrete controller drains. //! //! The key invariant is queue ordering. All drains pop from the front, and enqueue records an //! arrival timestamp so policy code can reason about oldest queued age without peeking into text. use std::collections::VecDeque; use std::path::Path; use std::time::Duration; use std::time::Instant; use ratatui::text::Line; use crate::markdown_stream::MarkdownStreamCollector; pub(crate) mod chunking; pub(crate) mod commit_tick; pub(crate) mod controller; struct QueuedLine { line: Line<'static>, enqueued_at: Instant, } /// Holds in-flight markdown stream state and queued committed lines. pub(crate) struct StreamState { pub(crate) collector: MarkdownStreamCollector, queued_lines: VecDeque, pub(crate) has_seen_delta: bool, } impl StreamState { /// Create stream state whose markdown collector renders local file links relative to `cwd`. /// /// Controllers are expected to pass the session cwd here once and keep it stable for the /// lifetime of the active stream. pub(crate) fn new(width: Option, cwd: &Path) -> Self { Self { collector: MarkdownStreamCollector::new(width, cwd), queued_lines: VecDeque::new(), has_seen_delta: false, } } /// Resets collector and queue state for the next stream lifecycle. pub(crate) fn clear(&mut self) { self.collector.clear(); self.queued_lines.clear(); self.has_seen_delta = false; } /// Drains one queued line from the front of the queue. pub(crate) fn step(&mut self) -> Vec> { self.queued_lines .pop_front() .map(|queued| queued.line) .into_iter() .collect() } /// Drains up to `max_lines` queued lines from the front of the queue. /// /// Callers that pass very large values still get bounded behavior because this method clamps to /// the currently available queue length. pub(crate) fn drain_n(&mut self, max_lines: usize) -> Vec> { let end = max_lines.min(self.queued_lines.len()); self.queued_lines .drain(..end) .map(|queued| queued.line) .collect() } /// Clears queued lines while keeping collector/turn lifecycle state intact. pub(crate) fn clear_queue(&mut self) { self.queued_lines.clear(); } /// Returns whether no lines are queued for commit. pub(crate) fn is_idle(&self) -> bool { self.queued_lines.is_empty() } /// Returns the current queue depth. pub(crate) fn queued_len(&self) -> usize { self.queued_lines.len() } /// Returns the age of the oldest queued line. pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option { self.queued_lines .front() .map(|queued| now.saturating_duration_since(queued.enqueued_at)) } /// Appends committed lines to the queue with a shared enqueue timestamp. pub(crate) fn enqueue(&mut self, lines: Vec>) { let now = Instant::now(); self.queued_lines .extend(lines.into_iter().map(|line| QueuedLine { line, enqueued_at: now, })); } } #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; use std::path::PathBuf; fn test_cwd() -> PathBuf { // These tests only need a stable absolute cwd; using temp_dir() avoids baking Unix- or // Windows-specific root semantics into the fixtures. std::env::temp_dir() } #[test] fn drain_n_clamps_to_available_lines() { let mut state = StreamState::new(/*width*/ None, &test_cwd()); state.enqueue(vec![Line::from("one")]); let drained = state.drain_n(/*max_lines*/ 8); assert_eq!(drained, vec![Line::from("one")]); assert!(state.is_idle()); } }