Compare commits

...

12 Commits

Author | SHA1 | Message | Date
Ahmed Ibrahim | c58b5c0910 | tests | 2025-07-15 19:23:01 -07:00
Ahmed Ibrahim | 0bd53ac078 | fixing tests | 2025-07-15 16:14:43 -07:00
Ahmed Ibrahim | 9aa1331c92 | add rerendring | 2025-07-15 10:52:14 -07:00
Ahmed Ibrahim | 555a172722 | fixing bugs | 2025-07-14 23:50:37 -07:00
Ahmed Ibrahim | bd0f423d36 | fixing bugs | 2025-07-14 23:45:44 -07:00
Ahmed Ibrahim | b0757a1c23 | fixing bugs | 2025-07-14 23:45:11 -07:00
Ahmed Ibrahim | e98b35ac78 | fixing bugs | 2025-07-14 23:39:01 -07:00
Ahmed Ibrahim | e6939025f5 | fixing bugs | 2025-07-14 23:36:57 -07:00
Ahmed Ibrahim | f123cc6541 | Merge branch 'codex/add-full-streaming-support-to-codex-cli' of github.com:openai/codex into codex/add-full-streaming-support-to-codex-cli | 2025-07-14 23:11:00 -07:00
Ahmed Ibrahim | 5f16fe8dda | fixing bugs | 2025-07-14 23:09:52 -07:00
aibrahim-oai | 98b31b0390 | Merge branch 'main' into codex/add-full-streaming-support-to-codex-cli | 2025-07-14 18:40:36 -07:00
aibrahim-oai | 9487ae4ce7 | feat: enable streaming deltas | 2025-07-14 18:31:37 -07:00
13 changed files with 789 additions and 95 deletions

View File

@@ -426,6 +426,12 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;
}
}
}
}

View File

@@ -197,6 +197,10 @@ impl ModelClient {
}
}
}
pub fn streaming_enabled(&self) -> bool {
self.config.streaming_enabled
}
}
#[derive(Debug, Deserialize, Serialize)]
@@ -205,6 +209,7 @@ struct SseEvent {
kind: String,
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
}
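Each SSE `data:` payload still deserializes into this single `SseEvent` shape; the new optional `delta` field is simply `None` for non-delta events. A minimal decoding sketch, assuming `kind` is mapped from the JSON `type` key by a serde rename that sits outside this hunk's context:

    use serde::Deserialize;
    use serde_json::Value;

    // Local mirror of the struct above; `type` is a Rust keyword, hence the rename.
    #[derive(Debug, Deserialize)]
    struct SseEvent {
        #[serde(rename = "type")]
        kind: String,
        response: Option<Value>,
        item: Option<Value>,
        delta: Option<String>,
    }

    fn main() -> serde_json::Result<()> {
        let line = r#"{"type":"response.output_text.delta","delta":"hel"}"#;
        let event: SseEvent = serde_json::from_str(line)?;
        assert_eq!(event.kind, "response.output_text.delta");
        assert_eq!(event.delta.as_deref(), Some("hel"));
        Ok(())
    }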
#[derive(Debug, Deserialize)]
@@ -315,7 +320,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No real-time streaming: the user only saw output after the
// entire turn had finished, which broke the typing UX and
// entire turn had finished, which broke the "typing" UX and
// made long-running turns look stalled.
// 2. Duplicate `function_call_output` items: both the
// individual *and* the completed array were forwarded, which
@@ -337,6 +342,22 @@ where
return;
}
}
"response.output_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.created" => {
if event.response.is_some() {
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
@@ -360,10 +381,8 @@ where
| "response.function_call_arguments.delta"
| "response.in_progress"
| "response.output_item.added"
| "response.output_text.delta"
| "response.output_text.done"
| "response.reasoning_summary_part.added"
| "response.reasoning_summary_text.delta"
| "response.reasoning_summary_text.done" => {
// Currently, we ignore these events, but we handle them
// separately to skip the logging message in the `other` case.

View File

@@ -53,6 +53,10 @@ impl Prompt {
pub enum ResponseEvent {
Created,
OutputItemDone(ResponseItem),
/// Streaming text from an assistant message.
OutputTextDelta(String),
/// Streaming text from a reasoning summary.
ReasoningSummaryDelta(String),
Completed {
response_id: String,
token_usage: Option<TokenUsage>,

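Consumers of the stream now choose, per variant, between rendering incrementally and waiting for the aggregate. A runnable sketch of that dispatch, using local stand-ins for `ResponseEvent` and `ResponseItem` rather than the real protocol types:

    // Stand-in mirroring the variants above (`String` in place of ResponseItem).
    enum ResponseEvent {
        Created,
        OutputItemDone(String),
        OutputTextDelta(String),
        ReasoningSummaryDelta(String),
        Completed { response_id: String },
    }

    fn render(events: Vec<ResponseEvent>) {
        for event in events {
            match event {
                ResponseEvent::OutputTextDelta(chunk) => print!("{chunk}"), // live typing UX
                ResponseEvent::ReasoningSummaryDelta(chunk) => eprint!("{chunk}"),
                ResponseEvent::OutputItemDone(_item) => {} // text already streamed above
                ResponseEvent::Completed { response_id } => println!("\n[done: {response_id}]"),
                ResponseEvent::Created => {}
            }
        }
    }

    fn main() {
        render(vec![
            ResponseEvent::Created,
            ResponseEvent::OutputTextDelta("hel".into()),
            ResponseEvent::OutputTextDelta("lo".into()),
            ResponseEvent::Completed { response_id: "resp1".into() },
        ]);
    }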
View File

@@ -1121,19 +1121,15 @@ async fn try_run_turn(
let mut stream = sess.client.clone().stream(&prompt).await?;
// Buffer all the incoming messages from the stream first, then execute them.
// If we execute a function call in the middle of handling the stream, it can time out.
let mut input = Vec::new();
while let Some(event) = stream.next().await {
input.push(event?);
}
let mut output = Vec::new();
for event in input {
// Patch: buffer for non-streaming mode
let mut assistant_message_buf = String::new();
let streaming_enabled = sess.client.streaming_enabled();
while let Some(event) = stream.next().await {
let event = event?;
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
// We successfully created a new response and ensured that all pending calls were included so we can clear the pending call ids.
state.pending_call_ids.clear();
}
ResponseEvent::OutputItemDone(item) => {
@@ -1146,18 +1142,59 @@ async fn try_run_turn(
_ => None,
};
if let Some(call_id) = call_id {
// We just got a new call id so we need to make sure to respond to it in the next turn.
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
let response = handle_response_item(sess, sub_id, item.clone()).await?;
// Patch: buffer assistant message text if streaming is disabled
if !streaming_enabled {
if let ResponseItem::Message { role, content } = &item {
if role == "assistant" {
for c in content {
if let ContentItem::OutputText { text } = c {
assistant_message_buf.push_str(text);
}
}
}
}
}
let response = match &item {
ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } => None,
_ => handle_response_item(sess, sub_id, item.clone()).await?,
};
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::OutputTextDelta(text) => {
if streaming_enabled {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
} else {
assistant_message_buf.push_str(&text);
}
}
ResponseEvent::ReasoningSummaryDelta(text) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::Completed {
response_id,
token_usage,
} => {
// Patch: emit full message if we buffered deltas
if !streaming_enabled && !assistant_message_buf.is_empty() {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: assistant_message_buf.clone(),
}),
};
sess.tx_event.send(event).await.ok();
}
if let Some(token_usage) = token_usage {
sess.tx_event
.send(Event {

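When streaming is disabled the turn loop above degrades gracefully: deltas accumulate into `assistant_message_buf` and a single `AgentMessage` is emitted at `Completed`. A compact sketch of that fallback, with stand-in event types in place of the real ones:

    // Stand-ins for the protocol events used by the fallback path.
    enum ResponseEvent {
        OutputTextDelta(String),
        Completed,
    }

    // Returns the messages a UI would receive: many deltas when streaming is
    // enabled, one aggregate at completion otherwise.
    fn drive(events: Vec<ResponseEvent>, streaming_enabled: bool) -> Vec<String> {
        let mut out = Vec::new();
        let mut buf = String::new();
        for event in events {
            match event {
                ResponseEvent::OutputTextDelta(t) if streaming_enabled => out.push(t),
                ResponseEvent::OutputTextDelta(t) => buf.push_str(&t),
                ResponseEvent::Completed => {
                    if !streaming_enabled && !buf.is_empty() {
                        out.push(std::mem::take(&mut buf));
                    }
                }
            }
        }
        out
    }

    fn main() {
        let events = || vec![
            ResponseEvent::OutputTextDelta("hel".into()),
            ResponseEvent::OutputTextDelta("lo".into()),
            ResponseEvent::Completed,
        ];
        assert_eq!(drive(events(), true), vec!["hel", "lo"]);
        assert_eq!(drive(events(), false), vec!["hello"]);
    }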
View File

@@ -131,6 +131,13 @@ pub struct Config {
/// request using the Responses API.
pub model_reasoning_summary: ReasoningSummary,
/// Whether to surface live streaming delta events in front-ends. When `true`
/// (the default), Codex forwards `AgentMessageDelta` / `AgentReasoningDelta`
/// events and UIs may show a typing indicator. When `false`, UIs should
/// ignore delta events and rely solely on the final aggregated
/// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
pub streaming_enabled: bool,
/// When set to `true`, overrides the default heuristic and forces
/// `model_supports_reasoning_summaries()` to return `true`.
pub model_supports_reasoning_summaries: bool,
@@ -321,6 +328,13 @@ pub struct ConfigToml {
/// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
pub chatgpt_base_url: Option<String>,
/// Whether to surface live streaming delta events in front-ends. When `true`
/// (the default), Codex forwards `AgentMessageDelta` / `AgentReasoningDelta`
/// events and UIs may show a typing indicator. When `false`, UIs should
/// ignore delta events and rely solely on the final aggregated
/// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
pub streaming: Option<bool>,
}
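Resolution of the new key follows the usual optional-field pattern: the TOML value is an `Option<bool>` that defaults to `true` when absent, so deltas stay on unless a user opts out via `streaming = false` in `config.toml` or a `-c streaming=false` CLI override (exactly what the tests further down pass). A minimal sketch of that precedence:

    // Sketch of the default: a missing `streaming` key means deltas are enabled.
    fn resolve_streaming(toml_value: Option<bool>) -> bool {
        toml_value.unwrap_or(true)
    }

    fn main() {
        assert!(resolve_streaming(None));         // key absent -> streaming on
        assert!(!resolve_streaming(Some(false))); // streaming = false -> legacy behaviour
        assert!(resolve_streaming(Some(true)));
    }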
impl ConfigToml {
@@ -486,6 +500,7 @@ impl Config {
.or(cfg.model_reasoning_summary)
.unwrap_or_default(),
streaming_enabled: cfg.streaming.unwrap_or(true),
model_supports_reasoning_summaries: cfg
.model_supports_reasoning_summaries
.unwrap_or(false),
@@ -798,6 +813,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::High,
model_reasoning_summary: ReasoningSummary::Detailed,
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
},
@@ -844,6 +860,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::default(),
model_reasoning_summary: ReasoningSummary::default(),
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
};
@@ -905,6 +922,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::default(),
model_reasoning_summary: ReasoningSummary::default(),
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
};

View File

@@ -150,7 +150,7 @@ pub type EnvironmentVariablePattern = WildMatchPattern<'*', '?'>;
/// Deriving the `env` based on this policy works as follows:
/// 1. Create an initial map based on the `inherit` policy.
/// 2. If `ignore_default_excludes` is false, filter the map using the default
/// exclude pattern(s), which are: `"*KEY*"` and `"*TOKEN*"`.
/// exclude pattern(s), which are: "*KEY*" and "*TOKEN*".
/// 3. If `exclude` is not empty, filter the map using the provided patterns.
/// 4. Insert any entries from `r#set` into the map.
/// 5. If non-empty, filter the map using the `include_only` patterns.
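These five steps compose into a straightforward filter pipeline. A sketch of the derivation under stated assumptions: it uses the `wildmatch` crate's `WildMatch` (the default `'*'`/`'?'` instantiation of the pattern type aliased above) and a hypothetical, simplified policy struct rather than the crate's real one:

    use std::collections::HashMap;
    use wildmatch::WildMatch; // alias for WildMatchPattern<'*', '?'>

    // Hypothetical, simplified policy mirroring the five documented steps.
    struct Policy {
        ignore_default_excludes: bool,
        exclude: Vec<String>,
        set: HashMap<String, String>,
        include_only: Vec<String>,
    }

    fn derive_env(inherited: HashMap<String, String>, policy: &Policy) -> HashMap<String, String> {
        let mut env = inherited; // step 1: start from the map the inherit policy produced
        if !policy.ignore_default_excludes {
            // step 2: drop anything matching the default exclude patterns
            let defaults = [WildMatch::new("*KEY*"), WildMatch::new("*TOKEN*")];
            env.retain(|k, _| !defaults.iter().any(|p| p.matches(k)));
        }
        // step 3: apply user-provided exclude patterns
        let excludes: Vec<_> = policy.exclude.iter().map(|p| WildMatch::new(p)).collect();
        env.retain(|k, _| !excludes.iter().any(|p| p.matches(k)));
        // step 4: forced entries from `set` always win
        env.extend(policy.set.clone());
        // step 5: if include_only is non-empty, keep only matching keys
        if !policy.include_only.is_empty() {
            let includes: Vec<_> = policy.include_only.iter().map(|p| WildMatch::new(p)).collect();
            env.retain(|k, _| includes.iter().any(|p| p.matches(k)));
        }
        env
    }

    fn main() {
        let inherited = HashMap::from([
            ("PATH".to_string(), "/usr/bin".to_string()),
            ("OPENAI_API_KEY".to_string(), "secret".to_string()),
        ]);
        let policy = Policy {
            ignore_default_excludes: false,
            exclude: vec![],
            set: HashMap::new(),
            include_only: vec![],
        };
        let env = derive_env(inherited, &policy);
        assert!(env.contains_key("PATH"));
        assert!(!env.contains_key("OPENAI_API_KEY")); // matched "*KEY*"
    }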
@@ -228,3 +228,10 @@ pub enum ReasoningSummary {
/// Option to disable reasoning summaries.
None,
}
// ---------------------------------------------------------------------------
// NOTE: The canonical ConfigToml definition lives in `crate::config`.
// Historically this file accidentally re-declared that struct, which caused
// drift and confusion. The duplicate has been removed; please use
// `codex_core::config::ConfigToml` instead.
// ---------------------------------------------------------------------------

View File

@@ -282,9 +282,15 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),
/// Incremental assistant text delta
AgentMessageDelta(AgentMessageEvent),
/// Reasoning event from agent.
AgentReasoning(AgentReasoningEvent),
/// Incremental reasoning text delta.
AgentReasoningDelta(AgentReasoningEvent),
/// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent),

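Note that the delta variants reuse the same payload structs as the finals (`AgentMessageEvent`, `AgentReasoningEvent`), so a front-end accumulates delta text and then lets the final event replace whatever was streamed. A runnable sketch with local stand-in types:

    // Stand-ins; the real EventMsg and AgentMessageEvent live in the protocol crate.
    struct AgentMessageEvent {
        message: String,
    }

    enum EventMsg {
        AgentMessage(AgentMessageEvent),
        AgentMessageDelta(AgentMessageEvent),
    }

    // Accumulate deltas as they arrive, then let the final aggregate replace
    // whatever was streamed (the final message is authoritative).
    fn apply(buf: &mut String, msg: EventMsg) {
        match msg {
            EventMsg::AgentMessageDelta(e) => buf.push_str(&e.message),
            EventMsg::AgentMessage(e) => *buf = e.message,
        }
    }

    fn main() {
        let mut buf = String::new();
        apply(&mut buf, EventMsg::AgentMessageDelta(AgentMessageEvent { message: "hel".into() }));
        apply(&mut buf, EventMsg::AgentMessageDelta(AgentMessageEvent { message: "lo".into() }));
        apply(&mut buf, EventMsg::AgentMessage(AgentMessageEvent { message: "hello".into() }));
        assert_eq!(buf, "hello");
    }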
View File

@@ -58,6 +58,8 @@ async fn chat_mode_stream_cli() {
.arg(&provider_override)
.arg("-c")
.arg("model_provider=\"mock\"")
.arg("-c")
.arg("streaming=false")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg("hello?");
@@ -71,8 +73,8 @@ async fn chat_mode_stream_cli() {
println!("Stderr:\n{}", String::from_utf8_lossy(&output.stderr));
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(stdout.contains("hi"));
assert_eq!(stdout.matches("hi").count(), 1);
let hi_lines = stdout.lines().filter(|line| line.trim() == "hi").count();
assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'");
server.verify().await;
}
@@ -104,6 +106,8 @@ async fn responses_api_stream_cli() {
.arg("--")
.arg("exec")
.arg("--skip-git-repo-check")
.arg("-c")
.arg("streaming=false")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg("hello?");
@@ -117,3 +121,188 @@ async fn responses_api_stream_cli() {
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(stdout.contains("fixture hello"));
}
/// Tests chat completions with streaming enabled (streaming=true) through the CLI using a mock server.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn chat_mode_streaming_enabled_cli() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let server = MockServer::start().await;
// Simulate streaming deltas: 'h' and 'i' as separate chunks
let sse = concat!(
"data: {\"choices\":[{\"delta\":{\"content\":\"h\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"content\":\"i\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{}}]}\n\n",
"data: [DONE]\n\n"
);
Mock::given(method("POST"))
.and(path("/v1/chat/completions"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse, "text/event-stream"),
)
.expect(1)
.mount(&server)
.await;
let home = TempDir::new().unwrap();
let provider_override = format!(
"model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"chat\" }}",
server.uri()
);
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
.arg("--skip-git-repo-check")
.arg("-c")
.arg(&provider_override)
.arg("-c")
.arg("model_provider=\"mock\"")
.arg("-c")
.arg("streaming=true")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg("hello?");
cmd.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("OPENAI_BASE_URL", format!("{}/v1", server.uri()));
let output = cmd.output().unwrap();
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
// Assert that 'h' and 'i' are emitted as two separate chunks on stdout, not as a single chunk.
// We split the output on 'h' and 'i' and check their order and separation.
let mut chunks = Vec::new();
let mut last = 0;
for (idx, c) in stdout.char_indices() {
if c == 'h' || c == 'i' {
if last != idx {
let chunk = &stdout[last..idx];
if !chunk.trim().is_empty() {
chunks.push(chunk);
}
}
chunks.push(&stdout[idx..idx + c.len_utf8()]);
last = idx + c.len_utf8();
}
}
if last < stdout.len() {
let chunk = &stdout[last..];
if !chunk.trim().is_empty() {
chunks.push(chunk);
}
}
// Only keep the 'h' and 'i' chunks
let delta_chunks: Vec<&str> = chunks
.iter()
.cloned()
.filter(|s| *s == "h" || *s == "i")
.collect();
assert_eq!(
delta_chunks,
vec!["h", "i"],
"Expected two separate delta chunks 'h' and 'i' from stdout"
);
server.verify().await;
}
/// Tests responses API with streaming enabled (streaming=true) through the CLI using a local SSE fixture file.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_api_streaming_enabled_cli() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// Create a fixture with two deltas: 'fixture ' and 'hello'
use std::fs;
use std::io::Write;
let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/cli_responses_fixture_streaming.sse");
let mut fixture_file = fs::File::create(&fixture_path).unwrap();
writeln!(fixture_file, "event: response.created").unwrap();
writeln!(
fixture_file,
"data: {{\"type\":\"response.created\",\"response\":{{\"id\":\"resp1\"}}}}\n"
)
.unwrap();
writeln!(fixture_file, "event: response.output_text.delta").unwrap();
writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"fixture \",\"item_id\":\"msg1\"}}\n").unwrap();
writeln!(fixture_file, "event: response.output_text.delta").unwrap();
writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"hello\",\"item_id\":\"msg1\"}}\n").unwrap();
writeln!(fixture_file, "event: response.output_text.done").unwrap();
writeln!(fixture_file, "data: {{\"type\":\"response.output_text.done\",\"text\":\"fixture hello\",\"item_id\":\"msg1\"}}\n").unwrap();
writeln!(fixture_file, "event: response.output_item.done").unwrap();
writeln!(fixture_file, "data: {{\"type\":\"response.output_item.done\",\"item\":{{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{{\"type\":\"output_text\",\"text\":\"fixture hello\"}}]}}}}\n").unwrap();
writeln!(fixture_file, "event: response.completed").unwrap();
writeln!(fixture_file, "data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"resp1\",\"output\":[]}}}}\n").unwrap();
let home = TempDir::new().unwrap();
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
.arg("--skip-git-repo-check")
.arg("-c")
.arg("streaming=true")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg("hello?");
cmd.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture_path)
.env("OPENAI_BASE_URL", "http://unused.local");
let output = cmd.output().unwrap();
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
// Assert that 'fixture ' and 'hello' are emitted as two separate chunks on stdout, not as a single chunk.
// We split the output on the known delta substrings and check their order and separation.
let mut chunks = Vec::new();
let mut last = 0;
for pat in ["fixture ", "hello"] {
if let Some(idx) = stdout[last..].find(pat) {
if last != last + idx {
let chunk = &stdout[last..last + idx];
if !chunk.trim().is_empty() {
chunks.push(chunk);
}
}
chunks.push(&stdout[last + idx..last + idx + pat.len()]);
last = last + idx + pat.len();
}
}
if last < stdout.len() {
let chunk = &stdout[last..];
if !chunk.trim().is_empty() {
chunks.push(chunk);
}
}
// Only keep the delta chunks
let delta_chunks: Vec<&str> = chunks
.iter()
.cloned()
.filter(|s| *s == "fixture " || *s == "hello")
.collect();
assert_eq!(
delta_chunks,
vec!["fixture ", "hello"],
"Expected two separate delta chunks 'fixture ' and 'hello' from stdout"
);
}

View File

@@ -21,6 +21,8 @@ use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
use std::io::Write;
use std::io::{self};
use std::time::Instant;
/// This should be configurable. When used in CI, users may not want to impose
@@ -50,10 +52,20 @@ pub(crate) struct EventProcessor {
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
/// Whether to surface streaming deltas (true = print deltas + suppress final message).
streaming_enabled: bool,
/// Internal: have we already printed the `codex` header for the current streaming turn?
printed_agent_header: bool,
/// Internal: have we already printed the `thinking` header for the current streaming turn?
printed_reasoning_header: bool,
}
impl EventProcessor {
pub(crate) fn create_with_ansi(with_ansi: bool, show_agent_reasoning: bool) -> Self {
pub(crate) fn create_with_ansi(
with_ansi: bool,
show_agent_reasoning: bool,
streaming_enabled: bool,
) -> Self {
let call_id_to_command = HashMap::new();
let call_id_to_patch = HashMap::new();
let call_id_to_tool_call = HashMap::new();
@@ -71,6 +83,9 @@ impl EventProcessor {
cyan: Style::new().cyan(),
call_id_to_tool_call,
show_agent_reasoning,
streaming_enabled,
printed_agent_header: false,
printed_reasoning_header: false,
}
} else {
Self {
@@ -85,6 +100,9 @@ impl EventProcessor {
cyan: Style::new(),
call_id_to_tool_call,
show_agent_reasoning,
streaming_enabled,
printed_agent_header: false,
printed_reasoning_header: false,
}
}
}
@@ -179,17 +197,46 @@ impl EventProcessor {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
// Reset streaming headers at start/end boundaries.
if matches!(msg, EventMsg::TaskStarted) {
self.printed_agent_header = false;
self.printed_reasoning_header = false;
}
// Otherwise nothing to print for these events.
}
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
ts_println!(
self,
"{}\n{message}",
"codex".style(self.bold).style(self.magenta)
);
if self.streaming_enabled {
// When streaming, the deltas were already printed, so suppress the
// final aggregated message. If no deltas were seen, fall back to
// printing it now.
if !self.printed_agent_header {
ts_println!(
self,
"{}\n{message}",
"codex".style(self.bold).style(self.magenta)
);
}
} else {
ts_println!(
self,
"{}\n{message}",
"codex".style(self.bold).style(self.magenta)
);
}
}
EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
// Deltas are ignored entirely when streaming is disabled.
if self.streaming_enabled {
if !self.printed_agent_header {
ts_println!(self, "{}", "codex".style(self.bold).style(self.magenta));
self.printed_agent_header = true;
}
print!("{message}");
let _ = io::stdout().flush();
}
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
@@ -343,7 +390,7 @@ impl EventProcessor {
);
// Pretty-print the patch summary with colored diff markers so
// its easy to scan in the terminal output.
// it's easy to scan in the terminal output.
for (path, change) in changes.iter() {
match change {
FileChange::Add { content } => {
@@ -441,12 +488,37 @@ impl EventProcessor {
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
ts_println!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
agent_reasoning_event.text
);
if self.streaming_enabled {
if !self.printed_reasoning_header {
ts_println!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
agent_reasoning_event.text
);
}
} else {
ts_println!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
agent_reasoning_event.text
);
}
}
}
EventMsg::AgentReasoningDelta(agent_reasoning_event) => {
if self.show_agent_reasoning && self.streaming_enabled {
if !self.printed_reasoning_header {
ts_println!(
self,
"{}",
"thinking".style(self.italic).style(self.magenta)
);
self.printed_reasoning_header = true;
}
print!("{}", agent_reasoning_event.text);
let _ = io::stdout().flush();
}
}
EventMsg::SessionConfigured(session_configured_event) => {

View File

@@ -115,8 +115,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
};
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
let mut event_processor =
EventProcessor::create_with_ansi(stdout_with_ansi, !config.hide_agent_reasoning);
let mut event_processor = EventProcessor::create_with_ansi(
stdout_with_ansi,
!config.hide_agent_reasoning,
config.streaming_enabled,
);
// Print the effective configuration and prompt so users can see what Codex
// is using.
event_processor.print_config_summary(&config, &prompt);

View File

@@ -175,6 +175,8 @@ pub async fn run_codex_tool_session(
| EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)

View File

@@ -40,6 +40,15 @@ use crate::history_cell::PatchEventType;
use crate::user_approval_widget::ApprovalRequest;
use codex_file_search::FileMatch;
/// Bookkeeping for a live streaming cell. We track the `sub_id` to know when
/// a new turn has started (and thus when to start a new cell) and accumulate
/// the full text so we can re-render markdown cleanly when the turn ends.
#[derive(Default)]
struct StreamingBuf {
sub_id: Option<String>,
text: String,
}
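The turn-change test that the handlers below repeat (the `sub_id` differs, or nothing is in flight yet) can be read in isolation. A small sketch of that check, assuming the same `StreamingBuf` shape:

    #[derive(Default)]
    struct StreamingBuf {
        sub_id: Option<String>,
        text: String,
    }

    // True when `event_id` belongs to a different turn than the buffered stream
    // (or when no stream is in flight yet).
    fn is_new_stream(buf: &StreamingBuf, event_id: &str) -> bool {
        buf.sub_id.as_deref().map(|s| s != event_id).unwrap_or(true)
    }

    fn main() {
        let mut buf = StreamingBuf::default();
        assert!(is_new_stream(&buf, "turn-1")); // nothing in flight yet
        buf.sub_id = Some("turn-1".into());
        buf.text.push_str("partial text");
        assert!(!is_new_stream(&buf, "turn-1")); // same turn keeps appending
        assert!(is_new_stream(&buf, "turn-2")); // new id -> flush and restart
    }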
pub(crate) struct ChatWidget<'a> {
app_event_tx: AppEventSender,
codex_op_tx: UnboundedSender<Op>,
@@ -49,6 +58,10 @@ pub(crate) struct ChatWidget<'a> {
config: Config,
initial_user_message: Option<UserMessage>,
token_usage: TokenUsage,
/// Accumulates assistant streaming text for the *current* turn.
streaming_agent: StreamingBuf,
/// Accumulates reasoning streaming text for the *current* turn.
streaming_reasoning: StreamingBuf,
}
#[derive(Clone, Copy, Eq, PartialEq)]
@@ -135,6 +148,8 @@ impl ChatWidget<'_> {
initial_images,
),
token_usage: TokenUsage::default(),
streaming_agent: StreamingBuf::default(),
streaming_reasoning: StreamingBuf::default(),
}
}
@@ -220,6 +235,8 @@ impl ChatWidget<'_> {
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
// We need a copy of `id` for streaming bookkeeping because it is moved into some match arms.
let event_id = id.clone();
match msg {
EventMsg::SessionConfigured(event) => {
// Record session information at the top of the conversation.
@@ -240,14 +257,111 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
if self.config.streaming_enabled {
// Final full assistant message. If we have an in-flight streaming cell for this id, replace it.
let same_turn = self
.streaming_agent
.sub_id
.as_ref()
.map(|s| s == &event_id)
.unwrap_or(false);
if same_turn {
self.conversation_history
.replace_last_agent_message(&self.config, message.clone());
self.streaming_agent.sub_id = None;
self.streaming_agent.text.clear();
} else {
// Streaming enabled but we never saw deltas; just render normally.
self.finalize_streams_if_new_turn(&event_id);
self.conversation_history
.add_agent_message(&self.config, message.clone());
}
} else {
// Streaming disabled -> always render final message, ignore any deltas.
self.conversation_history
.add_agent_message(&self.config, message.clone());
}
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
// Streaming Assistant text.
if !self.config.streaming_enabled {
// Ignore when streaming disabled.
return;
}
// Start a new cell if this delta belongs to a new turn.
let is_new_stream = self
.streaming_agent
.sub_id
.as_ref()
.map(|s| s != &event_id)
.unwrap_or(true);
if is_new_stream {
// Finalise any in-flight stream from the prior turn.
self.finalize_streams_if_new_turn(&event_id);
// Start a header-only streaming cell so we don't parse partial markdown.
self.conversation_history
.add_agent_message(&self.config, String::new());
self.streaming_agent.sub_id = Some(event_id.clone());
self.streaming_agent.text.clear();
}
// Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget.
self.streaming_agent.text.push_str(&message);
self.conversation_history
.add_agent_message(&self.config, message);
.append_agent_message_delta(&self.config, message);
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
if !self.config.hide_agent_reasoning {
if self.config.streaming_enabled {
// Final full reasoning summary. Replace streaming cell if same turn.
let same_turn = self
.streaming_reasoning
.sub_id
.as_ref()
.map(|s| s == &event_id)
.unwrap_or(false);
if same_turn {
self.conversation_history
.replace_last_agent_reasoning(&self.config, text.clone());
self.streaming_reasoning.sub_id = None;
self.streaming_reasoning.text.clear();
} else {
self.finalize_streams_if_new_turn(&event_id);
self.conversation_history
.add_agent_reasoning(&self.config, text.clone());
}
} else {
self.conversation_history
.add_agent_reasoning(&self.config, text.clone());
}
self.request_redraw();
}
}
EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }) => {
if !self.config.hide_agent_reasoning {
if !self.config.streaming_enabled {
// Ignore when streaming disabled.
return;
}
let is_new_stream = self
.streaming_reasoning
.sub_id
.as_ref()
.map(|s| s != &event_id)
.unwrap_or(true);
if is_new_stream {
self.finalize_streams_if_new_turn(&event_id);
// Start header-only streaming cell.
self.conversation_history
.add_agent_reasoning(&self.config, String::new());
self.streaming_reasoning.sub_id = Some(event_id.clone());
self.streaming_reasoning.text.clear();
}
// Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget.
self.streaming_reasoning.text.push_str(&text);
self.conversation_history
.add_agent_reasoning(&self.config, text);
.append_agent_reasoning_delta(&self.config, text);
self.request_redraw();
}
}
@@ -259,6 +373,8 @@ impl ChatWidget<'_> {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
// Turn has ended; ensure no lingering streaming cells remain un-finalised.
self.finalize_streams();
self.bottom_pane.set_task_running(false);
self.request_redraw();
}
@@ -438,6 +554,42 @@ impl ChatWidget<'_> {
tracing::error!("failed to submit op: {e}");
}
}
/// Finalise (render) streaming buffers when we detect a new turn id.
fn finalize_streams_if_new_turn(&mut self, new_id: &str) {
// If the incoming id differs from the current stream id(s) we must flush.
let agent_changed = self
.streaming_agent
.sub_id
.as_ref()
.map(|s| s != new_id)
.unwrap_or(false);
let reasoning_changed = self
.streaming_reasoning
.sub_id
.as_ref()
.map(|s| s != new_id)
.unwrap_or(false);
if agent_changed || reasoning_changed {
self.finalize_streams();
}
}
/// Re-render any in-flight streaming cells with full markdown and clear buffers.
fn finalize_streams(&mut self) {
let had_agent = self.streaming_agent.sub_id.take().is_some();
if had_agent {
let text = std::mem::take(&mut self.streaming_agent.text);
self.conversation_history
.replace_last_agent_message(&self.config, text);
}
let had_reasoning = self.streaming_reasoning.sub_id.take().is_some();
if had_reasoning {
let text = std::mem::take(&mut self.streaming_reasoning.text);
self.conversation_history
.replace_last_agent_reasoning(&self.config, text);
}
}
}
impl WidgetRef for &ChatWidget<'_> {

View File

@@ -9,9 +9,9 @@ use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
use ratatui::prelude::*;
use ratatui::style::Style;
use ratatui::text::Span;
use ratatui::widgets::*;
use serde_json::Value as JsonValue;
use std::cell::Cell as StdCell;
use std::cell::Cell;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -26,24 +26,30 @@ pub struct ConversationHistoryWidget {
entries: Vec<Entry>,
/// The width (in terminal cells/columns) that [`Entry::line_count`] was
/// computed for. When the available width changes we recompute counts.
cached_width: StdCell<u16>,
cached_width: Cell<u16>,
scroll_position: usize,
/// Number of lines the last time render_ref() was called
num_rendered_lines: StdCell<usize>,
num_rendered_lines: Cell<usize>,
/// The height of the viewport last time render_ref() was called
last_viewport_height: StdCell<usize>,
last_viewport_height: Cell<usize>,
has_input_focus: bool,
/// Scratch buffer used while incrementally streaming an agent message so we can re-render the accumulated markdown on every delta.
streaming_agent_message_buf: String,
/// Scratch buffer used while incrementally streaming agent reasoning so we can re-render the accumulated markdown on every delta.
streaming_agent_reasoning_buf: String,
}
impl ConversationHistoryWidget {
pub fn new() -> Self {
Self {
entries: Vec::new(),
cached_width: StdCell::new(0),
cached_width: Cell::new(0),
scroll_position: usize::MAX,
num_rendered_lines: StdCell::new(0),
last_viewport_height: StdCell::new(0),
num_rendered_lines: Cell::new(0),
last_viewport_height: Cell::new(0),
has_input_focus: false,
streaming_agent_message_buf: String::new(),
streaming_agent_reasoning_buf: String::new(),
}
}
@@ -84,38 +90,26 @@ impl ConversationHistoryWidget {
}
fn scroll_up(&mut self, num_lines: u32) {
// If a user is scrolling up from the "stick to bottom" mode, we need to
// map this to a specific scroll position so we can calculate the delta.
// This requires us to care about how tall the screen is.
// Convert sticky-to-bottom sentinel into a concrete offset anchored at the bottom.
if self.scroll_position == usize::MAX {
self.scroll_position = self
.num_rendered_lines
.get()
.saturating_sub(self.last_viewport_height.get());
self.scroll_position = sticky_offset(
self.num_rendered_lines.get(),
self.last_viewport_height.get(),
);
}
self.scroll_position = self.scroll_position.saturating_sub(num_lines as usize);
}
fn scroll_down(&mut self, num_lines: u32) {
// If we're already pinned to the bottom there's nothing to do.
// Nothing to do if we're already pinned to the bottom.
if self.scroll_position == usize::MAX {
return;
}
let viewport_height = self.last_viewport_height.get().max(1);
let num_rendered_lines = self.num_rendered_lines.get();
// Compute the maximum explicit scroll offset that still shows a full
// viewport. This mirrors the calculation in `scroll_page_down()` and
// in the render path.
let max_scroll = num_rendered_lines.saturating_sub(viewport_height);
let max_scroll = sticky_offset(self.num_rendered_lines.get(), viewport_height);
let new_pos = self.scroll_position.saturating_add(num_lines as usize);
if new_pos >= max_scroll {
// Reached (or passed) the bottom; switch to stick-to-bottom mode
// so that additional output keeps the view pinned automatically.
// Switch to sticky-bottom mode so subsequent output pins view.
self.scroll_position = usize::MAX;
} else {
self.scroll_position = new_pos;
@@ -125,44 +119,21 @@ impl ConversationHistoryWidget {
/// Scroll up by one full viewport height (Page Up).
fn scroll_page_up(&mut self) {
let viewport_height = self.last_viewport_height.get().max(1);
// If we are currently in the "stick to bottom" mode, first convert the
// implicit scroll position (`usize::MAX`) into an explicit offset that
// represents the very bottom of the scroll region. This mirrors the
// logic from `scroll_up()`.
if self.scroll_position == usize::MAX {
self.scroll_position = self
.num_rendered_lines
.get()
.saturating_sub(viewport_height);
self.scroll_position = sticky_offset(self.num_rendered_lines.get(), viewport_height);
}
// Move up by a full page.
self.scroll_position = self.scroll_position.saturating_sub(viewport_height);
}
/// Scroll down by one full viewport height (Page Down).
fn scroll_page_down(&mut self) {
// Nothing to do if we're already stuck to the bottom.
if self.scroll_position == usize::MAX {
return;
}
let viewport_height = self.last_viewport_height.get().max(1);
let num_lines = self.num_rendered_lines.get();
// Calculate the maximum explicit scroll offset that is still within
// range. This matches the logic in `scroll_down()` and the render
// method.
let max_scroll = num_lines.saturating_sub(viewport_height);
// Attempt to move down by a full page.
let max_scroll = sticky_offset(self.num_rendered_lines.get(), viewport_height);
let new_pos = self.scroll_position.saturating_add(viewport_height);
if new_pos >= max_scroll {
// We have reached (or passed) the bottom; switch back to
// automatic stick-to-bottom mode so that subsequent output keeps
// the viewport pinned.
self.scroll_position = usize::MAX;
} else {
self.scroll_position = new_pos;
@@ -195,13 +166,107 @@ impl ConversationHistoryWidget {
}
pub fn add_agent_message(&mut self, config: &Config, message: String) {
// Reset streaming scratch because we are starting a fresh agent message.
self.streaming_agent_message_buf.clear();
self.streaming_agent_message_buf.push_str(&message);
self.add_to_history(HistoryCell::new_agent_message(config, message));
}
pub fn add_agent_reasoning(&mut self, config: &Config, text: String) {
self.streaming_agent_reasoning_buf.clear();
self.streaming_agent_reasoning_buf.push_str(&text);
self.add_to_history(HistoryCell::new_agent_reasoning(config, text));
}
/// Append incremental assistant text.
///
/// Previous heuristic: fast-append chunks until we saw a newline, then re-render.
/// This caused visible "one-word" lines (e.g., "The" -> "The user") when models
/// streamed small token fragments and also delayed Markdown styling (headings, code fences)
/// until the first newline arrived. To improve perceived quality we now *always* re-render
/// the accumulated markdown buffer on every incoming delta chunk. We still apply the
/// soft-break collapsing heuristic (outside fenced code blocks) so interim layout more closely
/// matches the final message and reduces layout thrash.
pub fn append_agent_message_delta(&mut self, _config: &Config, text: String) {
if text.is_empty() {
return;
}
// Accumulate full buffer.
self.streaming_agent_message_buf.push_str(&text);
let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_message_buf);
if let Some(idx) = last_agent_message_idx(&self.entries) {
let width = self.cached_width.get();
let entry = &mut self.entries[idx];
entry.cell = HistoryCell::new_agent_message(_config, collapsed);
// Drop trailing blank so we can continue streaming additional tokens cleanly.
if let HistoryCell::AgentMessage { view } = &mut entry.cell {
drop_trailing_blank_line(&mut view.lines);
}
if width > 0 {
update_entry_height(entry, width);
}
} else {
// No existing cell? Start a new one.
self.add_agent_message(_config, self.streaming_agent_message_buf.clone());
}
}
/// Append incremental reasoning text (mirrors `append_agent_message_delta`).
pub fn append_agent_reasoning_delta(&mut self, _config: &Config, text: String) {
if text.is_empty() {
return;
}
self.streaming_agent_reasoning_buf.push_str(&text);
let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_reasoning_buf);
if let Some(idx) = last_agent_reasoning_idx(&self.entries) {
let width = self.cached_width.get();
let entry = &mut self.entries[idx];
entry.cell = HistoryCell::new_agent_reasoning(_config, collapsed);
if let HistoryCell::AgentReasoning { view } = &mut entry.cell {
drop_trailing_blank_line(&mut view.lines);
}
if width > 0 {
update_entry_height(entry, width);
}
} else {
self.add_agent_reasoning(_config, self.streaming_agent_reasoning_buf.clone());
}
}
/// Replace the most recent AgentMessage cell with the fully accumulated `text`.
/// This should be called once the turn is complete so we can render proper markdown.
pub fn replace_last_agent_message(&mut self, config: &Config, text: String) {
self.streaming_agent_message_buf.clear();
if let Some(idx) = last_agent_message_idx(&self.entries) {
let width = self.cached_width.get();
let entry = &mut self.entries[idx];
entry.cell = HistoryCell::new_agent_message(config, text);
if width > 0 {
update_entry_height(entry, width);
}
} else {
// No existing AgentMessage (shouldn't happen); append new.
self.add_agent_message(config, text);
}
}
/// Replace the most recent AgentReasoning cell with the fully accumulated `text`.
pub fn replace_last_agent_reasoning(&mut self, config: &Config, text: String) {
self.streaming_agent_reasoning_buf.clear();
if let Some(idx) = last_agent_reasoning_idx(&self.entries) {
let width = self.cached_width.get();
let entry = &mut self.entries[idx];
entry.cell = HistoryCell::new_agent_reasoning(config, text);
if width > 0 {
update_entry_height(entry, width);
}
} else {
self.add_agent_reasoning(config, text);
}
}
pub fn add_background_event(&mut self, message: String) {
self.add_to_history(HistoryCell::new_background_event(message));
}
@@ -279,7 +344,7 @@ impl ConversationHistoryWidget {
// Update cached line count.
if width > 0 {
entry.line_count.set(cell.height(width));
update_entry_height(entry, width);
}
break;
}
@@ -313,7 +378,7 @@ impl ConversationHistoryWidget {
entry.cell = completed;
if width > 0 {
entry.line_count.set(entry.cell.height(width));
update_entry_height(entry, width);
}
break;
@@ -370,14 +435,12 @@ impl WidgetRef for ConversationHistoryWidget {
self.entries.iter().map(|e| e.line_count.get()).sum()
};
// Determine the scroll position. Note the existing value of
// `self.scroll_position` could exceed the maximum scroll offset if the
// user made the window wider since the last render.
let max_scroll = num_lines.saturating_sub(viewport_height);
// Determine the scroll position (respect sticky-to-bottom sentinel and clamp).
let max_scroll = sticky_offset(num_lines, viewport_height);
let scroll_pos = if self.scroll_position == usize::MAX {
max_scroll
} else {
self.scroll_position.min(max_scroll)
clamp_scroll_pos(self.scroll_position, max_scroll)
};
// ------------------------------------------------------------------
@@ -454,7 +517,7 @@ impl WidgetRef for ConversationHistoryWidget {
{
// Choose a thumb color that stands out only when this pane has focus so that the
// users attention is naturally drawn to the active viewport. When unfocused we show
// user's attention is naturally drawn to the active viewport. When unfocused we show
// a low-contrast thumb so the scrollbar fades into the background without becoming
// invisible.
let thumb_style = if self.has_input_focus {
@@ -497,3 +560,118 @@ impl WidgetRef for ConversationHistoryWidget {
pub(crate) const fn wrap_cfg() -> ratatui::widgets::Wrap {
ratatui::widgets::Wrap { trim: false }
}
// ---------------------------------------------------------------------------
// Scrolling helpers (private)
// ---------------------------------------------------------------------------
#[inline]
fn sticky_offset(num_lines: usize, viewport_height: usize) -> usize {
num_lines.saturating_sub(viewport_height.max(1))
}
#[inline]
fn clamp_scroll_pos(pos: usize, max_scroll: usize) -> usize {
pos.min(max_scroll)
}
// ---------------------------------------------------------------------------
// Streaming helpers (private)
// ---------------------------------------------------------------------------
/// Locate the most recent `HistoryCell::AgentMessage` entry.
fn last_agent_message_idx(entries: &[Entry]) -> Option<usize> {
entries
.iter()
.rposition(|e| matches!(e.cell, HistoryCell::AgentMessage { .. }))
}
/// Locate the most recent `HistoryCell::AgentReasoning` entry.
fn last_agent_reasoning_idx(entries: &[Entry]) -> Option<usize> {
entries
.iter()
.rposition(|e| matches!(e.cell, HistoryCell::AgentReasoning { .. }))
}
/// True if the line is an empty spacer (single empty span).
fn is_blank_line(line: &Line<'_>) -> bool {
line.spans.len() == 1 && line.spans[0].content.is_empty()
}
/// Ensure that the vector has *at least* one body line after the header.
/// A freshly-created AgentMessage/Reasoning cell always has a header + blank line,
/// but streaming cells may be created empty; this makes sure we have a target line.
#[allow(dead_code)]
fn ensure_body_line(lines: &mut Vec<Line<'static>>) {
if lines.len() < 2 {
lines.push(Line::from(""));
}
}
/// Trim a single trailing blank spacer line (but preserve intentional paragraph breaks).
fn drop_trailing_blank_line(lines: &mut Vec<Line<'static>>) {
if let Some(last) = lines.last() {
if is_blank_line(last) {
lines.pop();
}
}
}
/// Append streaming text, honouring embedded newlines.
#[allow(dead_code)]
fn append_streaming_text_chunks(lines: &mut Vec<Line<'static>>, text: &str) {
// NOTE: This helper is now a fallback path only (we eagerly re-render accumulated markdown).
// Still, keep behaviour sane: drop trailing spacer, ensure a writable body line, then append.
drop_trailing_blank_line(lines);
ensure_body_line(lines);
if let Some(last_line) = lines.last_mut() {
last_line.spans.push(Span::raw(text.to_string()));
} else {
lines.push(Line::from(text.to_string()));
}
}
/// Re-measure a mutated entry at `width` columns and update its cached height.
fn update_entry_height(entry: &Entry, width: u16) {
entry.line_count.set(entry.cell.height(width));
}
/// Collapse *single* newlines in a streaming buffer into spaces so that interim streaming
/// renders more closely match final Markdown layout — *except* when we detect fenced code blocks.
/// If the accumulated text contains a Markdown code fence (``` or ~~~), we preserve **all**
/// newlines verbatim so multi-line code renders correctly while streaming.
fn collapse_single_newlines_for_streaming(src: &str) -> String {
// Quick fence detection. If we see a code fence marker anywhere in the accumulated text,
// skip collapsing entirely so we do not mangle code formatting.
if src.contains("```") || src.contains("~~~") {
return src.to_string();
}
let mut out = String::with_capacity(src.len());
let mut pending_newlines = 0usize;
for ch in src.chars() {
if ch == '\n' {
pending_newlines += 1;
continue;
}
if pending_newlines == 1 {
// soft break -> space
out.push(' ');
} else if pending_newlines > 1 {
// preserve paragraph breaks exactly
for _ in 0..pending_newlines {
out.push('\n');
}
}
pending_newlines = 0;
out.push(ch);
}
// flush tail
if pending_newlines == 1 {
out.push(' ');
} else if pending_newlines > 1 {
for _ in 0..pending_newlines {
out.push('\n');
}
}
out
}
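The heuristic is easiest to see on concrete inputs: a lone newline becomes a space, runs of two or more survive verbatim, and any fence marker disables collapsing entirely. A few illustrative cases, assuming the function above is in scope:

    fn main() {
        // soft break inside a sentence -> space
        assert_eq!(collapse_single_newlines_for_streaming("The\nuser"), "The user");
        // paragraph break (two newlines) preserved exactly
        assert_eq!(collapse_single_newlines_for_streaming("one\n\ntwo"), "one\n\ntwo");
        // fence marker anywhere -> buffer returned untouched
        assert_eq!(
            collapse_single_newlines_for_streaming("```rust\nlet x = 1;\n```"),
            "```rust\nlet x = 1;\n```"
        );
    }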