feat: enable streaming deltas

This commit is contained in:
aibrahim-oai
2025-07-14 18:31:37 -07:00
parent 3777e18243
commit 9487ae4ce7
9 changed files with 121 additions and 11 deletions

View File

@@ -426,6 +426,12 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;
}
}
}
}

View File

@@ -205,6 +205,7 @@ struct SseEvent {
kind: String,
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
}
#[derive(Debug, Deserialize)]
@@ -337,6 +338,22 @@ where
return;
}
}
"response.output_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.created" => {
if event.response.is_some() {
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
@@ -360,10 +377,8 @@ where
| "response.function_call_arguments.delta"
| "response.in_progress"
| "response.output_item.added"
| "response.output_text.delta"
| "response.output_text.done"
| "response.reasoning_summary_part.added"
| "response.reasoning_summary_text.delta"
| "response.reasoning_summary_text.done" => {
// Currently, we ignore these events, but we handle them
// separately to skip the logging message in the `other` case.

View File

@@ -53,6 +53,10 @@ impl Prompt {
pub enum ResponseEvent {
Created,
OutputItemDone(ResponseItem),
/// Streaming text from an assistant message.
OutputTextDelta(String),
/// Streaming text from a reasoning summary.
ReasoningSummaryDelta(String),
Completed {
response_id: String,
token_usage: Option<TokenUsage>,

View File

@@ -1121,15 +1121,9 @@ async fn try_run_turn(
let mut stream = sess.client.clone().stream(&prompt).await?;
// Buffer all the incoming messages from the stream first, then execute them.
// If we execute a function call in the middle of handling the stream, it can time out.
let mut input = Vec::new();
while let Some(event) = stream.next().await {
input.push(event?);
}
let mut output = Vec::new();
for event in input {
while let Some(event) = stream.next().await {
let event = event?;
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
@@ -1150,10 +1144,27 @@ async fn try_run_turn(
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
let response = handle_response_item(sess, sub_id, item.clone()).await?;
let response = match &item {
ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } => None,
_ => handle_response_item(sess, sub_id, item.clone()).await?,
};
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::OutputTextDelta(text) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta(text) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::Completed {
response_id,
token_usage,

View File

@@ -282,9 +282,15 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),
/// Incremental assistant text delta
AgentMessageDelta(AgentMessageEvent),
/// Reasoning event from agent.
AgentReasoning(AgentReasoningEvent),
/// Incremental reasoning text delta.
AgentReasoningDelta(AgentReasoningEvent),
/// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent),

View File

@@ -18,6 +18,7 @@ use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TokenUsage;
use owo_colors::OwoColorize;
use std::io::{self, Write};
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
@@ -191,6 +192,10 @@ impl EventProcessor {
"codex".style(self.bold).style(self.magenta)
);
}
EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
print!("{message}");
let _ = io::stdout().flush();
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
@@ -449,6 +454,12 @@ impl EventProcessor {
);
}
}
EventMsg::AgentReasoningDelta(agent_reasoning_event) => {
if self.show_agent_reasoning {
print!("{}", agent_reasoning_event.text);
let _ = io::stdout().flush();
}
}
EventMsg::SessionConfigured(session_configured_event) => {
let SessionConfiguredEvent {
session_id,

View File

@@ -175,6 +175,8 @@ pub async fn run_codex_tool_session(
| EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)

View File

@@ -244,6 +244,12 @@ impl ChatWidget<'_> {
.add_agent_message(&self.config, message);
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
self
.conversation_history
.append_agent_message_delta(&self.config, message);
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
if !self.config.hide_agent_reasoning {
self.conversation_history
@@ -251,6 +257,13 @@ impl ChatWidget<'_> {
self.request_redraw();
}
}
EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }) => {
if !self.config.hide_agent_reasoning {
self.conversation_history
.append_agent_reasoning_delta(&self.config, text);
self.request_redraw();
}
}
EventMsg::TaskStarted => {
self.bottom_pane.clear_ctrl_c_quit_hint();
self.bottom_pane.set_task_running(true);

View File

@@ -3,6 +3,7 @@ use crate::history_cell::CommandOutput;
use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType;
use codex_core::config::Config;
use crate::markdown::append_markdown;
use codex_core::protocol::FileChange;
use codex_core::protocol::SessionConfiguredEvent;
use crossterm::event::KeyCode;
@@ -202,6 +203,47 @@ impl ConversationHistoryWidget {
self.add_to_history(HistoryCell::new_agent_reasoning(config, text));
}
pub fn append_agent_message_delta(&mut self, config: &Config, text: String) {
if let Some(entry) = self.entries.last_mut() {
if let HistoryCell::AgentMessage { view } = &mut entry.cell {
if let Some(last) = view.lines.last() {
if last.spans.len() == 1 && last.spans[0].content.is_empty() {
view.lines.pop();
}
}
append_markdown(&text, &mut view.lines, config);
view.lines.push(Line::from(""));
let width = self.cached_width.get();
if width > 0 {
entry.line_count.set(view.height(width));
}
return;
}
}
// Fallback: create new entry
self.add_agent_message(config, text);
}
pub fn append_agent_reasoning_delta(&mut self, config: &Config, text: String) {
if let Some(entry) = self.entries.last_mut() {
if let HistoryCell::AgentReasoning { view } = &mut entry.cell {
if let Some(last) = view.lines.last() {
if last.spans.len() == 1 && last.spans[0].content.is_empty() {
view.lines.pop();
}
}
append_markdown(&text, &mut view.lines, config);
view.lines.push(Line::from(""));
let width = self.cached_width.get();
if width > 0 {
entry.line_count.set(view.height(width));
}
return;
}
}
self.add_agent_reasoning(config, text);
}
pub fn add_background_event(&mut self, message: String) {
self.add_to_history(HistoryCell::new_background_event(message));
}