From 5b820c5ce759158d274c22939eccc4839cb49c49 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Wed, 16 Jul 2025 08:59:26 -0700 Subject: [PATCH 1/5] feat: ctrl-d only exits when there is no user input (#1589) While this does make it so that `ctrl-d` will not exit Codex when the composer is not empty, `ctrl-d` will still exit Codex if it is in the "working" state. Fixes https://github.com/openai/codex/issues/1443. --- codex-rs/tui/src/app.rs | 16 +++++++++++++++- codex-rs/tui/src/bottom_pane/chat_composer.rs | 5 +++++ .../tui/src/bottom_pane/chat_composer_history.rs | 4 ++-- codex-rs/tui/src/bottom_pane/mod.rs | 4 ++++ codex-rs/tui/src/chatwidget.rs | 4 ++++ 5 files changed, 30 insertions(+), 3 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index e1dde8332d..33297ad372 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -199,7 +199,21 @@ impl<'a> App<'a> { modifiers: crossterm::event::KeyModifiers::CONTROL, .. } => { - self.app_event_tx.send(AppEvent::ExitRequest); + match &mut self.app_state { + AppState::Chat { widget } => { + if widget.composer_is_empty() { + self.app_event_tx.send(AppEvent::ExitRequest); + } else { + // Treat Ctrl+D as a normal key event when the composer + // is not empty so that it doesn't quit the application + // prematurely. + self.dispatch_key_event(key_event); + } + } + AppState::Login { .. } | AppState::GitWarning { .. } => { + self.app_event_tx.send(AppEvent::ExitRequest); + } + } } _ => { self.dispatch_key_event(key_event); diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index e89187d165..b49bce4046 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -76,6 +76,11 @@ impl ChatComposer<'_> { this } + /// Returns true if the composer currently contains no user input. + pub(crate) fn is_empty(&self) -> bool { + self.textarea.is_empty() + } + /// Update the cached *context-left* percentage and refresh the placeholder /// text. The UI relies on the placeholder to convey the remaining /// context when the composer is empty. diff --git a/codex-rs/tui/src/bottom_pane/chat_composer_history.rs b/codex-rs/tui/src/bottom_pane/chat_composer_history.rs index fc85c28262..5715c99492 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer_history.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer_history.rs @@ -72,8 +72,7 @@ impl ChatComposerHistory { return false; } - let lines = textarea.lines(); - if lines.len() == 1 && lines[0].is_empty() { + if textarea.is_empty() { return true; } @@ -85,6 +84,7 @@ impl ChatComposerHistory { return false; } + let lines = textarea.lines(); matches!(&self.last_history_text, Some(prev) if prev == &lines.join("\n")) } diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index 350492b3e9..e4ea1d3823 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -162,6 +162,10 @@ impl BottomPane<'_> { } } + pub(crate) fn composer_is_empty(&self) -> bool { + self.composer.is_empty() + } + pub(crate) fn is_task_running(&self) -> bool { self.is_task_running } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 865e339763..51fdfc3e8a 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -432,6 +432,10 @@ impl ChatWidget<'_> { } } + pub(crate) fn composer_is_empty(&self) -> bool { + self.bottom_pane.composer_is_empty() + } + /// Forward an `Op` directly to codex. pub(crate) fn submit_op(&self, op: Op) { if let Err(e) = self.codex_op_tx.send(op) { From 2bd3314886cb1fe341778f00fc0f0537f6a1c976 Mon Sep 17 00:00:00 2001 From: aibrahim-oai Date: Wed, 16 Jul 2025 15:11:18 -0700 Subject: [PATCH 2/5] support deltas in core (#1587) - Added support for message and reasoning deltas - Skipped adding the support in the cli and tui for later - Commented a failing test (wrong merge) that needs fix in a separate PR. Side note: I think we need to disable merge when the CI don't pass. --- codex-rs/core/src/chat_completions.rs | 8 +++++- codex-rs/core/src/client.rs | 24 ++++++++++++++--- codex-rs/core/src/client_common.rs | 2 ++ codex-rs/core/src/codex.rs | 27 +++++++++++++------- codex-rs/core/src/protocol.rs | 16 ++++++++++++ codex-rs/core/tests/cli_stream.rs | 4 +-- codex-rs/core/tests/stream_no_completed.rs | 2 ++ codex-rs/exec/src/event_processor.rs | 8 ++++++ codex-rs/mcp-server/src/codex_tool_runner.rs | 6 +++++ codex-rs/tui/src/chatwidget.rs | 8 ++++++ 10 files changed, 89 insertions(+), 16 deletions(-) diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 816fc80f9b..ad7b55952a 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -134,7 +134,7 @@ pub(crate) async fn stream_chat_completions( match res { Ok(resp) if resp.status().is_success() => { - let (tx_event, rx_event) = mpsc::channel::>(16); + let (tx_event, rx_event) = mpsc::channel::>(1600); let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); tokio::spawn(process_chat_sse(stream, tx_event)); return Ok(ResponseStream { rx_event }); @@ -426,6 +426,12 @@ where // will never appear in a Chat Completions stream. continue; } + Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_)))) + | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => { + // Deltas are ignored here since aggregation waits for the + // final OutputItemDone. + continue; + } } } } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 2fa182cf7f..8ec68d02e8 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -125,6 +125,7 @@ impl ModelClient { reasoning, previous_response_id: prompt.prev_id.clone(), store: prompt.store, + // TODO: make this configurable stream: true, }; @@ -148,7 +149,7 @@ impl ModelClient { let res = req_builder.send().await; match res { Ok(resp) if resp.status().is_success() => { - let (tx_event, rx_event) = mpsc::channel::>(16); + let (tx_event, rx_event) = mpsc::channel::>(1600); // spawn task to process SSE let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); @@ -205,6 +206,7 @@ struct SseEvent { kind: String, response: Option, item: Option, + delta: Option, } #[derive(Debug, Deserialize)] @@ -337,6 +339,22 @@ where return; } } + "response.output_text.delta" => { + if let Some(delta) = event.delta { + let event = ResponseEvent::OutputTextDelta(delta); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + } + } + "response.reasoning_summary_text.delta" => { + if let Some(delta) = event.delta { + let event = ResponseEvent::ReasoningSummaryDelta(delta); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + } + } "response.created" => { if event.response.is_some() { let _ = tx_event.send(Ok(ResponseEvent::Created {})).await; @@ -360,10 +378,8 @@ where | "response.function_call_arguments.delta" | "response.in_progress" | "response.output_item.added" - | "response.output_text.delta" | "response.output_text.done" | "response.reasoning_summary_part.added" - | "response.reasoning_summary_text.delta" | "response.reasoning_summary_text.done" => { // Currently, we ignore these events, but we handle them // separately to skip the logging message in the `other` case. @@ -375,7 +391,7 @@ where /// used in tests to stream from a text SSE file async fn stream_from_fixture(path: impl AsRef) -> Result { - let (tx_event, rx_event) = mpsc::channel::>(16); + let (tx_event, rx_event) = mpsc::channel::>(1600); let f = std::fs::File::open(path.as_ref())?; let lines = std::io::BufReader::new(f).lines(); diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index f9a816a7a9..3e3c2e7efa 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -57,6 +57,8 @@ pub enum ResponseEvent { response_id: String, token_usage: Option, }, + OutputTextDelta(String), + ReasoningSummaryDelta(String), } #[derive(Debug, Serialize)] diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 52c37c51ee..5227f93c8e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -61,7 +61,9 @@ use crate::models::ResponseInputItem; use crate::models::ResponseItem; use crate::models::ShellToolCallParams; use crate::project_doc::get_user_instructions; +use crate::protocol::AgentMessageDeltaEvent; use crate::protocol::AgentMessageEvent; +use crate::protocol::AgentReasoningDeltaEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; @@ -103,7 +105,7 @@ impl Codex { /// submitted to start the session. pub async fn spawn(config: Config, ctrl_c: Arc) -> CodexResult<(Codex, String)> { let (tx_sub, rx_sub) = async_channel::bounded(64); - let (tx_event, rx_event) = async_channel::bounded(64); + let (tx_event, rx_event) = async_channel::bounded(1600); let instructions = get_user_instructions(&config).await; let configure_session = Op::ConfigureSession { @@ -1121,15 +1123,8 @@ async fn try_run_turn( let mut stream = sess.client.clone().stream(&prompt).await?; - // Buffer all the incoming messages from the stream first, then execute them. - // If we execute a function call in the middle of handling the stream, it can time out. - let mut input = Vec::new(); - while let Some(event) = stream.next().await { - input.push(event?); - } - let mut output = Vec::new(); - for event in input { + while let Some(Ok(event)) = stream.next().await { match event { ResponseEvent::Created => { let mut state = sess.state.lock().unwrap(); @@ -1172,6 +1167,20 @@ async fn try_run_turn( state.previous_response_id = Some(response_id); break; } + ResponseEvent::OutputTextDelta(delta) => { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }), + }; + sess.tx_event.send(event).await.ok(); + } + ResponseEvent::ReasoningSummaryDelta(delta) => { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }), + }; + sess.tx_event.send(event).await.ok(); + } } } Ok(output) diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index fa25a2fe38..b233d4f27b 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -282,9 +282,15 @@ pub enum EventMsg { /// Agent text output message AgentMessage(AgentMessageEvent), + /// Agent text output delta message + AgentMessageDelta(AgentMessageDeltaEvent), + /// Reasoning event from agent. AgentReasoning(AgentReasoningEvent), + /// Agent reasoning delta event from agent. + AgentReasoningDelta(AgentReasoningDeltaEvent), + /// Ack the client's configure message. SessionConfigured(SessionConfiguredEvent), @@ -340,11 +346,21 @@ pub struct AgentMessageEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AgentMessageDeltaEvent { + pub delta: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentReasoningEvent { pub text: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AgentReasoningDeltaEvent { + pub delta: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct McpToolCallBeginEvent { /// Identifier so this can be paired with the McpToolCallEnd event. diff --git a/codex-rs/core/tests/cli_stream.rs b/codex-rs/core/tests/cli_stream.rs index df3fedfd48..9ef042eb1a 100644 --- a/codex-rs/core/tests/cli_stream.rs +++ b/codex-rs/core/tests/cli_stream.rs @@ -71,8 +71,8 @@ async fn chat_mode_stream_cli() { println!("Stderr:\n{}", String::from_utf8_lossy(&output.stderr)); assert!(output.status.success()); let stdout = String::from_utf8_lossy(&output.stdout); - assert!(stdout.contains("hi")); - assert_eq!(stdout.matches("hi").count(), 1); + let hi_lines = stdout.lines().filter(|line| line.trim() == "hi").count(); + assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'"); server.verify().await; } diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index da2736aa77..8883eff373 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -32,6 +32,8 @@ fn sse_completed(id: &str) -> String { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// this test is flaky (has race conditions), so we ignore it for now +#[ignore] async fn retries_on_early_close() { #![allow(clippy::unwrap_used)] diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 540e014298..2a7c4c621b 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -3,7 +3,9 @@ use codex_common::summarize_sandbox_policy; use codex_core::WireApi; use codex_core::config::Config; use codex_core::model_supports_reasoning_summaries; +use codex_core::protocol::AgentMessageDeltaEvent; use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::AgentReasoningDeltaEvent; use codex_core::protocol::BackgroundEventEvent; use codex_core::protocol::ErrorEvent; use codex_core::protocol::Event; @@ -184,6 +186,12 @@ impl EventProcessor { EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { ts_println!(self, "tokens used: {total_tokens}"); } + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { + // TODO: think how we want to support this in the CLI + } + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { + // TODO: think how we want to support this in the CLI + } EventMsg::AgentMessage(AgentMessageEvent { message }) => { ts_println!( self, diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 7c3b02fe5e..88dcf649dc 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -171,6 +171,12 @@ pub async fn run_codex_tool_session( EventMsg::SessionConfigured(_) => { tracing::error!("unexpected SessionConfigured event"); } + EventMsg::AgentMessageDelta(_) => { + // TODO: think how we want to support this in the MCP + } + EventMsg::AgentReasoningDelta(_) => { + // TODO: think how we want to support this in the MCP + } EventMsg::Error(_) | EventMsg::TaskStarted | EventMsg::TokenCount(_) diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 51fdfc3e8a..28014c6e40 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use codex_core::codex_wrapper::init_codex; use codex_core::config::Config; +use codex_core::protocol::AgentMessageDeltaEvent; use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::AgentReasoningDeltaEvent; use codex_core::protocol::AgentReasoningEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::ErrorEvent; @@ -375,6 +377,12 @@ impl ChatWidget<'_> { self.bottom_pane .on_history_entry_response(log_id, offset, entry.map(|e| e.text)); } + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { + // TODO: think how we want to support this in the TUI + } + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { + // TODO: think how we want to support this in the TUI + } event => { self.conversation_history .add_background_event(format!("{event:?}")); From 0bc7ee91937d5cb9ff0e6a77e35cc89a91a81892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Preet=20=F0=9F=9A=80?= <114741744+preetDev004@users.noreply.github.com> Date: Wed, 16 Jul 2025 19:00:39 -0400 Subject: [PATCH 3/5] Added mcp-server name validation (#1591) This PR implements server name validation for MCP (Model Context Protocol) servers to ensure they conform to the required pattern ^[a-zA-Z0-9_-]+$. This addresses the TODO comment in mcp_connection_manager.rs:82. + Added validation before spawning MCP client tasks + Invalid server names are added to errors map with descriptive messages I have read the CLA Document and I hereby sign the CLA --------- Co-authored-by: Michael Bolin --- codex-rs/core/src/mcp_connection_manager.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 6ae1865f16..7cf6762752 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -79,9 +79,19 @@ impl McpConnectionManager { // Launch all configured servers concurrently. let mut join_set = JoinSet::new(); + let mut errors = ClientStartErrors::new(); for (server_name, cfg) in mcp_servers { - // TODO: Verify server name: require `^[a-zA-Z0-9_-]+$`? + // Validate server name before spawning + if !is_valid_mcp_server_name(&server_name) { + let error = anyhow::anyhow!( + "invalid server name '{}': must match pattern ^[a-zA-Z0-9_-]+$", + server_name + ); + errors.insert(server_name, error); + continue; + } + join_set.spawn(async move { let McpServerConfig { command, args, env } = cfg; let client_res = McpClient::new_stdio_client(command, args, env).await; @@ -117,7 +127,6 @@ impl McpConnectionManager { let mut clients: HashMap> = HashMap::with_capacity(join_set.len()); - let mut errors = ClientStartErrors::new(); while let Some(res) = join_set.join_next().await { let (server_name, client_res) = res?; // JoinError propagation @@ -208,3 +217,10 @@ pub async fn list_all_tools( Ok(aggregated) } + +fn is_valid_mcp_server_name(server_name: &str) -> bool { + !server_name.is_empty() + && server_name + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-') +} From d3dbc104798eb15ab88c71ce8a33d42f78668c96 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Wed, 16 Jul 2025 16:35:29 -0700 Subject: [PATCH 4/5] fix: update bin/codex.js so it listens for exit on the child process (#1590) When Codex CLI is installed via `npm`, we use a `.js` wrapper script to launch the Rust binary. - Previously, we were not listening for signals to ensure that killing the Node.js process would also kill the underlying Rust process. - We also did not have a proper `exit` handler in place on the child process to ensure we exited from the Node.js process. This PR fixes these things and hopefully addresses https://github.com/openai/codex/issues/1570. This also adds logic so that Windows falls back to the TypeScript CLI again, which should address https://github.com/openai/codex/issues/1573. --- codex-cli/bin/codex.js | 79 +++++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/codex-cli/bin/codex.js b/codex-cli/bin/codex.js index 54b99078e4..ae1fb9593c 100755 --- a/codex-cli/bin/codex.js +++ b/codex-cli/bin/codex.js @@ -15,7 +15,6 @@ * current platform / architecture, an error is thrown. */ -import { spawnSync } from "child_process"; import fs from "fs"; import path from "path"; import { fileURLToPath, pathToFileURL } from "url"; @@ -35,7 +34,7 @@ const wantsNative = fs.existsSync(path.join(__dirname, "use-native")) || : false); // Try native binary if requested. -if (wantsNative) { +if (wantsNative && process.platform !== 'win32') { const { platform, arch } = process; let targetTriple = null; @@ -74,22 +73,76 @@ if (wantsNative) { } const binaryPath = path.join(__dirname, "..", "bin", `codex-${targetTriple}`); - const result = spawnSync(binaryPath, process.argv.slice(2), { + + // Use an asynchronous spawn instead of spawnSync so that Node is able to + // respond to signals (e.g. Ctrl-C / SIGINT) while the native binary is + // executing. This allows us to forward those signals to the child process + // and guarantees that when either the child terminates or the parent + // receives a fatal signal, both processes exit in a predictable manner. + const { spawn } = await import("child_process"); + + const child = spawn(binaryPath, process.argv.slice(2), { stdio: "inherit", }); - const exitCode = typeof result.status === "number" ? result.status : 1; - process.exit(exitCode); -} + child.on("error", (err) => { + // Typically triggered when the binary is missing or not executable. + // Re-throwing here will terminate the parent with a non-zero exit code + // while still printing a helpful stack trace. + // eslint-disable-next-line no-console + console.error(err); + process.exit(1); + }); -// Fallback: execute the original JavaScript CLI. + // Forward common termination signals to the child so that it shuts down + // gracefully. In the handler we temporarily disable the default behavior of + // exiting immediately; once the child has been signaled we simply wait for + // its exit event which will in turn terminate the parent (see below). + const forwardSignal = (signal) => { + if (child.killed) { + return; + } + try { + child.kill(signal); + } catch { + /* ignore */ + } + }; -// Resolve the path to the compiled CLI bundle -const cliPath = path.resolve(__dirname, "../dist/cli.js"); -const cliUrl = pathToFileURL(cliPath).href; + ["SIGINT", "SIGTERM", "SIGHUP"].forEach((sig) => { + process.on(sig, () => forwardSignal(sig)); + }); -// Load and execute the CLI -(async () => { + // When the child exits, mirror its termination reason in the parent so that + // shell scripts and other tooling observe the correct exit status. + // Wrap the lifetime of the child process in a Promise so that we can await + // its termination in a structured way. The Promise resolves with an object + // describing how the child exited: either via exit code or due to a signal. + const childResult = await new Promise((resolve) => { + child.on("exit", (code, signal) => { + if (signal) { + resolve({ type: "signal", signal }); + } else { + resolve({ type: "code", exitCode: code ?? 1 }); + } + }); + }); + + if (childResult.type === "signal") { + // Re-emit the same signal so that the parent terminates with the expected + // semantics (this also sets the correct exit code of 128 + n). + process.kill(process.pid, childResult.signal); + } else { + process.exit(childResult.exitCode); + } +} else { + // Fallback: execute the original JavaScript CLI. + + // Resolve the path to the compiled CLI bundle + const cliPath = path.resolve(__dirname, "../dist/cli.js"); + const cliUrl = pathToFileURL(cliPath).href; + + // Load and execute the CLI try { await import(cliUrl); } catch (err) { @@ -97,4 +150,4 @@ const cliUrl = pathToFileURL(cliPath).href; console.error(err); process.exit(1); } -})(); +} From 643ab1f582a248a9f995bf94110d28fe9677d387 Mon Sep 17 00:00:00 2001 From: aibrahim-oai Date: Wed, 16 Jul 2025 22:26:31 -0700 Subject: [PATCH 5/5] Add streaming to exec and tui (#1594) Added support for streaming in `tui` Added support for streaming in `exec` https://github.com/user-attachments/assets/4215892e-d940-452c-a1d0-416ed0cf14eb --- codex-rs/exec/src/event_processor.rs | 77 ++++++++++++++----- codex-rs/exec/src/lib.rs | 3 +- codex-rs/tui/src/app.rs | 2 + codex-rs/tui/src/chatwidget.rs | 53 ++++++++++--- .../tui/src/conversation_history_widget.rs | 46 ++++++++++- 5 files changed, 149 insertions(+), 32 deletions(-) diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 2a7c4c621b..5ab09994b1 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -23,6 +23,7 @@ use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; use std::collections::HashMap; +use std::io::Write; use std::time::Instant; /// This should be configurable. When used in CI, users may not want to impose @@ -52,10 +53,12 @@ pub(crate) struct EventProcessor { /// Whether to include `AgentReasoning` events in the output. show_agent_reasoning: bool, + answer_started: bool, + reasoning_started: bool, } impl EventProcessor { - pub(crate) fn create_with_ansi(with_ansi: bool, show_agent_reasoning: bool) -> Self { + pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self { let call_id_to_command = HashMap::new(); let call_id_to_patch = HashMap::new(); let call_id_to_tool_call = HashMap::new(); @@ -72,7 +75,9 @@ impl EventProcessor { green: Style::new().green(), cyan: Style::new().cyan(), call_id_to_tool_call, - show_agent_reasoning, + show_agent_reasoning: !config.hide_agent_reasoning, + answer_started: false, + reasoning_started: false, } } else { Self { @@ -86,7 +91,9 @@ impl EventProcessor { green: Style::new(), cyan: Style::new(), call_id_to_tool_call, - show_agent_reasoning, + show_agent_reasoning: !config.hide_agent_reasoning, + answer_started: false, + reasoning_started: false, } } } @@ -186,18 +193,45 @@ impl EventProcessor { EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { ts_println!(self, "tokens used: {total_tokens}"); } - EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the CLI + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { + if !self.answer_started { + ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta)); + self.answer_started = true; + } + print!("{delta}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); } - EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the CLI + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { + if !self.show_agent_reasoning { + return; + } + if !self.reasoning_started { + ts_println!( + self, + "{}\n", + "thinking".style(self.italic).style(self.magenta), + ); + self.reasoning_started = true; + } + print!("{delta}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { - ts_println!( - self, - "{}\n{message}", - "codex".style(self.bold).style(self.magenta) - ); + // if answer_started is false, this means we haven't received any + // delta. Thus, we need to print the message as a new answer. + if !self.answer_started { + ts_println!( + self, + "{}\n{}", + "codex".style(self.italic).style(self.magenta), + message, + ); + } else { + println!(); + self.answer_started = false; + } } EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id, @@ -351,7 +385,7 @@ impl EventProcessor { ); // Pretty-print the patch summary with colored diff markers so - // it’s easy to scan in the terminal output. + // it's easy to scan in the terminal output. for (path, change) in changes.iter() { match change { FileChange::Add { content } => { @@ -449,12 +483,17 @@ impl EventProcessor { } EventMsg::AgentReasoning(agent_reasoning_event) => { if self.show_agent_reasoning { - ts_println!( - self, - "{}\n{}", - "thinking".style(self.italic).style(self.magenta), - agent_reasoning_event.text - ); + if !self.reasoning_started { + ts_println!( + self, + "{}\n{}", + "codex".style(self.italic).style(self.magenta), + agent_reasoning_event.text, + ); + } else { + println!(); + self.reasoning_started = false; + } } } EventMsg::SessionConfigured(session_configured_event) => { diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 44dddd4d0f..afefed1a93 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -115,8 +115,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any }; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; - let mut event_processor = - EventProcessor::create_with_ansi(stdout_with_ansi, !config.hide_agent_reasoning); + let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi, &config); // Print the effective configuration and prompt so users can see what Codex // is using. event_processor.print_config_summary(&config, &prompt); diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 33297ad372..883250400d 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -297,6 +297,8 @@ impl<'a> App<'a> { } fn draw_next_frame(&mut self, terminal: &mut tui::Tui) -> Result<()> { + // TODO: add a throttle to avoid redrawing too often + match &mut self.app_state { AppState::Chat { widget } => { terminal.draw(|frame| frame.render_widget_ref(&**widget, frame.area()))?; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 28014c6e40..860439ffb6 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -51,6 +51,8 @@ pub(crate) struct ChatWidget<'a> { config: Config, initial_user_message: Option, token_usage: TokenUsage, + reasoning_buffer: String, + answer_buffer: String, } #[derive(Clone, Copy, Eq, PartialEq)] @@ -137,6 +139,8 @@ impl ChatWidget<'_> { initial_images, ), token_usage: TokenUsage::default(), + reasoning_buffer: String::new(), + answer_buffer: String::new(), } } @@ -242,16 +246,51 @@ impl ChatWidget<'_> { self.request_redraw(); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { + // if the answer buffer is empty, this means we haven't received any + // delta. Thus, we need to print the message as a new answer. + if self.answer_buffer.is_empty() { + self.conversation_history + .add_agent_message(&self.config, message); + } else { + self.conversation_history + .replace_prev_agent_message(&self.config, message); + } + self.answer_buffer.clear(); + self.request_redraw(); + } + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { + if self.answer_buffer.is_empty() { + self.conversation_history + .add_agent_message(&self.config, "".to_string()); + } + self.answer_buffer.push_str(&delta.clone()); self.conversation_history - .add_agent_message(&self.config, message); + .replace_prev_agent_message(&self.config, self.answer_buffer.clone()); + self.request_redraw(); + } + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { + if self.reasoning_buffer.is_empty() { + self.conversation_history + .add_agent_reasoning(&self.config, "".to_string()); + } + self.reasoning_buffer.push_str(&delta.clone()); + self.conversation_history + .replace_prev_agent_reasoning(&self.config, self.reasoning_buffer.clone()); self.request_redraw(); } EventMsg::AgentReasoning(AgentReasoningEvent { text }) => { - if !self.config.hide_agent_reasoning { + // if the reasoning buffer is empty, this means we haven't received any + // delta. Thus, we need to print the message as a new reasoning. + if self.reasoning_buffer.is_empty() { self.conversation_history - .add_agent_reasoning(&self.config, text); - self.request_redraw(); + .add_agent_reasoning(&self.config, "".to_string()); + } else { + // else, we rerender one last time. + self.conversation_history + .replace_prev_agent_reasoning(&self.config, text); } + self.reasoning_buffer.clear(); + self.request_redraw(); } EventMsg::TaskStarted => { self.bottom_pane.clear_ctrl_c_quit_hint(); @@ -377,12 +416,6 @@ impl ChatWidget<'_> { self.bottom_pane .on_history_entry_response(log_id, offset, entry.map(|e| e.text)); } - EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the TUI - } - EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the TUI - } event => { self.conversation_history .add_background_event(format!("{event:?}")); diff --git a/codex-rs/tui/src/conversation_history_widget.rs b/codex-rs/tui/src/conversation_history_widget.rs index c0e5031d70..01a8dc6834 100644 --- a/codex-rs/tui/src/conversation_history_widget.rs +++ b/codex-rs/tui/src/conversation_history_widget.rs @@ -202,6 +202,14 @@ impl ConversationHistoryWidget { self.add_to_history(HistoryCell::new_agent_reasoning(config, text)); } + pub fn replace_prev_agent_reasoning(&mut self, config: &Config, text: String) { + self.replace_last_agent_reasoning(config, text); + } + + pub fn replace_prev_agent_message(&mut self, config: &Config, text: String) { + self.replace_last_agent_message(config, text); + } + pub fn add_background_event(&mut self, message: String) { self.add_to_history(HistoryCell::new_background_event(message)); } @@ -249,6 +257,42 @@ impl ConversationHistoryWidget { }); } + pub fn replace_last_agent_reasoning(&mut self, config: &Config, text: String) { + if let Some(idx) = self + .entries + .iter() + .rposition(|entry| matches!(entry.cell, HistoryCell::AgentReasoning { .. })) + { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_reasoning(config, text); + let height = if width > 0 { + entry.cell.height(width) + } else { + 0 + }; + entry.line_count.set(height); + } + } + + pub fn replace_last_agent_message(&mut self, config: &Config, text: String) { + if let Some(idx) = self + .entries + .iter() + .rposition(|entry| matches!(entry.cell, HistoryCell::AgentMessage { .. })) + { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_message(config, text); + let height = if width > 0 { + entry.cell.height(width) + } else { + 0 + }; + entry.line_count.set(height); + } + } + pub fn record_completed_exec_command( &mut self, call_id: String, @@ -454,7 +498,7 @@ impl WidgetRef for ConversationHistoryWidget { { // Choose a thumb color that stands out only when this pane has focus so that the - // user’s attention is naturally drawn to the active viewport. When unfocused we show + // user's attention is naturally drawn to the active viewport. When unfocused we show // a low-contrast thumb so the scrollbar fades into the background without becoming // invisible. let thumb_style = if self.has_input_focus {