Merge branch 'dfyp9w-codex/implement-delta-based-streaming-rendering' of github.com:openai/codex into codex/add-raw-chain-of-thought-in-codex-cli

Ahmed Ibrahim
2025-08-03 00:22:49 -07:00
7 changed files with 240 additions and 193 deletions

View File

@@ -1307,6 +1307,7 @@ async fn try_run_turn(
}
};
warn!("ResponseEvent: {event:?}");
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {

View File

@@ -89,13 +89,6 @@ const THIRD_USER_MSG: &str = "next turn";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn summarize_context_three_requests_and_instructions() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// Set up a mock server that we can inspect after the run.
let server = MockServer::start().await;

View File

@@ -18,8 +18,7 @@ use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind;
use crossterm::terminal::supports_keyboard_enhancement;
use ratatui::layout::Offset;
use ratatui::prelude::Backend;
use ratatui::text::Line;
use ratatui::layout::Rect;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -58,8 +57,6 @@ pub(crate) struct App<'a> {
/// True when a redraw has been scheduled but not yet executed.
pending_redraw: Arc<AtomicBool>,
pending_history_lines: Vec<Line<'static>>,
/// Stored parameters needed to instantiate the ChatWidget later, e.g.,
/// after dismissing the Git-repo warning.
chat_args: Option<ChatWidgetArgs>,
@@ -164,7 +161,6 @@ impl App<'_> {
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
Self {
app_event_tx,
pending_history_lines: Vec::new(),
app_event_rx,
app_state,
config,
@@ -211,8 +207,9 @@ impl App<'_> {
while let Ok(event) = self.app_event_rx.recv() {
match event {
AppEvent::InsertHistory(lines) => {
self.pending_history_lines.extend(lines);
self.app_event_tx.send(AppEvent::RequestRedraw);
if let AppState::Chat { widget } = &mut self.app_state {
widget.add_history_lines(lines);
}
}
AppEvent::RequestRedraw => {
self.schedule_redraw();
@@ -413,30 +410,15 @@ impl App<'_> {
}
let size = terminal.size()?;
let desired_height = match &self.app_state {
AppState::Chat { widget } => widget.desired_height(size.width),
AppState::GitWarning { .. } => 10,
let area = Rect {
x: 0,
y: 0,
width: size.width,
height: size.height,
};
let mut area = terminal.viewport_area;
area.height = desired_height.min(size.height);
area.width = size.width;
if area.bottom() > size.height {
terminal
.backend_mut()
.scroll_region_up(0..area.top(), area.bottom() - size.height)?;
area.y = size.height - area.height;
}
if area != terminal.viewport_area {
terminal.clear()?;
terminal.set_viewport_area(area);
}
if !self.pending_history_lines.is_empty() {
crate::insert_history::insert_history_lines(
terminal,
self.pending_history_lines.clone(),
);
self.pending_history_lines.clear();
terminal.clear()?;
}
match &mut self.app_state {
AppState::Chat { widget } => {

View File

@@ -29,10 +29,15 @@ use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind;
use ratatui::buffer::Buffer;
use ratatui::layout::Rect;
use ratatui::style::Stylize;
use ratatui::text::Line;
use ratatui::widgets::Paragraph;
use ratatui::widgets::Widget;
use ratatui::widgets::WidgetRef;
use ratatui::widgets::Wrap;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
use unicode_width::UnicodeWidthStr;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
@@ -44,6 +49,7 @@ use crate::exec_command::strip_bash_lc_and_escape;
use crate::history_cell::CommandOutput;
use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType;
use crate::markdown::append_markdown;
use crate::user_approval_widget::ApprovalRequest;
use codex_file_search::FileMatch;
@@ -61,10 +67,18 @@ pub(crate) struct ChatWidget<'a> {
initial_user_message: Option<UserMessage>,
token_usage: TokenUsage,
reasoning_buffer: String,
// Buffer for streaming assistant answer text; we do not surface partial
// content. We wait for the final AgentMessage event and then emit the full
// text at once into scrollback so the history contains a single message.
/// Buffer for streaming assistant answer text.
answer_buffer: String,
/// Full history rendered by the widget.
history: Vec<Line<'static>>,
/// Index where the current streaming agent message begins in `history`.
current_answer_start: Option<usize>,
/// Number of lines currently occupied by the streaming agent message in `history`.
current_answer_len: usize,
/// Index where the current streaming reasoning message begins in `history`.
current_reasoning_start: Option<usize>,
/// Number of lines currently occupied by the streaming reasoning block in `history`.
current_reasoning_len: usize,
running_commands: HashMap<String, RunningCommand>,
}
@@ -152,14 +166,15 @@ impl ChatWidget<'_> {
token_usage: TokenUsage::default(),
reasoning_buffer: String::new(),
answer_buffer: String::new(),
history: Vec::new(),
current_answer_start: None,
current_answer_len: 0,
current_reasoning_start: None,
current_reasoning_len: 0,
running_commands: HashMap::new(),
}
}
pub fn desired_height(&self, width: u16) -> u16 {
self.bottom_pane.desired_height(width)
}
pub(crate) fn handle_key_event(&mut self, key_event: KeyEvent) {
if key_event.kind == KeyEventKind::Press {
self.bottom_pane.clear_ctrl_c_quit_hint();
@@ -178,8 +193,12 @@ impl ChatWidget<'_> {
}
fn add_to_history(&mut self, cell: HistoryCell) {
self.app_event_tx
.send(AppEvent::InsertHistory(cell.plain_lines()));
self.add_history_lines(cell.plain_lines());
}
pub(crate) fn add_history_lines(&mut self, lines: Vec<Line<'static>>) {
self.history.extend(lines);
self.request_redraw();
}
fn submit_user_message(&mut self, user_message: UserMessage) {
@@ -221,6 +240,7 @@ impl ChatWidget<'_> {
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
tracing::trace!("[TUI] codex_event: {:?}", msg);
match msg {
EventMsg::SessionConfigured(event) => {
self.bottom_pane
@@ -237,10 +257,6 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// Final assistant answer. Prefer the fully provided message
// from the event; if it is empty, fall back to any accumulated
// delta buffer (some providers may only stream deltas and send
// an empty final message).
let full = if message.is_empty() {
std::mem::take(&mut self.answer_buffer)
} else {
@@ -248,26 +264,87 @@ impl ChatWidget<'_> {
message
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_message(&self.config, full));
let lines = build_agent_message_lines(&self.config, &full, true);
let new_len = lines.len();
match self.current_answer_start.take() {
Some(start) => {
let old_len = self.current_answer_len;
let end = start.saturating_add(old_len).min(self.history.len());
// Replace just the answer block so we don't drop later content.
self.history.splice(start..end, lines);
// Adjust downstream reasoning block start if it comes after this.
if let Some(rstart) = self.current_reasoning_start {
if rstart > start {
let delta = new_len as isize - old_len as isize;
self.current_reasoning_start =
Some((rstart as isize + delta) as usize);
}
}
self.current_answer_len = 0;
}
None => {
self.history.extend(lines);
}
}
self.request_redraw();
}
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
self.answer_buffer.push_str(&delta);
let lines = build_agent_message_lines(&self.config, &self.answer_buffer, false);
let new_len = lines.len();
match self.current_answer_start {
Some(start) => {
let old_len = self.current_answer_len;
let end = start.saturating_add(old_len).min(self.history.len());
self.history.splice(start..end, lines);
// Adjust downstream reasoning block start if it comes after this.
if let Some(rstart) = self.current_reasoning_start {
if rstart > start {
let delta = new_len as isize - old_len as isize;
self.current_reasoning_start =
Some((rstart as isize + delta) as usize);
}
}
self.current_answer_len = new_len;
}
None => {
self.current_answer_start = Some(self.history.len());
self.current_answer_len = new_len;
self.history.extend(lines);
}
}
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
// Buffer only; do not emit partial lines. This avoids cases
// where long responses appear truncated when the terminal
// wraps early. The full message is emitted on
// AgentMessage.
self.answer_buffer.push_str(&delta);
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
// Buffer only; incremental reasoning streaming is disabled so
// we avoid truncated intermediate lines. The full text is
// emitted on AgentReasoning.
self.reasoning_buffer.push_str(&delta);
let lines =
build_agent_reasoning_lines(&self.config, &self.reasoning_buffer, false);
let new_len = lines.len();
match self.current_reasoning_start {
Some(start) => {
let old_len = self.current_reasoning_len;
let end = start.saturating_add(old_len).min(self.history.len());
self.history.splice(start..end, lines);
// Adjust downstream answer block start if it comes after this.
if let Some(astart) = self.current_answer_start {
if astart > start {
let delta = new_len as isize - old_len as isize;
self.current_answer_start =
Some((astart as isize + delta) as usize);
}
}
self.current_reasoning_len = new_len;
}
None => {
self.current_reasoning_start = Some(self.history.len());
self.current_reasoning_len = new_len;
self.history.extend(lines);
}
}
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
// Emit the full reasoning text once. Some providers might send
// the final event with empty text if only deltas were used.
let full = if text.is_empty() {
std::mem::take(&mut self.reasoning_buffer)
} else {
@@ -275,9 +352,29 @@ impl ChatWidget<'_> {
text
};
if !full.is_empty() {
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, full));
let lines = build_agent_reasoning_lines(&self.config, &full, true);
let new_len = lines.len();
match self.current_reasoning_start.take() {
Some(start) => {
let old_len = self.current_reasoning_len;
let end = start.saturating_add(old_len).min(self.history.len());
self.history.splice(start..end, lines);
// Adjust downstream answer block start if it comes after this.
if let Some(astart) = self.current_answer_start {
if astart > start {
let delta = new_len as isize - old_len as isize;
self.current_answer_start =
Some((astart as isize + delta) as usize);
}
}
self.current_reasoning_len = 0;
}
None => {
self.history.extend(lines);
}
}
self.request_redraw();
}
self.request_redraw();
}
EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }) => {
self.add_to_history(HistoryCell::new_agent_reasoning(&self.config, text));
@@ -291,6 +388,43 @@ impl ChatWidget<'_> {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
// Defensively finalize any in-progress streaming blocks.
if let Some(start) = self.current_answer_start.take() {
if !self.answer_buffer.is_empty() {
let lines =
build_agent_message_lines(&self.config, &self.answer_buffer, true);
let new_len = lines.len();
let old_len = self.current_answer_len;
let end = start.saturating_add(old_len).min(self.history.len());
self.history.splice(start..end, lines);
if let Some(rstart) = self.current_reasoning_start {
if rstart > start {
let delta = new_len as isize - old_len as isize;
self.current_reasoning_start =
Some((rstart as isize + delta) as usize);
}
}
}
self.current_answer_len = 0;
}
if let Some(start) = self.current_reasoning_start.take() {
if !self.reasoning_buffer.is_empty() {
let lines =
build_agent_reasoning_lines(&self.config, &self.reasoning_buffer, true);
let new_len = lines.len();
let old_len = self.current_reasoning_len;
let end = start.saturating_add(old_len).min(self.history.len());
self.history.splice(start..end, lines);
if let Some(astart) = self.current_answer_start {
if astart > start {
let delta = new_len as isize - old_len as isize;
self.current_answer_start =
Some((astart as isize + delta) as usize);
}
}
}
self.current_reasoning_len = 0;
}
self.bottom_pane.set_task_running(false);
self.request_redraw();
}
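The arms above all repeat the same splice-and-shift bookkeeping: rewrite one streaming block's slice of `history`, then move the other block's start index by the change in line count. Purely as a sketch (the helper below is hypothetical and not part of this patch; only the field names are taken from the widget, and `ratatui::text::Line` is assumed in scope), the pattern could be factored like this:

fn splice_block(
    history: &mut Vec<Line<'static>>,
    start: usize,
    old_len: usize,
    lines: Vec<Line<'static>>,
) -> isize {
    // Replace history[start..start+old_len] with `lines`, clamping to the
    // current history length, and report the signed change in line count.
    let new_len = lines.len() as isize;
    let end = start.saturating_add(old_len).min(history.len());
    history.splice(start..end, lines);
    new_len - old_len as isize
}

// Caller sketch: after rewriting the answer block, shift the reasoning block
// if it starts below the spliced region.
// let delta = splice_block(&mut self.history, start, old_len, lines);
// if let Some(rstart) = self.current_reasoning_start {
//     if rstart > start {
//         self.current_reasoning_start = Some((rstart as isize + delta) as usize);
//     }
// }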
@@ -484,6 +618,10 @@ impl ChatWidget<'_> {
self.submit_op(Op::Interrupt);
self.answer_buffer.clear();
self.reasoning_buffer.clear();
self.current_answer_start = None;
self.current_answer_len = 0;
self.current_reasoning_start = None;
self.current_reasoning_len = 0;
CancellationEvent::Ignored
} else if self.bottom_pane.ctrl_c_quit_hint_visible() {
self.submit_op(Op::Shutdown);
@@ -518,13 +656,52 @@ impl ChatWidget<'_> {
impl WidgetRef for &ChatWidget<'_> {
fn render_ref(&self, area: Rect, buf: &mut Buffer) {
// In the hybrid inline viewport mode we only draw the interactive
// bottom pane; history entries are injected directly into scrollback
// via `Terminal::insert_before`.
(&self.bottom_pane).render(area, buf);
let bottom_height = self.bottom_pane.desired_height(area.width);
let history_height = area.height.saturating_sub(bottom_height);
if history_height > 0 {
let history_area = Rect {
x: area.x,
y: area.y,
width: area.width,
height: history_height,
};
let total_rows = wrapped_row_count(&self.history, history_area.width);
let scroll = total_rows.saturating_sub(history_height);
Paragraph::new(self.history.clone())
.wrap(Wrap { trim: false })
.scroll((scroll, 0))
.render(history_area, buf);
}
let bottom_area = Rect {
x: area.x,
y: area.y + history_height,
width: area.width,
height: bottom_height,
};
(&self.bottom_pane).render(bottom_area, buf);
}
}
fn wrapped_row_count(lines: &[Line<'_>], width: u16) -> u16 {
if width == 0 {
return 0;
}
let w = width as u32;
let mut rows: u32 = 0;
for line in lines {
let total_width: u32 = line
.spans
.iter()
.map(|span| span.content.width() as u32)
.sum();
let line_rows = total_width.div_ceil(w).max(1);
rows = rows.saturating_add(line_rows);
}
rows.min(u16::MAX as u32) as u16
}
fn add_token_usage(current_usage: &TokenUsage, new_usage: &TokenUsage) -> TokenUsage {
let cached_input_tokens = match (
current_usage.cached_input_tokens,
@@ -552,3 +729,23 @@ fn add_token_usage(current_usage: &TokenUsage, new_usage: &TokenUsage) -> TokenU
total_tokens: current_usage.total_tokens + new_usage.total_tokens,
}
}
fn build_agent_message_lines(config: &Config, message: &str, finalize: bool) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from("codex".magenta().bold()));
append_markdown(message, &mut lines, config);
if finalize {
lines.push(Line::from(""));
}
lines
}
fn build_agent_reasoning_lines(config: &Config, text: &str, finalize: bool) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from("thinking".magenta().italic()));
append_markdown(text, &mut lines, config);
if finalize {
lines.push(Line::from(""));
}
lines
}

View File

@@ -324,16 +324,6 @@ where
&mut self.buffers[self.current]
}
/// Gets the backend
pub const fn backend(&self) -> &B {
&self.backend
}
/// Gets the backend as a mutable reference
pub fn backend_mut(&mut self) -> &mut B {
&mut self.backend
}
/// Obtains a difference between the previous and the current buffer and passes it to the
/// current backend for drawing.
pub fn flush(&mut self) -> io::Result<()> {

View File

@@ -68,9 +68,6 @@ pub(crate) enum HistoryCell {
/// Message from the user.
UserPrompt { view: TextBlock },
/// Message from the agent.
AgentMessage { view: TextBlock },
/// Reasoning event from the agent.
AgentReasoning { view: TextBlock },
@@ -128,7 +125,6 @@ impl HistoryCell {
match self {
HistoryCell::WelcomeMessage { view }
| HistoryCell::UserPrompt { view }
| HistoryCell::AgentMessage { view }
| HistoryCell::AgentReasoning { view }
| HistoryCell::BackgroundEvent { view }
| HistoryCell::GitDiffOutput { view }
@@ -231,17 +227,6 @@ impl HistoryCell {
}
}
pub(crate) fn new_agent_message(config: &Config, message: String) -> Self {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from("codex".magenta().bold()));
append_markdown(&message, &mut lines, config);
lines.push(Line::from(""));
HistoryCell::AgentMessage {
view: TextBlock::new(lines),
}
}
pub(crate) fn new_agent_reasoning(config: &Config, text: String) -> Self {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from("thinking".magenta().italic()));

View File

@@ -1,107 +1,6 @@
use std::fmt;
use std::io;
use std::io::Write;
use crate::tui;
use crossterm::Command;
use crossterm::cursor::MoveTo;
use crossterm::queue;
use crossterm::style::Color as CColor;
use crossterm::style::Colors;
use crossterm::style::Print;
use crossterm::style::SetAttribute;
use crossterm::style::SetBackgroundColor;
use crossterm::style::SetColors;
use crossterm::style::SetForegroundColor;
use ratatui::layout::Size;
use ratatui::prelude::Backend;
use ratatui::style::Color;
use ratatui::style::Modifier;
use ratatui::text::Line;
use ratatui::text::Span;
/// Insert `lines` above the viewport.
pub(crate) fn insert_history_lines(terminal: &mut tui::Tui, lines: Vec<Line>) {
let screen_size = terminal.backend().size().unwrap_or(Size::new(0, 0));
let cursor_pos = terminal.get_cursor_position().ok();
let mut area = terminal.get_frame().area();
let wrapped_lines = wrapped_line_count(&lines, area.width);
let cursor_top = if area.bottom() < screen_size.height {
// If the viewport is not at the bottom of the screen, scroll it down to make room.
// Don't scroll it past the bottom of the screen.
let scroll_amount = wrapped_lines.min(screen_size.height - area.bottom());
terminal
.backend_mut()
.scroll_region_down(area.top()..screen_size.height, scroll_amount)
.ok();
let cursor_top = area.top().saturating_sub(1);
area.y += scroll_amount;
terminal.set_viewport_area(area);
cursor_top
} else {
area.top().saturating_sub(1)
};
// Limit the scroll region to the lines from the top of the screen to the
// top of the viewport. With this in place, when we add lines inside this
// area, only the lines in this area will be scrolled. We place the cursor
// at the end of the scroll region, and add lines starting there.
//
// ┌─Screen───────────────────────┐
// │┌╌Scroll region╌╌╌╌╌╌╌╌╌╌╌╌╌╌┐│
// │┆ ┆│
// │┆ ┆│
// │┆ ┆│
// │█╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┘│
// │╭─Viewport───────────────────╮│
// ││ ││
// │╰────────────────────────────╯│
// └──────────────────────────────┘
queue!(std::io::stdout(), SetScrollRegion(1..area.top())).ok();
// NB: we are using MoveTo instead of set_cursor_position here to avoid messing with the
// terminal's last_known_cursor_position, which hopefully will still be accurate after we
// fetch/restore the cursor position. insert_history_lines should be cursor-position-neutral :)
queue!(std::io::stdout(), MoveTo(0, cursor_top)).ok();
for line in lines {
queue!(std::io::stdout(), Print("\r\n")).ok();
write_spans(&mut std::io::stdout(), line.iter()).ok();
}
queue!(std::io::stdout(), ResetScrollRegion).ok();
// Restore the cursor position to where it was before we started.
if let Some(cursor_pos) = cursor_pos {
queue!(std::io::stdout(), MoveTo(cursor_pos.x, cursor_pos.y)).ok();
}
}
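The scroll-region trick above boils down to two classic VT100 escape sequences, which the SetScrollRegion and ResetScrollRegion commands used here wrap for crossterm's queue! macro. As an illustration of the raw escapes only, not the crate's actual implementation:

// Illustration, assuming a VT100-compatible terminal:
//   ESC [ {top} ; {bottom} r  (DECSTBM) confines scrolling to rows top..=bottom (1-based)
//   ESC [ r                   resets the scroll region to the full screen
use std::io::{self, Write};

fn scroll_region_demo(top: u16, bottom: u16) -> io::Result<()> {
    let mut out = io::stdout();
    write!(out, "\x1b[{top};{bottom}r")?; // limit scrolling to the region
    write!(out, "\x1b[{bottom};1H\r\n")?; // from the region's last row, a newline scrolls only the region
    write!(out, "\x1b[r")?; // restore the full-screen scroll region
    out.flush()
}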
fn wrapped_line_count(lines: &[Line], width: u16) -> u16 {
let mut count = 0;
for line in lines {
count += line_height(line, width);
}
count
}
fn line_height(line: &Line, width: u16) -> u16 {
use unicode_width::UnicodeWidthStr;
// get the total display width of the line, accounting for double-width chars
let total_width = line
.spans
.iter()
.map(|span| span.content.width())
.sum::<usize>();
// divide by width to get the number of lines, rounding up
if width == 0 {
1
} else {
(total_width as u16).div_ceil(width).max(1)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SetScrollRegion(pub std::ops::Range<u16>);