Compare commits

...

35 Commits

Author SHA1 Message Date
Ahmed Ibrahim
9780b6d550 400 error tool calling fix 2025-07-18 00:22:16 -07:00
Ahmed Ibrahim
db54493c92 bug fix 2025-07-17 21:42:32 -07:00
Ahmed Ibrahim
aabce31e84 review 2025-07-17 21:25:28 -07:00
Ahmed Ibrahim
78c6f0eb70 Merge branch 'codex/remove-unsafe-env-variable-pattern' of github.com:openai/codex into codex/remove-unsafe-env-variable-pattern 2025-07-17 21:20:31 -07:00
Ahmed Ibrahim
098462494e review 2025-07-17 21:20:25 -07:00
aibrahim-oai
761ea58759 Update codex-rs/core/src/model_provider_info.rs
Co-authored-by: Michael Bolin <mbolin@openai.com>
2025-07-17 21:18:02 -07:00
Ahmed Ibrahim
4a7b8aaace Merge branch 'codex/remove-unsafe-env-variable-pattern' of github.com:openai/codex into codex/remove-unsafe-env-variable-pattern 2025-07-17 19:53:46 -07:00
Ahmed Ibrahim
2e30a84c68 review 2025-07-17 19:53:35 -07:00
aibrahim-oai
52d6655de9 Update codex-rs/core/src/codex.rs
Co-authored-by: Michael Bolin <mbolin@openai.com>
2025-07-17 19:52:15 -07:00
Ahmed Ibrahim
3ef1f26ecc removing ignore 2025-07-17 18:17:05 -07:00
Ahmed Ibrahim
aad6dc1e4c Merge branch 'codex/remove-unsafe-env-variable-pattern' of github.com:openai/codex into codex/remove-unsafe-env-variable-pattern 2025-07-17 17:48:09 -07:00
Ahmed Ibrahim
aa4f9dff7a preserving older test config 2025-07-17 17:47:48 -07:00
aibrahim-oai
3baa5a73ae Update codex-rs/config.md
Co-authored-by: Michael Bolin <mbolin@openai.com>
2025-07-17 17:40:50 -07:00
Ahmed Ibrahim
fb8622ac6a Merge branch 'codex/remove-unsafe-env-variable-pattern' of github.com:openai/codex into codex/remove-unsafe-env-variable-pattern 2025-07-17 17:40:24 -07:00
Ahmed Ibrahim
0b30945eef remove openai_ prefix 2025-07-17 17:40:15 -07:00
aibrahim-oai
790c5ace10 Merge branch 'main' into codex/remove-unsafe-env-variable-pattern 2025-07-17 17:16:21 -07:00
Ahmed Ibrahim
7bcc77bb3c remove them from env flags 2025-07-17 17:16:03 -07:00
aibrahim-oai
9cedeadf6a change the default debounce rate to 10ms (#1606)
changed the default debounce rate to 10ms because typing was laggy.

Before:


https://github.com/user-attachments/assets/e5d15fcb-6a2b-4837-b2b4-c3dcb4cc3409

After



https://github.com/user-attachments/assets/6f0005eb-fd49-4130-ba68-635ee0f2831f
2025-07-17 17:00:17 -07:00
Ahmed Ibrahim
80bc428b37 failing test 2025-07-17 16:44:50 -07:00
Ahmed Ibrahim
9b3e1a8b56 review 2025-07-17 16:32:21 -07:00
aibrahim-oai
666a546adc Merge branch 'main' into codex/remove-unsafe-env-variable-pattern 2025-07-17 16:32:01 -07:00
pakrym-oai
327e2254f6 chore: rename toolchain file (#1604)
Rename toolchain file so older versions of cargo can pick it up.
2025-07-17 15:36:15 -07:00
Michael Bolin
e16657ca45 feat: add --json flag to codex exec (#1603)
This is designed to facilitate programmatic use of Codex in a more
lightweight way than using `codex mcp`.

Passing `--json` to `codex exec` will print each event as a line of JSON
to stdout. Note that it does not print the individual tokens as they are
streamed, only full messages, as this is aimed at programmatic use
rather than to power UI.

<img width="1348" height="1307" alt="image"
src="https://github.com/user-attachments/assets/fc7908de-b78d-46e4-a6ff-c85de28415c7"
/>

I changed the existing `EventProcessor` into a trait and moved the
implementation to `EventProcessorWithHumanOutput`. Then I introduced an
alternative implementation, `EventProcessorWithJsonOutput`. The `--json`
flag determines which implementation to use.
2025-07-17 15:10:15 -07:00
aibrahim-oai
f90d91b1c3 Merge branch 'main' into codex/remove-unsafe-env-variable-pattern 2025-07-17 12:55:06 -07:00
aibrahim-oai
bb30ab9e96 Implement redraw debounce (#1599)
## Summary
- debouce redraw events so repeated requests don't overwhelm the
terminal
- add `RequestRedraw` event and schedule redraws after 100ms

## Testing
- `cargo clippy --tests`
- `cargo test` *(fails: Sandbox Denied errors in landlock tests)*

------
https://chatgpt.com/codex/tasks/task_i_68792a65b8b483218ec90a8f68746cd8

---------

Co-authored-by: Michael Bolin <mbolin@openai.com>
2025-07-17 12:54:55 -07:00
Ahmed Ibrahim
b73b211ee5 cargo 2025-07-17 12:44:14 -07:00
Ahmed Ibrahim
2bb8d37b12 convert to model provider 2025-07-17 12:38:50 -07:00
aibrahim-oai
79825c08f1 Merge branch 'main' into codex/remove-unsafe-env-variable-pattern 2025-07-17 11:42:22 -07:00
pakrym-oai
6949329a7f chore: auto format code on save and add more details to AGENTS.md (#1582)
Adds a default vscode config with generally applicable settings.
Adds more entrypoints to justfile both  for environment setup and to help
agents better verify changes.
2025-07-17 11:40:00 -07:00
pakrym-oai
b95a010e86 fix: trim MCP tool names to fit into tool name length limit (#1571)
Store fully qualified names along with tool entries so we don't have to re-parse them.

Fixes: https://github.com/openai/codex/issues/1289
2025-07-17 11:35:38 -07:00
aibrahim-oai
4758897e6f Merge branch 'main' into codex/remove-unsafe-env-variable-pattern 2025-07-17 10:52:08 -07:00
Ahmed Ibrahim
6655653d77 remove unsafe 2025-07-17 10:51:42 -07:00
Ahmed Ibrahim
df04fddbc4 remove unsafe 2025-07-17 10:51:19 -07:00
aibrahim-oai
47725f9fa8 refactor: use config for openai retries 2025-07-17 10:40:28 -07:00
aibrahim-oai
fcbcc40f51 Storing the sessions in a more organized way for easier look up. (#1596)
now storing the sessions in `~/.codex/sessions/YYYY/MM/DD/<file>`
2025-07-17 10:12:15 -07:00
31 changed files with 1478 additions and 656 deletions

18
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,18 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Cargo launch",
"cargo": {
"cwd": "${workspaceFolder}/codex-rs",
"args": [
"build",
"--bin=codex-tui"
]
},
"args": []
}
]
}

10
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,10 @@
{
"rust-analyzer.checkOnSave": true,
"rust-analyzer.check.command": "clippy",
"rust-analyzer.check.extraArgs": ["--all-features", "--tests"],
"rust-analyzer.rustfmt.extraArgs": ["--config", "imports_granularity=Item"],
"[rust]": {
"editor.defaultFormatter": "rust-lang.rust-analyzer",
"editor.formatOnSave": true,
}
}

View File

@@ -3,3 +3,7 @@
In the codex-rs folder where the rust code lives:
- Never add or modify any code related to `CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR`. You operate in a sandbox where `CODEX_SANDBOX_NETWORK_DISABLED=1` will be set whenever you use the `shell` tool. Any existing code that uses `CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR` was authored with this fact in mind. It is often used to early exit out of tests that the author knew you would not be able to run given your sandbox limitations.
Before creating a pull request with changes to `codex-rs`, run `just fmt` (in `codex-rs` directory) to format the code and `just fix` (in `codex-rs` directory) to fix any linter issues in the code, ensure the test suite passes by running `cargo test --all-features` in the `codex-rs` directory.
When making individual changes prefer running tests on individual files or projects first.

67
codex-rs/Cargo.lock generated
View File

@@ -399,6 +399,15 @@ version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6099cdc01846bc367c4e7dd630dc5966dccf36b652fae7a74e17b640411a91b2"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "bstr"
version = "1.12.0"
@@ -671,6 +680,7 @@ dependencies = [
"seccompiler",
"serde",
"serde_json",
"sha1",
"strum_macros 0.27.1",
"tempfile",
"thiserror 2.0.12",
@@ -683,6 +693,7 @@ dependencies = [
"tree-sitter",
"tree-sitter-bash",
"uuid",
"walkdir",
"wildmatch",
"wiremock",
]
@@ -932,6 +943,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.4.2"
@@ -1006,6 +1026,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "ctor"
version = "0.1.26"
@@ -1156,6 +1186,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "dirs"
version = "6.0.0"
@@ -1645,6 +1685,16 @@ dependencies = [
"byteorder",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getopts"
version = "0.2.23"
@@ -3944,6 +3994,17 @@ dependencies = [
"syn 2.0.104",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -4851,6 +4912,12 @@ dependencies = [
"unicode-width 0.2.0",
]
[[package]]
name = "typenum"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "unicase"
version = "2.8.1"

View File

@@ -92,6 +92,32 @@ http_headers = { "X-Example-Header" = "example-value" }
env_http_headers = { "X-Example-Features": "EXAMPLE_FEATURES" }
```
### Per-provider network tuning
The following optional settings control retry behaviour and streaming idle timeouts **per model provider**. They must be specified inside the corresponding `[model_providers.<id>]` block in `config.toml`. (Older releases accepted toplevel keys; those are now ignored.)
Example:
```toml
[model_providers.openai]
name = "OpenAI"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
# network tuning overrides (all optional; falls back to builtin defaults)
request_max_retries = 4 # retry failed HTTP requests
stream_max_retries = 10 # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout
```
#### request_max_retries
How many times Codex will retry a failed HTTP request to the model provider. Defaults to `4`.
#### stream_max_retries
Number of times Codex will attempt to reconnect when a streaming response is interrupted. Defaults to `10`.
#### stream_idle_timeout_ms
How long Codex will wait for activity on a streaming response before treating the connection as lost. Defaults to `300_000` (5 minutes).
## model_provider
Identifies which provider to use from the `model_providers` map. Defaults to `"openai"`. You can override the `base_url` for the built-in `openai` provider via the `OPENAI_BASE_URL` environment variable.
@@ -444,7 +470,7 @@ Currently, `"vscode"` is the default, though Codex does not verify VS Code is in
## hide_agent_reasoning
Codex intermittently emits "reasoning" events that show the models internal "thinking" before it produces a final answer. Some users may find these events distracting, especially in CI logs or minimal terminal output.
Codex intermittently emits "reasoning" events that show the model's internal "thinking" before it produces a final answer. Some users may find these events distracting, especially in CI logs or minimal terminal output.
Setting `hide_agent_reasoning` to `true` suppresses these events in **both** the TUI as well as the headless `exec` sub-command:

View File

@@ -28,6 +28,7 @@ rand = "0.9"
reqwest = { version = "0.12", features = ["json", "stream"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha1 = "0.10.6"
strum_macros = "0.27.1"
thiserror = "2.0.12"
time = { version = "0.3", features = ["formatting", "local-offset", "macros"] }
@@ -65,4 +66,5 @@ predicates = "3"
pretty_assertions = "1.4.1"
tempfile = "3"
tokio-test = "0.4"
walkdir = "2.5.0"
wiremock = "0.6"

View File

@@ -21,8 +21,6 @@ use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::models::ContentItem;
use crate::models::ResponseItem;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
@@ -121,6 +119,7 @@ pub(crate) async fn stream_chat_completions(
);
let mut attempt = 0;
let max_retries = provider.request_max_retries();
loop {
attempt += 1;
@@ -136,7 +135,11 @@ pub(crate) async fn stream_chat_completions(
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_chat_sse(stream, tx_event));
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
@@ -146,7 +149,7 @@ pub(crate) async fn stream_chat_completions(
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}
@@ -162,7 +165,7 @@ pub(crate) async fn stream_chat_completions(
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -175,14 +178,15 @@ pub(crate) async fn stream_chat_completions(
/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
async fn process_chat_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();
let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;
// State to accumulate a function call across streaming chunks.
// OpenAI may split the `arguments` string over multiple `delta` events
// until the chunk whose `finish_reason` is `tool_calls` is emitted. We

View File

@@ -29,8 +29,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::models::ResponseItem;
@@ -109,7 +107,7 @@ impl ModelClient {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path).await;
return stream_from_fixture(path, self.provider.clone()).await;
}
let full_instructions = prompt.get_full_instructions(&self.config.model);
@@ -136,6 +134,7 @@ impl ModelClient {
);
let mut attempt = 0;
let max_retries = self.provider.request_max_retries();
loop {
attempt += 1;
@@ -153,7 +152,11 @@ impl ModelClient {
// spawn task to process SSE
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
self.provider.stream_idle_timeout(),
));
return Ok(ResponseStream { rx_event });
}
@@ -172,7 +175,7 @@ impl ModelClient {
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}
@@ -189,7 +192,7 @@ impl ModelClient {
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -198,6 +201,10 @@ impl ModelClient {
}
}
}
pub fn get_provider(&self) -> ModelProviderInfo {
self.provider.clone()
}
}
#[derive(Debug, Deserialize, Serialize)]
@@ -249,14 +256,16 @@ struct ResponseCompletedOutputTokensDetails {
reasoning_tokens: u64,
}
async fn process_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();
// If the stream stays completely silent for an extended period treat it as disconnected.
let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;
@@ -317,7 +326,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No realtime streaming the user only saw output after the
// entire turn had finished, which broke the typing UX and
// entire turn had finished, which broke the "typing" UX and
// made longrunning turns look stalled.
// 2. Duplicate `function_call_output` items both the
// individual *and* the completed array were forwarded, which
@@ -390,7 +399,10 @@ where
}
/// used in tests to stream from a text SSE file
async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
let lines = std::io::BufReader::new(f).lines();
@@ -404,7 +416,11 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
let rdr = std::io::Cursor::new(content);
let stream = ReaderStream::new(rdr).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
Ok(ResponseStream { rx_event })
}
@@ -424,7 +440,10 @@ mod tests {
/// Runs the SSE parser on pre-chunked byte slices and returns every event
/// (including any final `Err` from a stream-closure check).
async fn collect_events(chunks: &[&[u8]]) -> Vec<Result<ResponseEvent>> {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
builder.read(chunk);
@@ -433,7 +452,7 @@ mod tests {
let reader = builder.build();
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -444,7 +463,10 @@ mod tests {
/// Builds an in-memory SSE stream from JSON fixtures and returns only the
/// successfully parsed events (panics on internal channel errors).
async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
let kind = e
@@ -460,7 +482,7 @@ mod tests {
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
let mut out = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -505,7 +527,25 @@ mod tests {
let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
let sse3 = format!("event: response.completed\ndata: {completed}\n\n");
let events = collect_events(&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()]).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
)
.await;
assert_eq!(events.len(), 3);
@@ -546,8 +586,21 @@ mod tests {
.to_string();
let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let events = collect_events(&[sse1.as_bytes()]).await;
let events = collect_events(&[sse1.as_bytes()], provider).await;
assert_eq!(events.len(), 2);
@@ -635,7 +688,21 @@ mod tests {
let mut evs = vec![case.event];
evs.push(completed.clone());
let out = run_sse(evs).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let out = run_sse(evs, provider).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),

View File

@@ -49,9 +49,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::process_exec_tool_call;
use crate::exec_env::create_env;
use crate::flags::OPENAI_STREAM_MAX_RETRIES;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::try_parse_fully_qualified_tool_name;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::models::ContentItem;
use crate::models::FunctionCallOutputPayload;
@@ -992,6 +990,52 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
sess.tx_event.send(event).await.ok();
}
// ---
// Helpers --------------------------------------------------------------------
//
// When a turn is interrupted before Codex can deliver tool output(s) back to
// the model, the next request can fail with a 400 from the OpenAI API:
// {"error": {"message": "No tool output found for function call call_XXXXX", ...}}
// Historically this manifested as a confusing retry loop ("stream error: 400 …")
// because we never learned about the missing `call_id` (the stream was aborted
// before we observed the `ResponseEvent::OutputItemDone` that would have let us
// record it in `pending_call_ids`).
//
// To make interruption robust we parse the error body for the offending call id
// and add it to `pending_call_ids` so the very next retry can inject a synthetic
// `FunctionCallOutput { content: "aborted" }` and satisfy the API contract.
// -----------------------------------------------------------------------------
fn extract_missing_tool_call_id(body: &str) -> Option<String> {
// Try to parse the canonical JSON error shape first.
if let Ok(v) = serde_json::from_str::<serde_json::Value>(body) {
if let Some(msg) = v
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
{
if let Some(id) = extract_missing_tool_call_id_from_msg(msg) {
return Some(id);
}
}
}
// Fallback: scan the raw body.
extract_missing_tool_call_id_from_msg(body)
}
fn extract_missing_tool_call_id_from_msg(msg: &str) -> Option<String> {
const NEEDLE: &str = "No tool output found for function call";
let idx = msg.find(NEEDLE)?;
let rest = &msg[idx + NEEDLE.len()..];
// Find the beginning of the call id (typically starts with "call_").
let start = rest.find("call_")?;
let rest = &rest[start..];
// Capture valid id chars [A-Za-z0-9_-/]. Hyphen shows up in some IDs; be permissive.
let end = rest
.find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '/'))
.unwrap_or(rest.len());
Some(rest[..end].to_string())
}
async fn run_turn(
sess: &Session,
sub_id: String,
@@ -1026,13 +1070,58 @@ async fn run_turn(
Ok(output) => return Ok(output),
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e) => {
if retries < *OPENAI_STREAM_MAX_RETRIES {
Err(CodexErr::UnexpectedStatus(status, body)) => {
// Detect the specific 400 "No tool output found for function call ..." error that
// occurs when a user interrupted before Codex could answer a tool call.
if status == reqwest::StatusCode::BAD_REQUEST {
if let Some(call_id) = extract_missing_tool_call_id(&body) {
{
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
// Surface a friendlier background event so users understand the recovery.
sess
.notify_background_event(
&sub_id,
format!(
"previous turn interrupted before responding to tool {call_id}; sending aborted output and retrying…",
),
)
.await;
// Immediately retry the turn without consuming a provider stream retry budget.
continue;
}
}
// Fall through to generic retry path if we could not autorecover.
let e = CodexErr::UnexpectedStatus(status, body);
// Use the configured provider-specific stream retry budget.
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
warn!(
"stream disconnected - retrying turn ({retries}/{} in {delay:?})...",
*OPENAI_STREAM_MAX_RETRIES
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}",
),
)
.await;
tokio::time::sleep(delay).await;
} else {
return Err(e);
}
}
Err(e) => {
// Use the configured provider-specific stream retry budget.
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
warn!(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
// Surface retry information to any UI/frontend so the
@@ -1041,8 +1130,7 @@ async fn run_turn(
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{} in {:?}",
*OPENAI_STREAM_MAX_RETRIES, delay
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}",
),
)
.await;
@@ -1124,7 +1212,28 @@ async fn try_run_turn(
let mut stream = sess.client.clone().stream(&prompt).await?;
let mut output = Vec::new();
while let Some(Ok(event)) = stream.next().await {
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
));
};
let event = match event {
Ok(ev) => ev,
Err(e) => {
// Propagate the underlying stream error to the caller (run_turn), which
// will apply the configured `stream_max_retries` policy.
return Err(e);
}
};
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
@@ -1165,7 +1274,7 @@ async fn try_run_turn(
let mut state = sess.state.lock().unwrap();
state.previous_response_id = Some(response_id);
break;
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
let event = Event {
@@ -1183,7 +1292,6 @@ async fn try_run_turn(
}
}
}
Ok(output)
}
async fn handle_response_item(
@@ -1292,7 +1400,7 @@ async fn handle_function_call(
handle_container_exec_with_params(params, sess, sub_id, call_id).await
}
_ => {
match try_parse_fully_qualified_tool_name(&name) {
match sess.mcp_connection_manager.parse_tool_name(&name) {
Some((server, tool_name)) => {
// TODO(mbolin): Determine appropriate timeout for tool call.
let timeout = None;

View File

@@ -682,6 +682,9 @@ name = "OpenAI using Chat Completions"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "chat"
request_max_retries = 4 # retry failed HTTP requests
stream_max_retries = 10 # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout
[profiles.o3]
model = "o3"
@@ -722,6 +725,9 @@ disable_response_storage = true
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(4),
stream_max_retries: Some(10),
stream_idle_timeout_ms: Some(300_000),
};
let model_provider_map = {
let mut model_provider_map = built_in_model_providers();

View File

@@ -11,14 +11,6 @@ env_flags! {
pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};
pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;
// We generally don't want to disconnect; this updates the timeout to be five minutes
// which matches the upstream typescript codex impl.
pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};
/// Fixture path for offline tests (see client.rs).
pub CODEX_RS_SSE_FIXTURE: Option<&str> = None;

View File

@@ -7,6 +7,7 @@
//! `"<server><MCP_TOOL_NAME_DELIMITER><tool>"` as the key.
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use anyhow::Context;
@@ -16,8 +17,12 @@ use codex_mcp_client::McpClient;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
use mcp_types::Tool;
use sha1::Digest;
use sha1::Sha1;
use tokio::task::JoinSet;
use tracing::info;
use tracing::warn;
use crate::config_types::McpServerConfig;
@@ -26,7 +31,8 @@ use crate::config_types::McpServerConfig;
///
/// OpenAI requires tool names to conform to `^[a-zA-Z0-9_-]+$`, so we must
/// choose a delimiter from this character set.
const MCP_TOOL_NAME_DELIMITER: &str = "__OAI_CODEX_MCP__";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
const MAX_TOOL_NAME_LENGTH: usize = 64;
/// Timeout for the `tools/list` request.
const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10);
@@ -35,16 +41,42 @@ const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10);
/// spawned successfully.
pub type ClientStartErrors = HashMap<String, anyhow::Error>;
fn fully_qualified_tool_name(server: &str, tool: &str) -> String {
format!("{server}{MCP_TOOL_NAME_DELIMITER}{tool}")
fn qualify_tools(tools: Vec<ToolInfo>) -> HashMap<String, ToolInfo> {
let mut used_names = HashSet::new();
let mut qualified_tools = HashMap::new();
for tool in tools {
let mut qualified_name = format!(
"{}{}{}",
tool.server_name, MCP_TOOL_NAME_DELIMITER, tool.tool_name
);
if qualified_name.len() > MAX_TOOL_NAME_LENGTH {
let mut hasher = Sha1::new();
hasher.update(qualified_name.as_bytes());
let sha1 = hasher.finalize();
let sha1_str = format!("{sha1:x}");
// Truncate to make room for the hash suffix
let prefix_len = MAX_TOOL_NAME_LENGTH - sha1_str.len();
qualified_name = format!("{}{}", &qualified_name[..prefix_len], sha1_str);
}
if used_names.contains(&qualified_name) {
warn!("skipping duplicated tool {}", qualified_name);
continue;
}
used_names.insert(qualified_name.clone());
qualified_tools.insert(qualified_name, tool);
}
qualified_tools
}
pub(crate) fn try_parse_fully_qualified_tool_name(fq_name: &str) -> Option<(String, String)> {
let (server, tool) = fq_name.split_once(MCP_TOOL_NAME_DELIMITER)?;
if server.is_empty() || tool.is_empty() {
return None;
}
Some((server.to_string(), tool.to_string()))
struct ToolInfo {
server_name: String,
tool_name: String,
tool: Tool,
}
/// A thin wrapper around a set of running [`McpClient`] instances.
@@ -57,7 +89,7 @@ pub(crate) struct McpConnectionManager {
clients: HashMap<String, std::sync::Arc<McpClient>>,
/// Fully qualified tool name -> tool instance.
tools: HashMap<String, Tool>,
tools: HashMap<String, ToolInfo>,
}
impl McpConnectionManager {
@@ -141,7 +173,9 @@ impl McpConnectionManager {
}
}
let tools = list_all_tools(&clients).await?;
let all_tools = list_all_tools(&clients).await?;
let tools = qualify_tools(all_tools);
Ok((Self { clients, tools }, errors))
}
@@ -149,7 +183,10 @@ impl McpConnectionManager {
/// Returns a single map that contains **all** tools. Each key is the
/// fully-qualified name for the tool.
pub fn list_all_tools(&self) -> HashMap<String, Tool> {
self.tools.clone()
self.tools
.iter()
.map(|(name, tool)| (name.clone(), tool.tool.clone()))
.collect()
}
/// Invoke the tool indicated by the (server, tool) pair.
@@ -171,13 +208,19 @@ impl McpConnectionManager {
.await
.with_context(|| format!("tool call failed for `{server}/{tool}`"))
}
pub fn parse_tool_name(&self, tool_name: &str) -> Option<(String, String)> {
self.tools
.get(tool_name)
.map(|tool| (tool.server_name.clone(), tool.tool_name.clone()))
}
}
/// Query every server for its available tools and return a single map that
/// contains **all** tools. Each key is the fully-qualified name for the tool.
pub async fn list_all_tools(
async fn list_all_tools(
clients: &HashMap<String, std::sync::Arc<McpClient>>,
) -> Result<HashMap<String, Tool>> {
) -> Result<Vec<ToolInfo>> {
let mut join_set = JoinSet::new();
// Spawn one task per server so we can query them concurrently. This
@@ -194,18 +237,19 @@ pub async fn list_all_tools(
});
}
let mut aggregated: HashMap<String, Tool> = HashMap::with_capacity(join_set.len());
let mut aggregated: Vec<ToolInfo> = Vec::with_capacity(join_set.len());
while let Some(join_res) = join_set.join_next().await {
let (server_name, list_result) = join_res?;
let list_result = list_result?;
for tool in list_result.tools {
// TODO(mbolin): escape tool names that contain invalid characters.
let fq_name = fully_qualified_tool_name(&server_name, &tool.name);
if aggregated.insert(fq_name.clone(), tool).is_some() {
panic!("tool name collision for '{fq_name}': suspicious");
}
let tool_info = ToolInfo {
server_name: server_name.clone(),
tool_name: tool.name.clone(),
tool,
};
aggregated.push(tool_info);
}
}
@@ -224,3 +268,90 @@ fn is_valid_mcp_server_name(server_name: &str) -> bool {
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use mcp_types::ToolInputSchema;
fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo {
ToolInfo {
server_name: server_name.to_string(),
tool_name: tool_name.to_string(),
tool: Tool {
annotations: None,
description: Some(format!("Test tool: {tool_name}")),
input_schema: ToolInputSchema {
properties: None,
required: None,
r#type: "object".to_string(),
},
name: tool_name.to_string(),
},
}
}
#[test]
fn test_qualify_tools_short_non_duplicated_names() {
let tools = vec![
create_test_tool("server1", "tool1"),
create_test_tool("server1", "tool2"),
];
let qualified_tools = qualify_tools(tools);
assert_eq!(qualified_tools.len(), 2);
assert!(qualified_tools.contains_key("server1__tool1"));
assert!(qualified_tools.contains_key("server1__tool2"));
}
#[test]
fn test_qualify_tools_duplicated_names_skipped() {
let tools = vec![
create_test_tool("server1", "duplicate_tool"),
create_test_tool("server1", "duplicate_tool"),
];
let qualified_tools = qualify_tools(tools);
// Only the first tool should remain, the second is skipped
assert_eq!(qualified_tools.len(), 1);
assert!(qualified_tools.contains_key("server1__duplicate_tool"));
}
#[test]
fn test_qualify_tools_long_names_same_server() {
let server_name = "my_server";
let tools = vec![
create_test_tool(
server_name,
"extremely_lengthy_function_name_that_absolutely_surpasses_all_reasonable_limits",
),
create_test_tool(
server_name,
"yet_another_extremely_lengthy_function_name_that_absolutely_surpasses_all_reasonable_limits",
),
];
let qualified_tools = qualify_tools(tools);
assert_eq!(qualified_tools.len(), 2);
let mut keys: Vec<_> = qualified_tools.keys().cloned().collect();
keys.sort();
assert_eq!(keys[0].len(), 64);
assert_eq!(
keys[0],
"my_server__extremely_lena02e507efc5a9de88637e436690364fd4219e4ef"
);
assert_eq!(keys[1].len(), 64);
assert_eq!(
keys[1],
"my_server__yet_another_e1c3987bd9c50b826cbe1687966f79f0c602d19ca"
);
}
}

View File

@@ -9,6 +9,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::env::VarError;
use std::time::Duration;
use crate::error::EnvVarError;
use crate::openai_api_key::get_openai_api_key;
@@ -16,6 +17,9 @@ use crate::openai_api_key::get_openai_api_key;
/// Value for the `OpenAI-Originator` header that is sent with requests to
/// OpenAI.
const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 10;
const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4;
/// Wire protocol that the provider speaks. Most third-party services only
/// implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI
@@ -26,7 +30,7 @@ const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WireApi {
/// The experimental Responses API exposed by OpenAI at `/v1/responses`.
/// The experimental "Responses" API exposed by OpenAI at `/v1/responses`.
Responses,
/// Regular Chat Completions compatible with `/v1/chat/completions`.
@@ -64,6 +68,16 @@ pub struct ModelProviderInfo {
/// value should be used. If the environment variable is not set, or the
/// value is empty, the header will not be included in the request.
pub env_http_headers: Option<HashMap<String, String>>,
/// Maximum number of times to retry a failed HTTP request to this provider.
pub request_max_retries: Option<u64>,
/// Number of times to retry reconnecting a dropped streaming response before failing.
pub stream_max_retries: Option<u64>,
/// Idle timeout (in milliseconds) to wait for activity on a streaming response before treating
/// the connection as lost.
pub stream_idle_timeout_ms: Option<u64>,
}
impl ModelProviderInfo {
@@ -161,6 +175,25 @@ impl ModelProviderInfo {
None => Ok(None),
}
}
/// Effective maximum number of request retries for this provider.
pub fn request_max_retries(&self) -> u64 {
self.request_max_retries
.unwrap_or(DEFAULT_REQUEST_MAX_RETRIES)
}
/// Effective maximum number of stream reconnection attempts for this provider.
pub fn stream_max_retries(&self) -> u64 {
self.stream_max_retries
.unwrap_or(DEFAULT_STREAM_MAX_RETRIES)
}
/// Effective idle timeout for streaming responses.
pub fn stream_idle_timeout(&self) -> Duration {
self.stream_idle_timeout_ms
.map(Duration::from_millis)
.unwrap_or(Duration::from_millis(DEFAULT_STREAM_IDLE_TIMEOUT_MS))
}
}
/// Built-in default provider list.
@@ -205,6 +238,10 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
.into_iter()
.collect(),
),
// Use global defaults for retry/timeout unless overridden in config.toml.
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
},
),
]
@@ -234,6 +271,9 @@ base_url = "http://localhost:11434/v1"
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -259,6 +299,9 @@ query_params = { api-version = "2025-04-01-preview" }
}),
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -287,6 +330,9 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
env_http_headers: Some(maplit::hashmap! {
"X-Example-Env-Header".to_string() => "EXAMPLE_ENV_VAR".to_string(),
}),
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();

View File

@@ -153,13 +153,15 @@ struct LogFileInfo {
}
fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions and create it if missing.
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
fs::create_dir_all(&dir)?;
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
dir.push(format!("{:02}", u8::from(timestamp.month())));
dir.push(format!("{:02}", timestamp.day()));
fs::create_dir_all(&dir)?;
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.

View File

@@ -2,7 +2,12 @@
use assert_cmd::Command as AssertCommand;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use serde_json::Value;
use std::time::Duration;
use std::time::Instant;
use tempfile::TempDir;
use uuid::Uuid;
use walkdir::WalkDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -117,3 +122,154 @@ async fn responses_api_stream_cli() {
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(stdout.contains("fixture hello"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn integration_creates_and_checks_session_file() {
// Honor sandbox network restrictions for CI parity with the other tests.
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// 1. Temp home so we read/write isolated session files.
let home = TempDir::new().unwrap();
// 2. Unique marker we'll look for in the session log.
let marker = format!("integration-test-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");
// 3. Use the same offline SSE fixture as responses_api_stream_cli so the test is hermetic.
let fixture =
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/cli_responses_fixture.sse");
// 4. Run the codex CLI through cargo (ensures the right bin is built) and invoke `exec`,
// which is what records a session.
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg(&prompt);
cmd.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
// Required for CLI arg parsing even though fixture short-circuits network usage.
.env("OPENAI_BASE_URL", "http://unused.local");
let output = cmd.output().unwrap();
assert!(
output.status.success(),
"codex-cli exec failed: {}",
String::from_utf8_lossy(&output.stderr)
);
// 5. Sessions are written asynchronously; wait briefly for the directory to appear.
let sessions_dir = home.path().join("sessions");
let start = Instant::now();
while !sessions_dir.exists() && start.elapsed() < Duration::from_secs(3) {
std::thread::sleep(Duration::from_millis(50));
}
// 6. Scan all session files and find the one that contains our marker.
let mut matching_files = vec![];
for entry in WalkDir::new(&sessions_dir) {
let entry = entry.unwrap();
if entry.file_type().is_file() && entry.file_name().to_string_lossy().ends_with(".jsonl") {
let path = entry.path();
let content = std::fs::read_to_string(path).unwrap();
let mut lines = content.lines();
// Skip SessionMeta (first line)
let _ = lines.next();
for line in lines {
let item: Value = serde_json::from_str(line).unwrap();
if let Some("message") = item.get("type").and_then(|t| t.as_str()) {
if let Some(content) = item.get("content") {
if content.to_string().contains(&marker) {
matching_files.push(path.to_owned());
break;
}
}
}
}
}
}
assert_eq!(
matching_files.len(),
1,
"Expected exactly one session file containing the marker, found {}",
matching_files.len()
);
let path = &matching_files[0];
// 7. Verify directory structure: sessions/YYYY/MM/DD/filename.jsonl
let rel = match path.strip_prefix(&sessions_dir) {
Ok(r) => r,
Err(_) => panic!("session file should live under sessions/"),
};
let comps: Vec<String> = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect();
assert_eq!(
comps.len(),
4,
"Expected sessions/YYYY/MM/DD/<file>, got {rel:?}"
);
let year = &comps[0];
let month = &comps[1];
let day = &comps[2];
assert!(
year.len() == 4 && year.chars().all(|c| c.is_ascii_digit()),
"Year dir not 4-digit numeric: {year}"
);
assert!(
month.len() == 2 && month.chars().all(|c| c.is_ascii_digit()),
"Month dir not zero-padded 2-digit numeric: {month}"
);
assert!(
day.len() == 2 && day.chars().all(|c| c.is_ascii_digit()),
"Day dir not zero-padded 2-digit numeric: {day}"
);
// Range checks (best-effort; won't fail on leading zeros)
if let Ok(m) = month.parse::<u8>() {
assert!((1..=12).contains(&m), "Month out of range: {m}");
}
if let Ok(d) = day.parse::<u8>() {
assert!((1..=31).contains(&d), "Day out of range: {d}");
}
// 8. Parse SessionMeta line and basic sanity checks.
let content = std::fs::read_to_string(path).unwrap();
let mut lines = content.lines();
let meta: Value = serde_json::from_str(lines.next().unwrap()).unwrap();
assert!(meta.get("id").is_some(), "SessionMeta missing id");
assert!(
meta.get("timestamp").is_some(),
"SessionMeta missing timestamp"
);
// 9. Confirm at least one message contains the marker.
let mut found_message = false;
for line in lines {
let item: Value = serde_json::from_str(line).unwrap();
if item.get("type").map(|t| t == "message").unwrap_or(false) {
if let Some(content) = item.get("content") {
if content.to_string().contains(&marker) {
found_message = true;
break;
}
}
}
}
assert!(
found_message,
"No message found in session file containing the marker"
);
}

View File

@@ -45,22 +45,10 @@ async fn spawn_codex() -> Result<Codex, CodexErr> {
"OPENAI_API_KEY must be set for live tests"
);
// Environment tweaks to keep the tests snappy and inexpensive while still
// exercising retry/robustness logic.
//
// NOTE: Starting with the 2024 edition `std::env::set_var` is `unsafe`
// because changing the process environment races with any other threads
// that might be performing environment look-ups at the same time.
// Restrict the unsafety to this tiny block that happens at the very
// beginning of the test, before we spawn any background tasks that could
// observe the environment.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "2");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2");
}
let codex_home = TempDir::new().unwrap();
let config = load_default_config_for_test(&codex_home);
let mut config = load_default_config_for_test(&codex_home);
config.model_provider.request_max_retries = Some(2);
config.model_provider.stream_max_retries = Some(2);
let (agent, _init_id) = Codex::spawn(config, std::sync::Arc::new(Notify::new())).await?;
Ok(agent)
@@ -79,7 +67,7 @@ async fn live_streaming_and_prev_id_reset() {
let codex = spawn_codex().await.unwrap();
// ---------- Task 1 ----------
// ---------- Task 1 ----------
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
@@ -113,7 +101,7 @@ async fn live_streaming_and_prev_id_reset() {
"Agent did not stream any AgentMessage before TaskComplete"
);
// ---------- Task 2 (same session) ----------
// ---------- Task 2 (same session) ----------
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {

View File

@@ -88,13 +88,8 @@ async fn keeps_previous_response_id_between_tasks() {
.mount(&server)
.await;
// Environment
// Update environment `set_var` is `unsafe` starting with the 2024
// edition so we group the calls into a single `unsafe { … }` block.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0");
}
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
let model_provider = ModelProviderInfo {
name: "openai".into(),
base_url: format!("{}/v1", server.uri()),
@@ -107,6 +102,10 @@ async fn keeps_previous_response_id_between_tasks() {
query_params: None,
http_headers: None,
env_http_headers: None,
// disable retries so we don't get duplicate calls in this test
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: None,
};
// Init session

View File

@@ -32,8 +32,6 @@ fn sse_completed(id: &str) -> String {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// this test is flaky (has race conditions), so we ignore it for now
#[ignore]
async fn retries_on_early_close() {
#![allow(clippy::unwrap_used)]
@@ -72,19 +70,8 @@ async fn retries_on_early_close() {
.mount(&server)
.await;
// Environment
//
// As of Rust 2024 `std::env::set_var` has been made `unsafe` because
// mutating the process environment is inherently racy when other threads
// are running. We therefore have to wrap every call in an explicit
// `unsafe` block. These are limited to the test-setup section so the
// scope is very small and clearly delineated.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1");
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
}
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
let model_provider = ModelProviderInfo {
name: "openai".into(),
@@ -98,6 +85,10 @@ async fn retries_on_early_close() {
query_params: None,
http_headers: None,
env_http_headers: None,
// exercise retry path: first attempt yields incomplete stream, so allow 1 retry
request_max_retries: Some(0),
stream_max_retries: Some(1),
stream_idle_timeout_ms: Some(2000),
};
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());

View File

@@ -51,6 +51,10 @@ pub struct Cli {
#[arg(long = "color", value_enum, default_value_t = Color::Auto)]
pub color: Color,
/// Print events to stdout as JSONL.
#[arg(long = "json", default_value_t = false)]
pub json: bool,
/// Specifies file where the last message from the agent should be written.
#[arg(long = "output-last-message")]
pub last_message_file: Option<PathBuf>,

View File

@@ -1,539 +1,37 @@
use codex_common::elapsed::format_elapsed;
use codex_common::summarize_sandbox_policy;
use codex_core::WireApi;
use codex_core::config::Config;
use codex_core::model_supports_reasoning_summaries;
use codex_core::protocol::AgentMessageDeltaEvent;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningDeltaEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandBeginEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TokenUsage;
use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
use std::io::Write;
use std::time::Instant;
/// This should be configurable. When used in CI, users may not want to impose
/// a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
pub(crate) trait EventProcessor {
/// Print summary of effective configuration and user prompt.
fn print_config_summary(&mut self, config: &Config, prompt: &str);
pub(crate) struct EventProcessor {
call_id_to_command: HashMap<String, ExecCommandBegin>,
call_id_to_patch: HashMap<String, PatchApplyBegin>,
/// Tracks in-flight MCP tool calls so we can calculate duration and print
/// a concise summary when the corresponding `McpToolCallEnd` event is
/// received.
call_id_to_tool_call: HashMap<String, McpToolCallBegin>,
// To ensure that --color=never is respected, ANSI escapes _must_ be added
// using .style() with one of these fields. If you need a new style, add a
// new field here.
bold: Style,
italic: Style,
dimmed: Style,
magenta: Style,
red: Style,
green: Style,
cyan: Style,
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
answer_started: bool,
reasoning_started: bool,
/// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event);
}
impl EventProcessor {
pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self {
let call_id_to_command = HashMap::new();
let call_id_to_patch = HashMap::new();
let call_id_to_tool_call = HashMap::new();
if with_ansi {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new().bold(),
italic: Style::new().italic(),
dimmed: Style::new().dimmed(),
magenta: Style::new().magenta(),
red: Style::new().red(),
green: Style::new().green(),
cyan: Style::new().cyan(),
call_id_to_tool_call,
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
}
} else {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new(),
italic: Style::new(),
dimmed: Style::new(),
magenta: Style::new(),
red: Style::new(),
green: Style::new(),
cyan: Style::new(),
call_id_to_tool_call,
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
}
}
}
}
struct ExecCommandBegin {
command: Vec<String>,
start_time: Instant,
}
/// Metadata captured when an `McpToolCallBegin` event is received.
struct McpToolCallBegin {
/// Formatted invocation string, e.g. `server.tool({"city":"sf"})`.
invocation: String,
/// Timestamp when the call started so we can compute duration later.
start_time: Instant,
}
struct PatchApplyBegin {
start_time: Instant,
auto_approved: bool,
}
// Timestamped println helper. The timestamp is styled with self.dimmed.
#[macro_export]
macro_rules! ts_println {
($self:ident, $($arg:tt)*) => {{
let now = chrono::Utc::now();
let formatted = now.format("[%Y-%m-%dT%H:%M:%S]");
print!("{} ", formatted.style($self.dimmed));
println!($($arg)*);
}};
}
impl EventProcessor {
/// Print a concise summary of the effective configuration that will be used
/// for the session. This mirrors the information shown in the TUI welcome
/// screen.
pub(crate) fn print_config_summary(&mut self, config: &Config, prompt: &str) {
const VERSION: &str = env!("CARGO_PKG_VERSION");
ts_println!(
self,
"OpenAI Codex v{} (research preview)\n--------",
VERSION
);
let mut entries = vec![
("workdir", config.cwd.display().to_string()),
("model", config.model.clone()),
("provider", config.model_provider_id.clone()),
("approval", format!("{:?}", config.approval_policy)),
("sandbox", summarize_sandbox_policy(&config.sandbox_policy)),
];
if config.model_provider.wire_api == WireApi::Responses
&& model_supports_reasoning_summaries(config)
{
entries.push((
"reasoning effort",
config.model_reasoning_effort.to_string(),
));
entries.push((
"reasoning summaries",
config.model_reasoning_summary.to_string(),
));
}
for (key, value) in entries {
println!("{} {}", format!("{key}:").style(self.bold), value);
}
println!("--------");
// Echo the prompt that will be sent to the agent so it is visible in the
// transcript/logs before any events come in. Note the prompt may have been
// read from stdin, so it may not be visible in the terminal otherwise.
ts_println!(
self,
"{}\n{}",
"User instructions:".style(self.bold).style(self.cyan),
prompt
);
pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, String)> {
let mut entries = vec![
("workdir", config.cwd.display().to_string()),
("model", config.model.clone()),
("provider", config.model_provider_id.clone()),
("approval", format!("{:?}", config.approval_policy)),
("sandbox", summarize_sandbox_policy(&config.sandbox_policy)),
];
if config.model_provider.wire_api == WireApi::Responses
&& model_supports_reasoning_summaries(config)
{
entries.push((
"reasoning effort",
config.model_reasoning_effort.to_string(),
));
entries.push((
"reasoning summaries",
config.model_reasoning_summary.to_string(),
));
}
pub(crate) fn process_event(&mut self, event: Event) {
let Event { id: _, msg } = event;
match msg {
EventMsg::Error(ErrorEvent { message }) => {
let prefix = "ERROR:".style(self.red);
ts_println!(self, "{prefix} {message}");
}
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
// Ignore.
}
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if !self.answer_started {
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
self.answer_started = true;
}
print!("{delta}");
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if !self.show_agent_reasoning {
return;
}
if !self.reasoning_started {
ts_println!(
self,
"{}\n",
"thinking".style(self.italic).style(self.magenta),
);
self.reasoning_started = true;
}
print!("{delta}");
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// if answer_started is false, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if !self.answer_started {
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
message,
);
} else {
println!();
self.answer_started = false;
}
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
cwd,
}) => {
self.call_id_to_command.insert(
call_id.clone(),
ExecCommandBegin {
command: command.clone(),
start_time: Instant::now(),
},
);
ts_println!(
self,
"{} {} in {}",
"exec".style(self.magenta),
escape_command(&command).style(self.bold),
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout,
stderr,
exit_code,
}) => {
let exec_command = self.call_id_to_command.remove(&call_id);
let (duration, call) = if let Some(ExecCommandBegin {
command,
start_time,
}) = exec_command
{
(
format!(" in {}", format_elapsed(start_time)),
format!("{}", escape_command(&command).style(self.bold)),
)
} else {
("".to_string(), format!("exec('{call_id}')"))
};
let output = if exit_code == 0 { stdout } else { stderr };
let truncated_output = output
.lines()
.take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
.collect::<Vec<_>>()
.join("\n");
match exit_code {
0 => {
let title = format!("{call} succeeded{duration}:");
ts_println!(self, "{}", title.style(self.green));
}
_ => {
let title = format!("{call} exited {exit_code}{duration}:");
ts_println!(self, "{}", title.style(self.red));
}
}
println!("{}", truncated_output.style(self.dimmed));
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id,
server,
tool,
arguments,
}) => {
// Build fully-qualified tool name: server.tool
let fq_tool_name = format!("{server}.{tool}");
// Format arguments as compact JSON so they fit on one line.
let args_str = arguments
.as_ref()
.map(|v: &serde_json::Value| {
serde_json::to_string(v).unwrap_or_else(|_| v.to_string())
})
.unwrap_or_default();
let invocation = if args_str.is_empty() {
format!("{fq_tool_name}()")
} else {
format!("{fq_tool_name}({args_str})")
};
self.call_id_to_tool_call.insert(
call_id.clone(),
McpToolCallBegin {
invocation: invocation.clone(),
start_time: Instant::now(),
},
);
ts_println!(
self,
"{} {}",
"tool".style(self.magenta),
invocation.style(self.bold),
);
}
EventMsg::McpToolCallEnd(tool_call_end_event) => {
let is_success = tool_call_end_event.is_success();
let McpToolCallEndEvent { call_id, result } = tool_call_end_event;
// Retrieve start time and invocation for duration calculation and labeling.
let info = self.call_id_to_tool_call.remove(&call_id);
let (duration, invocation) = if let Some(McpToolCallBegin {
invocation,
start_time,
..
}) = info
{
(format!(" in {}", format_elapsed(start_time)), invocation)
} else {
(String::new(), format!("tool('{call_id}')"))
};
let status_str = if is_success { "success" } else { "failed" };
let title_style = if is_success { self.green } else { self.red };
let title = format!("{invocation} {status_str}{duration}:");
ts_println!(self, "{}", title.style(title_style));
if let Ok(res) = result {
let val: serde_json::Value = res.into();
let pretty =
serde_json::to_string_pretty(&val).unwrap_or_else(|_| val.to_string());
for line in pretty.lines().take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL) {
println!("{}", line.style(self.dimmed));
}
}
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
auto_approved,
changes,
}) => {
// Store metadata so we can calculate duration later when we
// receive the corresponding PatchApplyEnd event.
self.call_id_to_patch.insert(
call_id.clone(),
PatchApplyBegin {
start_time: Instant::now(),
auto_approved,
},
);
ts_println!(
self,
"{} auto_approved={}:",
"apply_patch".style(self.magenta),
auto_approved,
);
// Pretty-print the patch summary with colored diff markers so
// it's easy to scan in the terminal output.
for (path, change) in changes.iter() {
match change {
FileChange::Add { content } => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
for line in content.lines() {
println!("{}", line.style(self.green));
}
}
FileChange::Delete => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
}
FileChange::Update {
unified_diff,
move_path,
} => {
let header = if let Some(dest) = move_path {
format!(
"{} {} -> {}",
format_file_change(change),
path.to_string_lossy(),
dest.to_string_lossy()
)
} else {
format!("{} {}", format_file_change(change), path.to_string_lossy())
};
println!("{}", header.style(self.magenta));
// Colorize diff lines. We keep file header lines
// (--- / +++) without extra coloring so they are
// still readable.
for diff_line in unified_diff.lines() {
if diff_line.starts_with('+') && !diff_line.starts_with("+++") {
println!("{}", diff_line.style(self.green));
} else if diff_line.starts_with('-')
&& !diff_line.starts_with("---")
{
println!("{}", diff_line.style(self.red));
} else {
println!("{diff_line}");
}
}
}
}
}
}
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id,
stdout,
stderr,
success,
}) => {
let patch_begin = self.call_id_to_patch.remove(&call_id);
// Compute duration and summary label similar to exec commands.
let (duration, label) = if let Some(PatchApplyBegin {
start_time,
auto_approved,
}) = patch_begin
{
(
format!(" in {}", format_elapsed(start_time)),
format!("apply_patch(auto_approved={auto_approved})"),
)
} else {
(String::new(), format!("apply_patch('{call_id}')"))
};
let (exit_code, output, title_style) = if success {
(0, stdout, self.green)
} else {
(1, stderr, self.red)
};
let title = format!("{label} exited {exit_code}{duration}:");
ts_println!(self, "{}", title.style(title_style));
for line in output.lines() {
println!("{}", line.style(self.dimmed));
}
}
EventMsg::ExecApprovalRequest(_) => {
// Should we exit?
}
EventMsg::ApplyPatchApprovalRequest(_) => {
// Should we exit?
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
if !self.reasoning_started {
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
agent_reasoning_event.text,
);
} else {
println!();
self.reasoning_started = false;
}
}
}
EventMsg::SessionConfigured(session_configured_event) => {
let SessionConfiguredEvent {
session_id,
model,
history_log_id: _,
history_entry_count: _,
} = session_configured_event;
ts_println!(
self,
"{} {}",
"codex session".style(self.magenta).style(self.bold),
session_id.to_string().style(self.dimmed)
);
ts_println!(self, "model: {}", model);
println!();
}
EventMsg::GetHistoryEntryResponse(_) => {
// Currently ignored in exec output.
}
}
}
}
fn escape_command(command: &[String]) -> String {
try_join(command.iter().map(|s| s.as_str())).unwrap_or_else(|_| command.join(" "))
}
fn format_file_change(change: &FileChange) -> &'static str {
match change {
FileChange::Add { .. } => "A",
FileChange::Delete => "D",
FileChange::Update {
move_path: Some(_), ..
} => "R",
FileChange::Update {
move_path: None, ..
} => "M",
}
entries
}

View File

@@ -0,0 +1,520 @@
use codex_common::elapsed::format_elapsed;
use codex_core::config::Config;
use codex_core::protocol::AgentMessageDeltaEvent;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningDeltaEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandBeginEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TokenUsage;
use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
use std::io::Write;
use std::time::Instant;
use crate::event_processor::EventProcessor;
use crate::event_processor::create_config_summary_entries;
/// This should be configurable. When used in CI, users may not want to impose
/// a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
pub(crate) struct EventProcessorWithHumanOutput {
call_id_to_command: HashMap<String, ExecCommandBegin>,
call_id_to_patch: HashMap<String, PatchApplyBegin>,
/// Tracks in-flight MCP tool calls so we can calculate duration and print
/// a concise summary when the corresponding `McpToolCallEnd` event is
/// received.
call_id_to_tool_call: HashMap<String, McpToolCallBegin>,
// To ensure that --color=never is respected, ANSI escapes _must_ be added
// using .style() with one of these fields. If you need a new style, add a
// new field here.
bold: Style,
italic: Style,
dimmed: Style,
magenta: Style,
red: Style,
green: Style,
cyan: Style,
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
answer_started: bool,
reasoning_started: bool,
}
impl EventProcessorWithHumanOutput {
pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self {
let call_id_to_command = HashMap::new();
let call_id_to_patch = HashMap::new();
let call_id_to_tool_call = HashMap::new();
if with_ansi {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new().bold(),
italic: Style::new().italic(),
dimmed: Style::new().dimmed(),
magenta: Style::new().magenta(),
red: Style::new().red(),
green: Style::new().green(),
cyan: Style::new().cyan(),
call_id_to_tool_call,
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
}
} else {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new(),
italic: Style::new(),
dimmed: Style::new(),
magenta: Style::new(),
red: Style::new(),
green: Style::new(),
cyan: Style::new(),
call_id_to_tool_call,
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
}
}
}
}
struct ExecCommandBegin {
command: Vec<String>,
start_time: Instant,
}
/// Metadata captured when an `McpToolCallBegin` event is received.
struct McpToolCallBegin {
/// Formatted invocation string, e.g. `server.tool({"city":"sf"})`.
invocation: String,
/// Timestamp when the call started so we can compute duration later.
start_time: Instant,
}
struct PatchApplyBegin {
start_time: Instant,
auto_approved: bool,
}
// Timestamped println helper. The timestamp is styled with self.dimmed.
#[macro_export]
macro_rules! ts_println {
($self:ident, $($arg:tt)*) => {{
let now = chrono::Utc::now();
let formatted = now.format("[%Y-%m-%dT%H:%M:%S]");
print!("{} ", formatted.style($self.dimmed));
println!($($arg)*);
}};
}
impl EventProcessor for EventProcessorWithHumanOutput {
/// Print a concise summary of the effective configuration that will be used
/// for the session. This mirrors the information shown in the TUI welcome
/// screen.
fn print_config_summary(&mut self, config: &Config, prompt: &str) {
const VERSION: &str = env!("CARGO_PKG_VERSION");
ts_println!(
self,
"OpenAI Codex v{} (research preview)\n--------",
VERSION
);
let entries = create_config_summary_entries(config);
for (key, value) in entries {
println!("{} {}", format!("{key}:").style(self.bold), value);
}
println!("--------");
// Echo the prompt that will be sent to the agent so it is visible in the
// transcript/logs before any events come in. Note the prompt may have been
// read from stdin, so it may not be visible in the terminal otherwise.
ts_println!(
self,
"{}\n{}",
"User instructions:".style(self.bold).style(self.cyan),
prompt
);
}
fn process_event(&mut self, event: Event) {
let Event { id: _, msg } = event;
match msg {
EventMsg::Error(ErrorEvent { message }) => {
let prefix = "ERROR:".style(self.red);
ts_println!(self, "{prefix} {message}");
}
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
// Ignore.
}
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if !self.answer_started {
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
self.answer_started = true;
}
print!("{delta}");
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if !self.show_agent_reasoning {
return;
}
if !self.reasoning_started {
ts_println!(
self,
"{}\n",
"thinking".style(self.italic).style(self.magenta),
);
self.reasoning_started = true;
}
print!("{delta}");
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// if answer_started is false, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if !self.answer_started {
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
message,
);
} else {
println!();
self.answer_started = false;
}
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
cwd,
}) => {
self.call_id_to_command.insert(
call_id.clone(),
ExecCommandBegin {
command: command.clone(),
start_time: Instant::now(),
},
);
ts_println!(
self,
"{} {} in {}",
"exec".style(self.magenta),
escape_command(&command).style(self.bold),
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout,
stderr,
exit_code,
}) => {
let exec_command = self.call_id_to_command.remove(&call_id);
let (duration, call) = if let Some(ExecCommandBegin {
command,
start_time,
}) = exec_command
{
(
format!(" in {}", format_elapsed(start_time)),
format!("{}", escape_command(&command).style(self.bold)),
)
} else {
("".to_string(), format!("exec('{call_id}')"))
};
let output = if exit_code == 0 { stdout } else { stderr };
let truncated_output = output
.lines()
.take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
.collect::<Vec<_>>()
.join("\n");
match exit_code {
0 => {
let title = format!("{call} succeeded{duration}:");
ts_println!(self, "{}", title.style(self.green));
}
_ => {
let title = format!("{call} exited {exit_code}{duration}:");
ts_println!(self, "{}", title.style(self.red));
}
}
println!("{}", truncated_output.style(self.dimmed));
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id,
server,
tool,
arguments,
}) => {
// Build fully-qualified tool name: server.tool
let fq_tool_name = format!("{server}.{tool}");
// Format arguments as compact JSON so they fit on one line.
let args_str = arguments
.as_ref()
.map(|v: &serde_json::Value| {
serde_json::to_string(v).unwrap_or_else(|_| v.to_string())
})
.unwrap_or_default();
let invocation = if args_str.is_empty() {
format!("{fq_tool_name}()")
} else {
format!("{fq_tool_name}({args_str})")
};
self.call_id_to_tool_call.insert(
call_id.clone(),
McpToolCallBegin {
invocation: invocation.clone(),
start_time: Instant::now(),
},
);
ts_println!(
self,
"{} {}",
"tool".style(self.magenta),
invocation.style(self.bold),
);
}
EventMsg::McpToolCallEnd(tool_call_end_event) => {
let is_success = tool_call_end_event.is_success();
let McpToolCallEndEvent { call_id, result } = tool_call_end_event;
// Retrieve start time and invocation for duration calculation and labeling.
let info = self.call_id_to_tool_call.remove(&call_id);
let (duration, invocation) = if let Some(McpToolCallBegin {
invocation,
start_time,
..
}) = info
{
(format!(" in {}", format_elapsed(start_time)), invocation)
} else {
(String::new(), format!("tool('{call_id}')"))
};
let status_str = if is_success { "success" } else { "failed" };
let title_style = if is_success { self.green } else { self.red };
let title = format!("{invocation} {status_str}{duration}:");
ts_println!(self, "{}", title.style(title_style));
if let Ok(res) = result {
let val: serde_json::Value = res.into();
let pretty =
serde_json::to_string_pretty(&val).unwrap_or_else(|_| val.to_string());
for line in pretty.lines().take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL) {
println!("{}", line.style(self.dimmed));
}
}
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
auto_approved,
changes,
}) => {
// Store metadata so we can calculate duration later when we
// receive the corresponding PatchApplyEnd event.
self.call_id_to_patch.insert(
call_id.clone(),
PatchApplyBegin {
start_time: Instant::now(),
auto_approved,
},
);
ts_println!(
self,
"{} auto_approved={}:",
"apply_patch".style(self.magenta),
auto_approved,
);
// Pretty-print the patch summary with colored diff markers so
// it's easy to scan in the terminal output.
for (path, change) in changes.iter() {
match change {
FileChange::Add { content } => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
for line in content.lines() {
println!("{}", line.style(self.green));
}
}
FileChange::Delete => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
}
FileChange::Update {
unified_diff,
move_path,
} => {
let header = if let Some(dest) = move_path {
format!(
"{} {} -> {}",
format_file_change(change),
path.to_string_lossy(),
dest.to_string_lossy()
)
} else {
format!("{} {}", format_file_change(change), path.to_string_lossy())
};
println!("{}", header.style(self.magenta));
// Colorize diff lines. We keep file header lines
// (--- / +++) without extra coloring so they are
// still readable.
for diff_line in unified_diff.lines() {
if diff_line.starts_with('+') && !diff_line.starts_with("+++") {
println!("{}", diff_line.style(self.green));
} else if diff_line.starts_with('-')
&& !diff_line.starts_with("---")
{
println!("{}", diff_line.style(self.red));
} else {
println!("{diff_line}");
}
}
}
}
}
}
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id,
stdout,
stderr,
success,
}) => {
let patch_begin = self.call_id_to_patch.remove(&call_id);
// Compute duration and summary label similar to exec commands.
let (duration, label) = if let Some(PatchApplyBegin {
start_time,
auto_approved,
}) = patch_begin
{
(
format!(" in {}", format_elapsed(start_time)),
format!("apply_patch(auto_approved={auto_approved})"),
)
} else {
(String::new(), format!("apply_patch('{call_id}')"))
};
let (exit_code, output, title_style) = if success {
(0, stdout, self.green)
} else {
(1, stderr, self.red)
};
let title = format!("{label} exited {exit_code}{duration}:");
ts_println!(self, "{}", title.style(title_style));
for line in output.lines() {
println!("{}", line.style(self.dimmed));
}
}
EventMsg::ExecApprovalRequest(_) => {
// Should we exit?
}
EventMsg::ApplyPatchApprovalRequest(_) => {
// Should we exit?
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
if !self.reasoning_started {
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
agent_reasoning_event.text,
);
} else {
println!();
self.reasoning_started = false;
}
}
}
EventMsg::SessionConfigured(session_configured_event) => {
let SessionConfiguredEvent {
session_id,
model,
history_log_id: _,
history_entry_count: _,
} = session_configured_event;
ts_println!(
self,
"{} {}",
"codex session".style(self.magenta).style(self.bold),
session_id.to_string().style(self.dimmed)
);
ts_println!(self, "model: {}", model);
println!();
}
EventMsg::GetHistoryEntryResponse(_) => {
// Currently ignored in exec output.
}
}
}
}
fn escape_command(command: &[String]) -> String {
try_join(command.iter().map(|s| s.as_str())).unwrap_or_else(|_| command.join(" "))
}
fn format_file_change(change: &FileChange) -> &'static str {
match change {
FileChange::Add { .. } => "A",
FileChange::Delete => "D",
FileChange::Update {
move_path: Some(_), ..
} => "R",
FileChange::Update {
move_path: None, ..
} => "M",
}
}

View File

@@ -0,0 +1,48 @@
use std::collections::HashMap;
use codex_core::config::Config;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use serde_json::json;
use crate::event_processor::EventProcessor;
use crate::event_processor::create_config_summary_entries;
pub(crate) struct EventProcessorWithJsonOutput;
impl EventProcessorWithJsonOutput {
pub fn new() -> Self {
Self {}
}
}
impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, config: &Config, prompt: &str) {
let entries = create_config_summary_entries(config)
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect::<HashMap<String, String>>();
#[allow(clippy::expect_used)]
let config_json =
serde_json::to_string(&entries).expect("Failed to serialize config summary to JSON");
println!("{config_json}");
let prompt_json = json!({
"prompt": prompt,
});
println!("{prompt_json}");
}
fn process_event(&mut self, event: Event) {
match event.msg {
EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => {
// Suppress streaming events in JSON mode.
}
_ => {
if let Ok(line) = serde_json::to_string(&event) {
println!("{line}");
}
}
}
}
}

View File

@@ -1,5 +1,7 @@
mod cli;
mod event_processor;
mod event_processor_with_human_output;
mod event_processor_with_json_output;
use std::io::IsTerminal;
use std::io::Read;
@@ -19,12 +21,15 @@ use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::util::is_inside_git_repo;
use event_processor::EventProcessor;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_json_output::EventProcessorWithJsonOutput;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use crate::event_processor::EventProcessor;
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
let Cli {
images,
@@ -36,6 +41,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
skip_git_repo_check,
color,
last_message_file,
json: json_mode,
sandbox_mode: sandbox_mode_cli_arg,
prompt,
config_overrides,
@@ -115,7 +121,15 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
};
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi, &config);
let mut event_processor: Box<dyn EventProcessor> = if json_mode {
Box::new(EventProcessorWithJsonOutput::new())
} else {
Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi,
&config,
))
};
// Print the effective configuration and prompt so users can see what Codex
// is using.
event_processor.print_config_summary(&config, &prompt);

View File

@@ -23,3 +23,10 @@ file-search *args:
# format code
fmt:
cargo fmt -- --config imports_granularity=Item
fix:
cargo clippy --fix --all-features --tests --allow-dirty
install:
rustup show active-toolchain
cargo fetch

View File

@@ -0,0 +1,3 @@
[toolchain]
channel = "1.88.0"
components = [ "clippy", "rustfmt", "rust-src"]

View File

@@ -18,8 +18,15 @@ use crossterm::event::KeyEvent;
use crossterm::event::MouseEvent;
use crossterm::event::MouseEventKind;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
/// Time window for debouncing redraw requests.
const REDRAW_DEBOUNCE: Duration = Duration::from_millis(10);
/// Top-level application state: which full-screen view is currently active.
#[allow(clippy::large_enum_variant)]
@@ -46,6 +53,9 @@ pub(crate) struct App<'a> {
file_search: FileSearchManager,
/// True when a redraw has been scheduled but not yet executed.
pending_redraw: Arc<Mutex<bool>>,
/// Stored parameters needed to instantiate the ChatWidget later, e.g.,
/// after dismissing the Git-repo warning.
chat_args: Option<ChatWidgetArgs>,
@@ -60,7 +70,7 @@ struct ChatWidgetArgs {
initial_images: Vec<PathBuf>,
}
impl<'a> App<'a> {
impl App<'_> {
pub(crate) fn new(
config: Config,
initial_prompt: Option<String>,
@@ -70,6 +80,7 @@ impl<'a> App<'a> {
) -> Self {
let (app_event_tx, app_event_rx) = channel();
let app_event_tx = AppEventSender::new(app_event_tx);
let pending_redraw = Arc::new(Mutex::new(false));
let scroll_event_helper = ScrollEventHelper::new(app_event_tx.clone());
// Spawn a dedicated thread for reading the crossterm event loop and
@@ -83,7 +94,7 @@ impl<'a> App<'a> {
app_event_tx.send(AppEvent::KeyEvent(key_event));
}
crossterm::event::Event::Resize(_, _) => {
app_event_tx.send(AppEvent::Redraw);
app_event_tx.send(AppEvent::RequestRedraw);
}
crossterm::event::Event::Mouse(MouseEvent {
kind: MouseEventKind::ScrollUp,
@@ -152,6 +163,7 @@ impl<'a> App<'a> {
app_state,
config,
file_search,
pending_redraw,
chat_args,
}
}
@@ -162,6 +174,29 @@ impl<'a> App<'a> {
self.app_event_tx.clone()
}
/// Schedule a redraw if one is not already pending.
#[allow(clippy::unwrap_used)]
fn schedule_redraw(&self) {
{
#[allow(clippy::unwrap_used)]
let mut flag = self.pending_redraw.lock().unwrap();
if *flag {
return;
}
*flag = true;
}
let tx = self.app_event_tx.clone();
let pending_redraw = self.pending_redraw.clone();
thread::spawn(move || {
thread::sleep(REDRAW_DEBOUNCE);
tx.send(AppEvent::Redraw);
#[allow(clippy::unwrap_used)]
let mut f = pending_redraw.lock().unwrap();
*f = false;
});
}
pub(crate) fn run(
&mut self,
terminal: &mut tui::Tui,
@@ -169,10 +204,13 @@ impl<'a> App<'a> {
) -> Result<()> {
// Insert an event to trigger the first render.
let app_event_tx = self.app_event_tx.clone();
app_event_tx.send(AppEvent::Redraw);
app_event_tx.send(AppEvent::RequestRedraw);
while let Ok(event) = self.app_event_rx.recv() {
match event {
AppEvent::RequestRedraw => {
self.schedule_redraw();
}
AppEvent::Redraw => {
self.draw_next_frame(terminal)?;
}
@@ -249,7 +287,7 @@ impl<'a> App<'a> {
Vec::new(),
));
self.app_state = AppState::Chat { widget: new_widget };
self.app_event_tx.send(AppEvent::Redraw);
self.app_event_tx.send(AppEvent::RequestRedraw);
}
SlashCommand::ToggleMouseMode => {
if let Err(e) = mouse_capture.toggle() {
@@ -336,7 +374,7 @@ impl<'a> App<'a> {
args.initial_images,
));
self.app_state = AppState::Chat { widget };
self.app_event_tx.send(AppEvent::Redraw);
self.app_event_tx.send(AppEvent::RequestRedraw);
}
GitWarningOutcome::Quit => {
self.app_event_tx.send(AppEvent::ExitRequest);

View File

@@ -8,6 +8,10 @@ use crate::slash_command::SlashCommand;
pub(crate) enum AppEvent {
CodexEvent(Event),
/// Request a redraw which will be debounced by the [`App`].
RequestRedraw,
/// Actually draw the next frame.
Redraw,
KeyEvent(KeyEvent),

View File

@@ -212,7 +212,7 @@ impl BottomPane<'_> {
}
pub(crate) fn request_redraw(&self) {
self.app_event_tx.send(AppEvent::Redraw)
self.app_event_tx.send(AppEvent::RequestRedraw)
}
/// Returns true when a popup inside the composer is visible.

View File

@@ -24,7 +24,7 @@ impl StatusIndicatorView {
}
}
impl<'a> BottomPaneView<'a> for StatusIndicatorView {
impl BottomPaneView<'_> for StatusIndicatorView {
fn update_status_text(&mut self, text: String) -> ConditionalUpdate {
self.update_text(text);
ConditionalUpdate::NeedsRedraw

View File

@@ -53,6 +53,7 @@ pub(crate) struct ChatWidget<'a> {
token_usage: TokenUsage,
reasoning_buffer: String,
answer_buffer: String,
active_task_id: Option<String>,
}
#[derive(Clone, Copy, Eq, PartialEq)]
@@ -141,6 +142,7 @@ impl ChatWidget<'_> {
token_usage: TokenUsage::default(),
reasoning_buffer: String::new(),
answer_buffer: String::new(),
active_task_id: None,
}
}
@@ -222,10 +224,30 @@ impl ChatWidget<'_> {
self.conversation_history.add_user_message(text);
}
self.conversation_history.scroll_to_bottom();
// IMPORTANT: Starting a *new* user turn. Clear any partially streamed
// answer from a previous turn (e.g., one that was interrupted) so that
// the next AgentMessageDelta spawns a fresh agent message cell instead
// of overwriting the last one.
self.answer_buffer.clear();
self.reasoning_buffer.clear();
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
// Retain the event ID so we can refer to it after destructuring.
let event_id = event.id.clone();
let Event { id: _, msg } = event;
// When we are in the middle of a task (active_task_id is Some) we drop
// streaming text/reasoning events for *other* task IDs. This prevents
// late tokens from an interrupted run from bleeding into the current
// answer.
let should_drop_streaming = self
.active_task_id
.as_ref()
.map(|active| active != &event_id)
.unwrap_or(false);
match msg {
EventMsg::SessionConfigured(event) => {
// Record session information at the top of the conversation.
@@ -246,6 +268,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
if should_drop_streaming {
return;
}
// if the answer buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if self.answer_buffer.is_empty() {
@@ -259,6 +284,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if should_drop_streaming {
return;
}
if self.answer_buffer.is_empty() {
self.conversation_history
.add_agent_message(&self.config, "".to_string());
@@ -269,6 +297,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if should_drop_streaming {
return;
}
if self.reasoning_buffer.is_empty() {
self.conversation_history
.add_agent_reasoning(&self.config, "".to_string());
@@ -279,6 +310,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
if should_drop_streaming {
return;
}
// if the reasoning buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new reasoning.
if self.reasoning_buffer.is_empty() {
@@ -293,6 +327,10 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::TaskStarted => {
// New task has begun update state and clear any stale buffers.
self.active_task_id = Some(event_id);
self.answer_buffer.clear();
self.reasoning_buffer.clear();
self.bottom_pane.clear_ctrl_c_quit_hint();
self.bottom_pane.set_task_running(true);
self.request_redraw();
@@ -300,6 +338,10 @@ impl ChatWidget<'_> {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
// Task finished; clear active_task_id so that subsequent events are processed.
if self.active_task_id.as_ref() == Some(&event_id) {
self.active_task_id = None;
}
self.bottom_pane.set_task_running(false);
self.request_redraw();
}
@@ -309,16 +351,25 @@ impl ChatWidget<'_> {
.set_token_usage(self.token_usage.clone(), self.config.model_context_window);
}
EventMsg::Error(ErrorEvent { message }) => {
// Error events always get surfaced (even for stale task IDs) so that the user sees
// why a run stopped. However, only clear the running indicator if this is the
// active task.
if self.active_task_id.as_ref() == Some(&event_id) {
self.bottom_pane.set_task_running(false);
self.active_task_id = None;
}
self.conversation_history.add_error(message);
self.bottom_pane.set_task_running(false);
}
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
reason,
}) => {
if should_drop_streaming {
return;
}
let request = ApprovalRequest::Exec {
id,
id: event_id,
command,
cwd,
reason,
@@ -330,6 +381,9 @@ impl ChatWidget<'_> {
reason,
grant_root,
}) => {
if should_drop_streaming {
return;
}
// ------------------------------------------------------------------
// Before we even prompt the user for approval we surface the patch
// summary in the main conversation so that the dialog appears in a
@@ -348,7 +402,7 @@ impl ChatWidget<'_> {
// Now surface the approval request in the BottomPane as before.
let request = ApprovalRequest::ApplyPatch {
id,
id: event_id,
reason,
grant_root,
};
@@ -360,6 +414,9 @@ impl ChatWidget<'_> {
command,
cwd: _,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.add_active_exec_command(call_id, command);
self.request_redraw();
@@ -369,6 +426,9 @@ impl ChatWidget<'_> {
auto_approved,
changes,
}) => {
if should_drop_streaming {
return;
}
// Even when a patch is autoapproved we still display the
// summary so the user can follow along.
self.conversation_history
@@ -384,6 +444,9 @@ impl ChatWidget<'_> {
stdout,
stderr,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.record_completed_exec_command(call_id, stdout, stderr, exit_code);
self.request_redraw();
@@ -394,11 +457,17 @@ impl ChatWidget<'_> {
tool,
arguments,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.add_active_mcp_tool_call(call_id, server, tool, arguments);
self.request_redraw();
}
EventMsg::McpToolCallEnd(mcp_tool_call_end_event) => {
if should_drop_streaming {
return;
}
let success = mcp_tool_call_end_event.is_success();
let McpToolCallEndEvent { call_id, result } = mcp_tool_call_end_event;
self.conversation_history
@@ -431,7 +500,7 @@ impl ChatWidget<'_> {
}
fn request_redraw(&mut self) {
self.app_event_tx.send(AppEvent::Redraw);
self.app_event_tx.send(AppEvent::RequestRedraw);
}
pub(crate) fn add_diff_output(&mut self, diff_output: String) {

View File

@@ -65,7 +65,7 @@ impl StatusIndicatorWidget {
std::thread::sleep(Duration::from_millis(200));
counter = counter.wrapping_add(1);
frame_idx_clone.store(counter, Ordering::Relaxed);
app_event_tx_clone.send(AppEvent::Redraw);
app_event_tx_clone.send(AppEvent::RequestRedraw);
}
});
}