Compare commits

...

1 Commits

Author SHA1 Message Date
aibrahim-oai
320aec304a feat: stream agent message deltas 2025-07-31 21:13:10 -07:00
4 changed files with 180 additions and 41 deletions

View File

@@ -28,6 +28,11 @@ use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
/// A history update that has been received from the event loop but not yet
/// applied to the terminal; queued ops are drained in order during a draw.
enum PendingHistoryOp {
/// Lines to pass to `insert_history_lines`.
Insert(Vec<Line<'static>>),
/// Replacement content to pass to `overwrite_last_history_line`.
Overwrite(Line<'static>),
}
/// Time window for debouncing redraw requests.
const REDRAW_DEBOUNCE: Duration = Duration::from_millis(10);
@@ -57,7 +62,7 @@ pub(crate) struct App<'a> {
/// True when a redraw has been scheduled but not yet executed.
pending_redraw: Arc<AtomicBool>,
pending_history_lines: Vec<Line<'static>>,
pending_history_ops: Vec<PendingHistoryOp>,
/// Stored parameters needed to instantiate the ChatWidget later, e.g.,
/// after dismissing the Git-repo warning.
@@ -163,7 +168,7 @@ impl App<'_> {
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
Self {
app_event_tx,
pending_history_lines: Vec::new(),
pending_history_ops: Vec::new(),
app_event_rx,
app_state,
config,
@@ -210,7 +215,13 @@ impl App<'_> {
while let Ok(event) = self.app_event_rx.recv() {
match event {
AppEvent::InsertHistory(lines) => {
self.pending_history_lines.extend(lines);
self.pending_history_ops
.push(PendingHistoryOp::Insert(lines));
self.app_event_tx.send(AppEvent::RequestRedraw);
}
AppEvent::OverwriteHistoryLine(line) => {
self.pending_history_ops
.push(PendingHistoryOp::Overwrite(line));
self.app_event_tx.send(AppEvent::RequestRedraw);
}
AppEvent::RequestRedraw => {
@@ -424,12 +435,17 @@ impl App<'_> {
terminal.clear()?;
terminal.set_viewport_area(area);
}
if !self.pending_history_lines.is_empty() {
crate::insert_history::insert_history_lines(
terminal,
self.pending_history_lines.clone(),
);
self.pending_history_lines.clear();
if !self.pending_history_ops.is_empty() {
for op in self.pending_history_ops.drain(..) {
match op {
PendingHistoryOp::Insert(lines) => {
crate::insert_history::insert_history_lines(terminal, lines);
}
PendingHistoryOp::Overwrite(line) => {
crate::insert_history::overwrite_last_history_line(line);
}
}
}
}
match &mut self.app_state {
AppState::Chat { widget } => {

View File

@@ -48,4 +48,6 @@ pub(crate) enum AppEvent {
},
InsertHistory(Vec<Line<'static>>),
/// Overwrite the last line in the scrollback with the provided content.
OverwriteHistoryLine(Line<'static>),
}

View File

@@ -28,8 +28,10 @@ use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind;
use ratatui::buffer::Buffer;
use ratatui::layout::Rect;
use ratatui::text::Line;
use ratatui::widgets::Widget;
use ratatui::widgets::WidgetRef;
use ratatui::style::Stylize;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
@@ -43,6 +45,7 @@ use crate::exec_command::strip_bash_lc_and_escape;
use crate::history_cell::CommandOutput;
use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType;
use crate::markdown::append_markdown;
use crate::user_approval_widget::ApprovalRequest;
use codex_file_search::FileMatch;
@@ -60,10 +63,13 @@ pub(crate) struct ChatWidget<'a> {
initial_user_message: Option<UserMessage>,
token_usage: TokenUsage,
reasoning_buffer: String,
// Buffer for streaming assistant answer text; we do not surface partial
// We wait for the final AgentMessage event and then emit the full text
// at once into scrollback so the history contains a single message.
// Buffer and tracking for streaming assistant reasoning text.
reasoning_inserted_lines: usize,
reasoning_header_inserted: bool,
// Buffer and tracking for streaming assistant answer text.
answer_buffer: String,
answer_inserted_lines: usize,
answer_header_inserted: bool,
running_commands: HashMap<String, RunningCommand>,
}
@@ -150,11 +156,21 @@ impl ChatWidget<'_> {
),
token_usage: TokenUsage::default(),
reasoning_buffer: String::new(),
reasoning_inserted_lines: 0,
reasoning_header_inserted: false,
answer_buffer: String::new(),
answer_inserted_lines: 0,
answer_header_inserted: false,
running_commands: HashMap::new(),
}
}
/// Renders `text` through `append_markdown` and returns the first resulting
/// line, or an empty line when the renderer produced nothing.
///
/// Any lines after the first are discarded.
/// NOTE(review): each streamed line is rendered in isolation here, so
/// multi-line markdown constructs presumably lose their context — confirm
/// this is acceptable for streaming output.
fn render_markdown_line(&self, text: &str) -> Line<'static> {
    let mut rendered = Vec::new();
    append_markdown(text, &mut rendered, &self.config);
    match rendered.into_iter().next() {
        Some(first) => first,
        None => Line::from(""),
    }
}
/// Returns the height requested by the bottom pane for the given `width`.
pub fn desired_height(&self, width: u16) -> u16 {
self.bottom_pane.desired_height(width)
}
@@ -176,6 +192,76 @@ impl ChatWidget<'_> {
self.bottom_pane.handle_paste(text);
}
/// Appends a streamed chunk of the assistant answer and mirrors the change
/// into the history via `AppEvent`s.
///
/// The accumulated buffer is split on `'\n'`. On the first delta a "codex"
/// header is inserted followed by every line buffered so far. On later
/// deltas the most recently emitted line is overwritten with its current
/// content and any newly started lines are inserted after it.
/// `answer_inserted_lines` tracks how many buffer lines have been emitted.
fn handle_answer_delta(&mut self, delta: &str) {
    self.answer_buffer.push_str(delta);
    // `split` always yields at least one item, so `total >= 1`.
    let lines: Vec<&str> = self.answer_buffer.split('\n').collect();
    let total = lines.len();

    if !self.answer_header_inserted {
        // Emit the header plus *all* buffered lines. Emitting only the first
        // line would leave the rest invisible until another delta arrives,
        // which never happens when the whole answer comes in one chunk.
        let mut initial = Vec::with_capacity(total + 1);
        initial.push(Line::from("codex".magenta().bold()));
        initial.extend(lines.iter().map(|text| self.render_markdown_line(text)));
        self.app_event_tx.send(AppEvent::InsertHistory(initial));
        self.answer_header_inserted = true;
        self.answer_inserted_lines = total;
    } else {
        // The last emitted line may have grown; re-render it in place.
        let last_emitted = self
            .answer_inserted_lines
            .checked_sub(1)
            .and_then(|idx| lines.get(idx));
        if let Some(text) = last_emitted {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::OverwriteHistoryLine(line));
        }
        // Append every line that started since the previous delta.
        for text in lines.iter().skip(self.answer_inserted_lines) {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::InsertHistory(vec![line]));
        }
        self.answer_inserted_lines = self.answer_inserted_lines.max(total);
    }
    self.request_redraw();
}
/// Appends a streamed chunk of the assistant reasoning and mirrors the
/// change into the history via `AppEvent`s.
///
/// The accumulated buffer is split on `'\n'`. On the first delta a
/// "thinking" header is inserted followed by every line buffered so far. On
/// later deltas the most recently emitted line is overwritten with its
/// current content and any newly started lines are inserted after it.
/// `reasoning_inserted_lines` tracks how many buffer lines have been emitted.
fn handle_reasoning_delta(&mut self, delta: &str) {
    self.reasoning_buffer.push_str(delta);
    // `split` always yields at least one item, so `total >= 1`.
    let lines: Vec<&str> = self.reasoning_buffer.split('\n').collect();
    let total = lines.len();

    if !self.reasoning_header_inserted {
        // Emit the header plus *all* buffered lines. Emitting only the first
        // line would leave the rest invisible until another delta arrives,
        // which never happens when the whole text comes in one chunk.
        let mut initial = Vec::with_capacity(total + 1);
        initial.push(Line::from("thinking".magenta().italic()));
        initial.extend(lines.iter().map(|text| self.render_markdown_line(text)));
        self.app_event_tx.send(AppEvent::InsertHistory(initial));
        self.reasoning_header_inserted = true;
        self.reasoning_inserted_lines = total;
    } else {
        // The last emitted line may have grown; re-render it in place.
        let last_emitted = self
            .reasoning_inserted_lines
            .checked_sub(1)
            .and_then(|idx| lines.get(idx));
        if let Some(text) = last_emitted {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::OverwriteHistoryLine(line));
        }
        // Append every line that started since the previous delta.
        for text in lines.iter().skip(self.reasoning_inserted_lines) {
            let line = self.render_markdown_line(text);
            self.app_event_tx
                .send(AppEvent::InsertHistory(vec![line]));
        }
        self.reasoning_inserted_lines = self.reasoning_inserted_lines.max(total);
    }
    self.request_redraw();
}
fn add_to_history(&mut self, cell: HistoryCell) {
self.app_event_tx
.send(AppEvent::InsertHistory(cell.plain_lines()));
@@ -236,46 +322,67 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// Final assistant answer. Prefer the fully provided message
// from the event; if it is empty fall back to any accumulated
// delta buffer (some providers may only stream deltas and send
// an empty final message).
let full = if message.is_empty() {
std::mem::take(&mut self.answer_buffer)
if self.answer_header_inserted {
if !message.is_empty() && message != self.answer_buffer {
let tail = message
.strip_prefix(&self.answer_buffer)
.unwrap_or(&message);
if !tail.is_empty() {
self.handle_answer_delta(tail);
}
}
self.app_event_tx
.send(AppEvent::InsertHistory(vec![Line::from("")]))
;
} else {
self.answer_buffer.clear();
message
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_message(&self.config, full));
let full = if message.is_empty() {
std::mem::take(&mut self.answer_buffer)
} else {
self.answer_buffer.clear();
message
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_message(&self.config, full));
}
}
self.answer_buffer.clear();
self.answer_inserted_lines = 0;
self.answer_header_inserted = false;
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
// Buffer only do not emit partial lines. This avoids cases
// where long responses appear truncated if the terminal
// wrapped early. The full message is emitted on
// AgentMessage.
self.answer_buffer.push_str(&delta);
self.handle_answer_delta(&delta);
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
// Buffer only disable incremental reasoning streaming so we
// avoid truncated intermediate lines. Full text emitted on
// AgentReasoning.
self.reasoning_buffer.push_str(&delta);
self.handle_reasoning_delta(&delta);
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
// Emit full reasoning text once. Some providers might send
// final event with empty text if only deltas were used.
let full = if text.is_empty() {
std::mem::take(&mut self.reasoning_buffer)
if self.reasoning_header_inserted {
if !text.is_empty() && text != self.reasoning_buffer {
let tail = text
.strip_prefix(&self.reasoning_buffer)
.unwrap_or(&text);
if !tail.is_empty() {
self.handle_reasoning_delta(tail);
}
}
self.app_event_tx
.send(AppEvent::InsertHistory(vec![Line::from("")]))
;
} else {
self.reasoning_buffer.clear();
text
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, full));
let full = if text.is_empty() {
std::mem::take(&mut self.reasoning_buffer)
} else {
self.reasoning_buffer.clear();
text
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, full));
}
}
self.reasoning_buffer.clear();
self.reasoning_inserted_lines = 0;
self.reasoning_header_inserted = false;
self.request_redraw();
}
EventMsg::TaskStarted => {

View File

@@ -5,6 +5,7 @@ use std::io::Write;
use crate::tui;
use crossterm::Command;
use crossterm::cursor::MoveTo;
use crossterm::cursor::MoveToColumn;
use crossterm::queue;
use crossterm::style::Color as CColor;
use crossterm::style::Colors;
@@ -13,6 +14,8 @@ use crossterm::style::SetAttribute;
use crossterm::style::SetBackgroundColor;
use crossterm::style::SetColors;
use crossterm::style::SetForegroundColor;
use crossterm::terminal::Clear;
use crossterm::terminal::ClearType;
use ratatui::layout::Size;
use ratatui::prelude::Backend;
use ratatui::style::Color;
@@ -79,6 +82,17 @@ pub(crate) fn insert_history_lines(terminal: &mut tui::Tui, lines: Vec<Line>) {
}
}
/// Replace the content of the terminal line the cursor is currently on.
///
/// Queues a move to column 0 and a clear of the current line on stdout, then
/// writes the spans of `line`. Failures from either step are ignored, so
/// this is best-effort.
///
/// NOTE(review): this acts on whichever row the cursor occupies when called;
/// confirm the cursor is actually on the last history line at that point.
pub(crate) fn overwrite_last_history_line(line: Line) {
    let mut out = std::io::stdout();
    let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
    let _ = write_spans(&mut out, line.iter());
}
fn wrapped_line_count(lines: &[Line], width: u16) -> u16 {
let mut count = 0;
for line in lines {