Compare commits

...

1 Commits

Author SHA1 Message Date
aibrahim-oai
320aec304a feat: stream agent message deltas 2025-07-31 21:13:10 -07:00
4 changed files with 180 additions and 41 deletions

View File

@@ -28,6 +28,11 @@ use std::sync::mpsc::channel;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
/// A change to the terminal history that has been received by the app event
/// loop but not yet applied. Ops are collected in `pending_history_ops` and
/// applied in order when that list is drained.
enum PendingHistoryOp {
/// Lines to append to the history; applied via
/// `insert_history::insert_history_lines`.
Insert(Vec<Line<'static>>),
/// Replacement content for a single already-written line; applied via
/// `insert_history::overwrite_last_history_line`.
Overwrite(Line<'static>),
}
/// Time window for debouncing redraw requests. /// Time window for debouncing redraw requests.
const REDRAW_DEBOUNCE: Duration = Duration::from_millis(10); const REDRAW_DEBOUNCE: Duration = Duration::from_millis(10);
@@ -57,7 +62,7 @@ pub(crate) struct App<'a> {
/// True when a redraw has been scheduled but not yet executed. /// True when a redraw has been scheduled but not yet executed.
pending_redraw: Arc<AtomicBool>, pending_redraw: Arc<AtomicBool>,
pending_history_lines: Vec<Line<'static>>, pending_history_ops: Vec<PendingHistoryOp>,
/// Stored parameters needed to instantiate the ChatWidget later, e.g., /// Stored parameters needed to instantiate the ChatWidget later, e.g.,
/// after dismissing the Git-repo warning. /// after dismissing the Git-repo warning.
@@ -163,7 +168,7 @@ impl App<'_> {
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone()); let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
Self { Self {
app_event_tx, app_event_tx,
pending_history_lines: Vec::new(), pending_history_ops: Vec::new(),
app_event_rx, app_event_rx,
app_state, app_state,
config, config,
@@ -210,7 +215,13 @@ impl App<'_> {
while let Ok(event) = self.app_event_rx.recv() { while let Ok(event) = self.app_event_rx.recv() {
match event { match event {
AppEvent::InsertHistory(lines) => { AppEvent::InsertHistory(lines) => {
self.pending_history_lines.extend(lines); self.pending_history_ops
.push(PendingHistoryOp::Insert(lines));
self.app_event_tx.send(AppEvent::RequestRedraw);
}
AppEvent::OverwriteHistoryLine(line) => {
self.pending_history_ops
.push(PendingHistoryOp::Overwrite(line));
self.app_event_tx.send(AppEvent::RequestRedraw); self.app_event_tx.send(AppEvent::RequestRedraw);
} }
AppEvent::RequestRedraw => { AppEvent::RequestRedraw => {
@@ -424,12 +435,17 @@ impl App<'_> {
terminal.clear()?; terminal.clear()?;
terminal.set_viewport_area(area); terminal.set_viewport_area(area);
} }
if !self.pending_history_lines.is_empty() { if !self.pending_history_ops.is_empty() {
crate::insert_history::insert_history_lines( for op in self.pending_history_ops.drain(..) {
terminal, match op {
self.pending_history_lines.clone(), PendingHistoryOp::Insert(lines) => {
); crate::insert_history::insert_history_lines(terminal, lines);
self.pending_history_lines.clear(); }
PendingHistoryOp::Overwrite(line) => {
crate::insert_history::overwrite_last_history_line(line);
}
}
}
} }
match &mut self.app_state { match &mut self.app_state {
AppState::Chat { widget } => { AppState::Chat { widget } => {

View File

@@ -48,4 +48,6 @@ pub(crate) enum AppEvent {
}, },
InsertHistory(Vec<Line<'static>>), InsertHistory(Vec<Line<'static>>),
/// Overwrite the last line in the scrollback with the provided content.
OverwriteHistoryLine(Line<'static>),
} }

View File

@@ -28,8 +28,10 @@ use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind; use crossterm::event::KeyEventKind;
use ratatui::buffer::Buffer; use ratatui::buffer::Buffer;
use ratatui::layout::Rect; use ratatui::layout::Rect;
use ratatui::text::Line;
use ratatui::widgets::Widget; use ratatui::widgets::Widget;
use ratatui::widgets::WidgetRef; use ratatui::widgets::WidgetRef;
use ratatui::style::Stylize;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
@@ -43,6 +45,7 @@ use crate::exec_command::strip_bash_lc_and_escape;
use crate::history_cell::CommandOutput; use crate::history_cell::CommandOutput;
use crate::history_cell::HistoryCell; use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType; use crate::history_cell::PatchEventType;
use crate::markdown::append_markdown;
use crate::user_approval_widget::ApprovalRequest; use crate::user_approval_widget::ApprovalRequest;
use codex_file_search::FileMatch; use codex_file_search::FileMatch;
@@ -60,10 +63,13 @@ pub(crate) struct ChatWidget<'a> {
initial_user_message: Option<UserMessage>, initial_user_message: Option<UserMessage>,
token_usage: TokenUsage, token_usage: TokenUsage,
reasoning_buffer: String, reasoning_buffer: String,
// Buffer for streaming assistant answer text; we do not surface partial // Buffer and tracking for streaming assistant reasoning text.
// We wait for the final AgentMessage event and then emit the full text reasoning_inserted_lines: usize,
// at once into scrollback so the history contains a single message. reasoning_header_inserted: bool,
// Buffer and tracking for streaming assistant answer text.
answer_buffer: String, answer_buffer: String,
answer_inserted_lines: usize,
answer_header_inserted: bool,
running_commands: HashMap<String, RunningCommand>, running_commands: HashMap<String, RunningCommand>,
} }
@@ -150,11 +156,21 @@ impl ChatWidget<'_> {
), ),
token_usage: TokenUsage::default(), token_usage: TokenUsage::default(),
reasoning_buffer: String::new(), reasoning_buffer: String::new(),
reasoning_inserted_lines: 0,
reasoning_header_inserted: false,
answer_buffer: String::new(), answer_buffer: String::new(),
answer_inserted_lines: 0,
answer_header_inserted: false,
running_commands: HashMap::new(), running_commands: HashMap::new(),
} }
} }
/// Render `text` as markdown and return only the first resulting line.
///
/// If the renderer produces no lines at all, an empty line is returned
/// instead. Any rendered lines after the first are discarded, so callers
/// are expected to pass a single logical line of text.
fn render_markdown_line(&self, text: &str) -> Line<'static> {
    let mut rendered: Vec<Line<'static>> = Vec::new();
    append_markdown(text, &mut rendered, &self.config);
    match rendered.into_iter().next() {
        Some(first_line) => first_line,
        None => Line::from(""),
    }
}
pub fn desired_height(&self, width: u16) -> u16 { pub fn desired_height(&self, width: u16) -> u16 {
self.bottom_pane.desired_height(width) self.bottom_pane.desired_height(width)
} }
@@ -176,6 +192,76 @@ impl ChatWidget<'_> {
self.bottom_pane.handle_paste(text); self.bottom_pane.handle_paste(text);
} }
/// Append a streamed chunk of the assistant answer and mirror it into the
/// history as it arrives.
///
/// The accumulated `answer_buffer` is split on `'\n'`; every piece is one
/// history line. On the first chunk a "codex" header is inserted together
/// with the first line. On later chunks the most recently emitted line is
/// re-rendered (it may have grown) via `OverwriteHistoryLine`. In both
/// cases every line that has not been emitted yet is then appended with
/// `InsertHistory`, and `answer_inserted_lines` is advanced to match.
///
/// Emitting the remaining lines on the first chunk as well matters: a first
/// delta that already contains newlines would otherwise show only its first
/// line until (and unless) another delta arrives.
fn handle_answer_delta(&mut self, delta: &str) {
    self.answer_buffer.push_str(delta);
    // `split` always yields at least one item, and a trailing '\n' yields a
    // final empty piece that stands for the (still empty) line in progress.
    let lines: Vec<&str> = self.answer_buffer.split('\n').collect();

    if !self.answer_header_inserted {
        let first = lines.first().copied().unwrap_or("");
        let initial = vec![
            Line::from("codex".magenta().bold()),
            self.render_markdown_line(first),
        ];
        self.app_event_tx.send(AppEvent::InsertHistory(initial));
        self.answer_header_inserted = true;
        self.answer_inserted_lines = 1;
    } else if let Some(last_idx) = self.answer_inserted_lines.checked_sub(1) {
        // The last emitted line may have received more text; refresh it
        // before appending anything new. `get` avoids a panic should the
        // buffer ever hold fewer lines than were recorded as emitted.
        if let Some(text) = lines.get(last_idx).copied() {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::OverwriteHistoryLine(line));
        }
    }

    // Append every line that is in the buffer but not yet in the history.
    for text in lines.iter().copied().skip(self.answer_inserted_lines) {
        let line = self.render_markdown_line(text);
        self.app_event_tx
            .send(AppEvent::InsertHistory(vec![line]));
    }
    self.answer_inserted_lines = self.answer_inserted_lines.max(lines.len());

    self.request_redraw();
}
/// Append a streamed chunk of the assistant reasoning and mirror it into
/// the history as it arrives.
///
/// The accumulated `reasoning_buffer` is split on `'\n'`; every piece is
/// one history line. On the first chunk a "thinking" header is inserted
/// together with the first line. On later chunks the most recently emitted
/// line is re-rendered (it may have grown) via `OverwriteHistoryLine`. In
/// both cases every line that has not been emitted yet is then appended
/// with `InsertHistory`, and `reasoning_inserted_lines` is advanced to
/// match.
///
/// Emitting the remaining lines on the first chunk as well matters: a first
/// delta that already contains newlines would otherwise show only its first
/// line until (and unless) another delta arrives.
fn handle_reasoning_delta(&mut self, delta: &str) {
    self.reasoning_buffer.push_str(delta);
    // `split` always yields at least one item, and a trailing '\n' yields a
    // final empty piece that stands for the (still empty) line in progress.
    let lines: Vec<&str> = self.reasoning_buffer.split('\n').collect();

    if !self.reasoning_header_inserted {
        let first = lines.first().copied().unwrap_or("");
        let initial = vec![
            Line::from("thinking".magenta().italic()),
            self.render_markdown_line(first),
        ];
        self.app_event_tx.send(AppEvent::InsertHistory(initial));
        self.reasoning_header_inserted = true;
        self.reasoning_inserted_lines = 1;
    } else if let Some(last_idx) = self.reasoning_inserted_lines.checked_sub(1) {
        // The last emitted line may have received more text; refresh it
        // before appending anything new. `get` avoids a panic should the
        // buffer ever hold fewer lines than were recorded as emitted.
        if let Some(text) = lines.get(last_idx).copied() {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::OverwriteHistoryLine(line));
        }
    }

    // Append every line that is in the buffer but not yet in the history.
    for text in lines.iter().copied().skip(self.reasoning_inserted_lines) {
        let line = self.render_markdown_line(text);
        self.app_event_tx
            .send(AppEvent::InsertHistory(vec![line]));
    }
    self.reasoning_inserted_lines = self.reasoning_inserted_lines.max(lines.len());

    self.request_redraw();
}
fn add_to_history(&mut self, cell: HistoryCell) { fn add_to_history(&mut self, cell: HistoryCell) {
self.app_event_tx self.app_event_tx
.send(AppEvent::InsertHistory(cell.plain_lines())); .send(AppEvent::InsertHistory(cell.plain_lines()));
@@ -236,46 +322,67 @@ impl ChatWidget<'_> {
self.request_redraw(); self.request_redraw();
} }
EventMsg::AgentMessage(AgentMessageEvent { message }) => { EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// Final assistant answer. Prefer the fully provided message if self.answer_header_inserted {
// from the event; if it is empty fall back to any accumulated if !message.is_empty() && message != self.answer_buffer {
// delta buffer (some providers may only stream deltas and send let tail = message
// an empty final message). .strip_prefix(&self.answer_buffer)
let full = if message.is_empty() { .unwrap_or(&message);
std::mem::take(&mut self.answer_buffer) if !tail.is_empty() {
self.handle_answer_delta(tail);
}
}
self.app_event_tx
.send(AppEvent::InsertHistory(vec![Line::from("")]))
;
} else { } else {
self.answer_buffer.clear(); let full = if message.is_empty() {
message std::mem::take(&mut self.answer_buffer)
}; } else {
if !full.is_empty() { self.answer_buffer.clear();
self.add_to_history(HistoryCell::new_agent_message(&self.config, full)); message
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_message(&self.config, full));
}
} }
self.answer_buffer.clear();
self.answer_inserted_lines = 0;
self.answer_header_inserted = false;
self.request_redraw(); self.request_redraw();
} }
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
// Buffer only do not emit partial lines. This avoids cases self.handle_answer_delta(&delta);
// where long responses appear truncated if the terminal
// wrapped early. The full message is emitted on
// AgentMessage.
self.answer_buffer.push_str(&delta);
} }
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
// Buffer only disable incremental reasoning streaming so we self.handle_reasoning_delta(&delta);
// avoid truncated intermediate lines. Full text emitted on
// AgentReasoning.
self.reasoning_buffer.push_str(&delta);
} }
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => { EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
// Emit full reasoning text once. Some providers might send if self.reasoning_header_inserted {
// final event with empty text if only deltas were used. if !text.is_empty() && text != self.reasoning_buffer {
let full = if text.is_empty() { let tail = text
std::mem::take(&mut self.reasoning_buffer) .strip_prefix(&self.reasoning_buffer)
.unwrap_or(&text);
if !tail.is_empty() {
self.handle_reasoning_delta(tail);
}
}
self.app_event_tx
.send(AppEvent::InsertHistory(vec![Line::from("")]))
;
} else { } else {
self.reasoning_buffer.clear(); let full = if text.is_empty() {
text std::mem::take(&mut self.reasoning_buffer)
}; } else {
if !full.is_empty() { self.reasoning_buffer.clear();
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, full)); text
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, full));
}
} }
self.reasoning_buffer.clear();
self.reasoning_inserted_lines = 0;
self.reasoning_header_inserted = false;
self.request_redraw(); self.request_redraw();
} }
EventMsg::TaskStarted => { EventMsg::TaskStarted => {

View File

@@ -5,6 +5,7 @@ use std::io::Write;
use crate::tui; use crate::tui;
use crossterm::Command; use crossterm::Command;
use crossterm::cursor::MoveTo; use crossterm::cursor::MoveTo;
use crossterm::cursor::MoveToColumn;
use crossterm::queue; use crossterm::queue;
use crossterm::style::Color as CColor; use crossterm::style::Color as CColor;
use crossterm::style::Colors; use crossterm::style::Colors;
@@ -13,6 +14,8 @@ use crossterm::style::SetAttribute;
use crossterm::style::SetBackgroundColor; use crossterm::style::SetBackgroundColor;
use crossterm::style::SetColors; use crossterm::style::SetColors;
use crossterm::style::SetForegroundColor; use crossterm::style::SetForegroundColor;
use crossterm::terminal::Clear;
use crossterm::terminal::ClearType;
use ratatui::layout::Size; use ratatui::layout::Size;
use ratatui::prelude::Backend; use ratatui::prelude::Backend;
use ratatui::style::Color; use ratatui::style::Color;
@@ -79,6 +82,17 @@ pub(crate) fn insert_history_lines(terminal: &mut tui::Tui, lines: Vec<Line>) {
} }
} }
/// Replace the contents of the line the cursor is currently on with `line`.
///
/// Queues a move to column 0 and a clear of the current line on stdout,
/// then writes the spans of `line`. The cursor row is not changed here.
/// NOTE(review): this presumably relies on the cursor already sitting on the
/// last history line when it is called; confirm against the caller.
///
/// Both steps are best-effort: I/O errors are discarded, and the spans are
/// written even if queuing the cursor/clear commands failed.
pub(crate) fn overwrite_last_history_line(line: Line) {
    let mut out = std::io::stdout();
    let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
    let _ = write_spans(&mut out, line.iter());
}
fn wrapped_line_count(lines: &[Line], width: u16) -> u16 { fn wrapped_line_count(lines: &[Line], width: u16) -> u16 {
let mut count = 0; let mut count = 0;
for line in lines { for line in lines {