Files
codex/codex-rs/core/tests/suite/unified_exec.rs
Michael Bolin c25d905f61 permissions: support workspace roots in profiles (#22610)
## Why

This is the configuration/model half of the alternative permissions
migration we discussed as a comparison point for
[#22401](https://github.com/openai/codex/pull/22401) and
[#22402](https://github.com/openai/codex/pull/22402).

The old `workspace-write` model mixes three concerns that we want to
keep separate:
- reusable profile rules that should stay immutable once selected
- user/runtime workspace roots from `cwd`, `--add-dir`, and legacy
workspace-write config
- internal Codex writable roots such as memories, which should not be
shown as user workspace roots

This PR gives permission profiles first-class `workspace_roots` so users
can opt multiple repositories into the same `:workspace_roots` rules
without using broad absolute-path write grants. It also starts
separating the raw selected profile from the effective runtime profile
by making `Permissions` expose explicit accessors instead of public
mutable fields.

A representative `config.toml` looks like this:

```toml
default_permissions = "dev"

[permissions.dev.workspace_roots]
"~/code/openai" = true
"~/code/developers-website" = true

[permissions.dev.filesystem.":workspace_roots"]
"." = "write"
".codex" = "read"
".git" = "read"
".vscode" = "read"
```

If Codex starts in `~/code/codex` with that profile selected, the
effective workspace-root set becomes:
- `~/code/codex` from the runtime `cwd`
- `~/code/openai` from the profile
- `~/code/developers-website` from the profile

The `:workspace_roots` rules are materialized across each root, so
`.git`, `.codex`, and `.vscode` stay scoped the same way everywhere.
Runtime additions such as `--add-dir` can still layer on later stack
entries without mutating the selected profile.

## Stack Shape

This PR intentionally stops before the profile-identity cleanup in
[#22683](https://github.com/openai/codex/pull/22683) so the base review
stays focused on config loading, workspace-root materialization, and
compatibility with legacy `workspace-write`.

The representation in this PR is therefore transitional: `Permissions`
carries enough state to distinguish the raw constrained profile from the
effective runtime profile, and there are still call sites that must keep
the active profile identity and constrained profile value in sync. The
follow-up PR replaces that with a single resolved profile state
(`ResolvedPermissionProfile` / `PermissionProfileState`) that keeps the
profile id, immutable `PermissionProfile`, and profile-declared
workspace roots together. That follow-up removes APIs such as
`set_constrained_permission_profile_with_active_profile()` where
separate arguments could drift out of sync.

Downstream PRs then build on this base to switch app-server turn updates
to profile ids plus runtime workspace roots and to finish the
user-visible summary behavior. Reviewers should judge this PR as the
workspace-roots foundation, not as the final in-memory shape of selected
permission profiles.

## Review Guide

Suggested review order:

1. Start with `codex-rs/core/src/config/mod.rs`.
This is the main shape change in the base slice. `Permissions` now
stores a private raw `Constrained<PermissionProfile>` plus runtime
`workspace_roots`. Callers use `permission_profile()` when they need the
raw constrained value and `effective_permission_profile()` when they
need a materialized runtime profile. As noted above,
[#22683](https://github.com/openai/codex/pull/22683) replaces this
transitional shape with a resolved profile state that keeps identity and
profile data together.

2. Review `codex-rs/config/src/permissions_toml.rs` and
`codex-rs/core/src/config/permissions.rs`.
These add `[permissions.<id>.workspace_roots]`, resolve enabled entries
relative to the policy cwd, and keep `:workspace_roots` deny-read glob
patterns symbolic until the actual roots are known.

3. Review `codex-rs/protocol/src/permissions.rs` and
`codex-rs/protocol/src/models.rs`.
These add the policy/profile materialization helpers that expand exact
`:workspace_roots` entries and scoped deny-read globs over every
workspace root. This is also where `ActivePermissionProfileModification`
is removed from the core model.

4. Review the legacy bridge in
`Config::load_from_base_config_with_overrides` and
`Config::set_legacy_sandbox_policy`.
This is where legacy `workspace-write` roots become runtime workspace
roots, while Codex internal writable roots stay internal and do not
appear as user-facing workspace roots.

5. Then skim downstream call sites.
The interesting pattern is raw-vs-effective access: state/proxy/bwrap
paths keep the raw constrained profile, while execution, summaries, and
user-visible status use the effective profile and workspace-root list.

## What Changed

- added `[permissions.<id>.workspace_roots]` to the config model and
schema
- added runtime `workspace_roots` state to `Config`/`Permissions` and
`ConfigOverrides`
- made `Permissions` profile fields private and replaced direct mutation
with accessors/setters
- added `PermissionProfile` and `FileSystemSandboxPolicy` helpers for
materializing `:workspace_roots` exact paths and deny-read globs across
all roots
- moved legacy additional writable roots into runtime workspace-root
state instead of active profile modifications
- removed `ActivePermissionProfileModification` and its app-server
protocol/schema export
- updated sandbox/status summary paths so internal writable roots are
not reported as user workspace roots

## Verification Strategy

The targeted tests cover the behavior at the layers where regressions
are most likely:
- `codex-rs/core/src/config/config_tests.rs` verifies config loading,
legacy workspace-root seeding, effective profile materialization, and
memory-root handling.
- `codex-rs/core/src/config/permissions_tests.rs` verifies profile
`workspace_roots` parsing and `:workspace_roots` scoped/glob
compilation.
- `codex-rs/protocol/src/permissions.rs` unit tests verify exact and
glob materialization over multiple workspace roots.
- `codex-rs/tui/src/status/tests.rs` and
`codex-rs/utils/sandbox-summary/src/sandbox_summary.rs` verify the
user-facing summaries show effective workspace roots and hide internal
writes.

I also ran `cargo check --tests` locally after the latest stack refresh
to catch cross-crate API breakage from the private-field/accessor
changes.







---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/22610).
* #22612
* #22611
* #22683
* __->__ #22610
2026-05-14 18:25:23 -07:00

3217 lines
100 KiB
Rust

use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs;
use std::sync::OnceLock;
use anyhow::Context;
use anyhow::Result;
use codex_exec_server::CreateDirectoryOptions;
use codex_features::Feature;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::ExecCommandStatus;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::assert_regex_match;
use core_test_support::managed_network_requirements_loader;
use core_test_support::process::process_is_alive;
use core_test_support::process::wait_for_pid_file;
use core_test_support::process::wait_for_process_exit;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::skip_if_sandbox;
use core_test_support::skip_if_windows;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use core_test_support::wait_for_event_with_timeout;
use pretty_assertions::assert_eq;
use regex_lite::Regex;
use serde_json::Value;
use serde_json::json;
use tokio::time::Duration;
const UNIFIED_EXEC_LAGGED_OUTPUT_TIMEOUT: Duration = Duration::from_secs(30);
fn extract_output_text(item: &Value) -> Option<&str> {
item.get("output").and_then(|value| match value {
Value::String(text) => Some(text.as_str()),
Value::Object(obj) => obj.get("content").and_then(Value::as_str),
_ => None,
})
}
#[derive(Debug)]
struct ParsedUnifiedExecOutput {
chunk_id: Option<String>,
wall_time_seconds: f64,
process_id: Option<String>,
exit_code: Option<i32>,
original_token_count: Option<usize>,
output: String,
}
#[allow(clippy::expect_used)]
fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
static OUTPUT_REGEX: OnceLock<Regex> = OnceLock::new();
let regex = OUTPUT_REGEX.get_or_init(|| {
Regex::new(concat!(
r#"(?s)^(?:Total output lines: \d+\n\n)?"#,
r#"(?:Chunk ID: (?P<chunk_id>[^\n]+)\n)?"#,
r#"Wall time: (?P<wall_time>-?\d+(?:\.\d+)?) seconds\n"#,
r#"(?:Process exited with code (?P<exit_code>-?\d+)\n)?"#,
r#"(?:Process running with session ID (?P<process_id>-?\d+)\n)?"#,
r#"(?:Original token count: (?P<original_token_count>\d+)\n)?"#,
r#"Output:\n?(?P<output>.*)$"#,
))
.expect("valid unified exec output regex")
});
let cleaned = raw.trim_matches('\r');
let captures = regex
.captures(cleaned)
.ok_or_else(|| anyhow::anyhow!("missing Output section in unified exec output {raw}"))?;
let chunk_id = captures
.name("chunk_id")
.map(|value| value.as_str().to_string());
let wall_time_seconds = captures
.name("wall_time")
.expect("wall_time group present")
.as_str()
.parse::<f64>()
.context("failed to parse wall time seconds")?;
let exit_code = captures
.name("exit_code")
.map(|value| {
value
.as_str()
.parse::<i32>()
.context("failed to parse exit code from unified exec output")
})
.transpose()?;
let process_id = captures
.name("process_id")
.map(|value| value.as_str().to_string());
let original_token_count = captures
.name("original_token_count")
.map(|value| {
value
.as_str()
.parse::<usize>()
.context("failed to parse original token count from unified exec output")
})
.transpose()?;
let output = captures
.name("output")
.expect("output group present")
.as_str()
.to_string();
Ok(ParsedUnifiedExecOutput {
chunk_id,
wall_time_seconds,
process_id,
exit_code,
original_token_count,
output,
})
}
fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, ParsedUnifiedExecOutput>> {
let mut outputs = HashMap::new();
for body in bodies {
if let Some(items) = body.get("input").and_then(Value::as_array) {
for item in items {
if item.get("type").and_then(Value::as_str) != Some("function_call_output") {
continue;
}
if let Some(call_id) = item.get("call_id").and_then(Value::as_str) {
let content = extract_output_text(item)
.ok_or_else(|| anyhow::anyhow!("missing tool output content"))?;
let trimmed = content.trim();
if trimmed.is_empty() {
continue;
}
let parsed = parse_unified_exec_output(content).with_context(|| {
format!("failed to parse unified exec output for {call_id}")
})?;
outputs.insert(call_id.to_string(), parsed);
}
}
}
}
Ok(outputs)
}
async fn wait_for_raw_unified_exec_output(
test: &TestCodex,
call_id: &str,
) -> Result<ParsedUnifiedExecOutput> {
let content = wait_for_event_match(&test.codex, |event| match event {
EventMsg::RawResponseItem(raw) => match &raw.item {
ResponseItem::FunctionCallOutput {
call_id: output_call_id,
output,
} if output_call_id == call_id => output.text_content().map(str::to_string),
_ => None,
},
_ => None,
})
.await;
parse_unified_exec_output(&content)
.with_context(|| format!("failed to parse raw unified exec output for {call_id}"))
}
async fn submit_unified_exec_turn(
test: &TestCodex,
prompt: &str,
sandbox_policy: SandboxPolicy,
) -> Result<()> {
let session_model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: prompt.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.config.cwd.to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy,
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
Ok(())
}
async fn create_workspace_directory(
test: &TestCodex,
rel_path: impl AsRef<std::path::Path>,
) -> Result<std::path::PathBuf> {
let abs_path = test.config.cwd.join(rel_path.as_ref());
test.fs()
.create_directory(
&abs_path,
CreateDirectoryOptions { recursive: true },
/*sandbox*/ None,
)
.await?;
Ok(abs_path.into_path_buf())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
if let Err(err) = config.features.enable(Feature::UnifiedExec) {
panic!("test config should allow feature update: {err}");
}
});
let harness = TestCodexHarness::with_builder(builder).await?;
let patch =
"*** Begin Patch\n*** Add File: uexec_apply.txt\n+hello from unified exec\n*** End Patch";
let command = format!("apply_patch <<'EOF'\n{patch}\nEOF\n");
let call_id = "uexec-apply-patch";
let args = json!({
"cmd": command,
// The intercepted apply_patch path spawns a helper process, which can
// take longer than a tiny unified-exec yield deadline on CI.
"yield_time_ms": 5_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(harness.server(), responses).await;
let test = harness.test();
let codex = test.codex.clone();
let cwd = test.cwd_path().to_path_buf();
let session_model = test.session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "apply patch via unified exec".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd,
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::DangerFullAccess,
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let mut saw_patch_begin = false;
let mut patch_end = None;
let mut saw_exec_begin = false;
let mut saw_exec_end = false;
wait_for_event(&codex, |event| match event {
EventMsg::PatchApplyBegin(begin) if begin.call_id == call_id => {
saw_patch_begin = true;
assert!(
begin
.changes
.keys()
.any(|path| path.file_name() == Some(OsStr::new("uexec_apply.txt"))),
"expected apply_patch changes to target uexec_apply.txt",
);
false
}
EventMsg::PatchApplyEnd(end) if end.call_id == call_id => {
patch_end = Some(end.clone());
false
}
EventMsg::ExecCommandBegin(event) if event.call_id == call_id => {
saw_exec_begin = true;
false
}
EventMsg::ExecCommandEnd(event) if event.call_id == call_id => {
saw_exec_end = true;
false
}
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
assert!(
saw_patch_begin,
"expected apply_patch to emit PatchApplyBegin"
);
let patch_end = patch_end.expect("expected apply_patch to emit PatchApplyEnd");
assert!(
patch_end.success,
"expected apply_patch to finish successfully: stdout={:?} stderr={:?}",
patch_end.stdout, patch_end.stderr,
);
assert!(
!saw_exec_begin,
"apply_patch should be intercepted before exec_command begin"
);
assert!(
!saw_exec_end,
"apply_patch should not emit exec_command end events"
);
let output = harness.function_call_stdout(call_id).await;
assert!(
output.contains("Success. Updated the following files:"),
"expected apply_patch output, got: {output:?}"
);
assert!(
output.contains("A uexec_apply.txt"),
"expected apply_patch file summary, got: {output:?}"
);
assert_eq!(
fs::read_to_string(harness.path("uexec_apply.txt"))?,
"hello from unified exec\n"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_model("gpt-5.2").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let cwd = test.config.cwd.to_path_buf();
let call_id = "uexec-begin-event";
let args = json!({
"shell": "bash".to_string(),
"cmd": "/bin/echo hello unified exec".to_string(),
"yield_time_ms": 250,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "emit begin event", SandboxPolicy::DangerFullAccess).await?;
let begin_event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandBegin(event) if event.call_id == call_id => Some(event.clone()),
_ => None,
})
.await;
assert_command(&begin_event.command, "-lc", "/bin/echo hello unified exec");
assert_eq!(begin_event.cwd.as_path(), cwd.as_path());
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_resolves_relative_workdir() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_model("gpt-5.2").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let workdir_rel = std::path::PathBuf::from("uexec_relative_workdir");
let workdir = create_workspace_directory(&test, &workdir_rel).await?;
let call_id = "uexec-workdir-relative";
let args = json!({
"cmd": "pwd",
"yield_time_ms": 250,
"workdir": workdir_rel.to_string_lossy().to_string(),
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"run relative workdir test",
SandboxPolicy::DangerFullAccess,
)
.await?;
let begin_event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandBegin(event) if event.call_id == call_id => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(
begin_event.cwd.as_path(),
workdir.as_path(),
"exec_command cwd should resolve relative workdir against turn cwd",
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "flaky"]
async fn unified_exec_respects_workdir_override() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_model("gpt-5.2").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let workdir = create_workspace_directory(&test, "uexec_workdir_test").await?;
let call_id = "uexec-workdir";
let args = json!({
"cmd": "pwd",
"yield_time_ms": 250,
"workdir": workdir.to_string_lossy().to_string(),
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "run workdir test", SandboxPolicy::DangerFullAccess).await?;
let begin_event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandBegin(event) if event.call_id == call_id => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(
begin_event.cwd.as_path(),
workdir.as_path(),
"exec_command cwd should reflect the requested workdir override"
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-end-event";
let args = json!({
"cmd": "/bin/echo END-EVENT".to_string(),
"yield_time_ms": 250,
});
let poll_call_id = "uexec-end-event-poll";
let poll_args = json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 250,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
poll_call_id,
"write_stdin",
&serde_json::to_string(&poll_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "emit end event", SandboxPolicy::DangerFullAccess).await?;
let end_event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()),
_ => None,
})
.await;
assert_eq!(end_event.exit_code, 0);
assert!(
end_event.aggregated_output.contains("END-EVENT"),
"expected aggregated output to contain marker"
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-delta-1";
let args = json!({
"cmd": "printf 'HELLO-UEXEC'",
"yield_time_ms": 1000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "emit delta", SandboxPolicy::DangerFullAccess).await?;
let event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()),
_ => None,
})
.await;
let text = event.stdout;
assert!(
text.contains("HELLO-UEXEC"),
"delta chunk missing expected text: {text:?}",
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_full_lifecycle_with_background_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-full-lifecycle";
// This timing force the long-standing PTY
let args = json!({
"cmd": "sleep 0.5; printf 'HELLO-FULL-LIFECYCLE'",
"yield_time_ms": 1000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"exercise full unified exec lifecycle",
SandboxPolicy::DangerFullAccess,
)
.await?;
let mut begin_event = None;
let mut end_event = None;
let mut task_completed = false;
loop {
let msg = wait_for_event(&test.codex, |_| true).await;
match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == call_id => begin_event = Some(ev),
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => {
assert!(
end_event.is_none(),
"expected a single ExecCommandEnd event for this call id"
);
end_event = Some(ev);
if task_completed && end_event.is_some() {
break;
}
}
EventMsg::TurnComplete(_) => {
task_completed = true;
if task_completed && end_event.is_some() {
break;
}
}
_ => {}
}
}
let begin_event = begin_event.expect("expected ExecCommandBegin event");
assert_eq!(begin_event.call_id, call_id);
assert!(
begin_event.process_id.is_some(),
"begin event should include a process_id for a long-lived session"
);
let end_event = end_event.expect("expected ExecCommandEnd event");
assert_eq!(end_event.call_id, call_id);
assert_eq!(end_event.exit_code, 0);
assert!(
end_event.process_id.is_some(),
"end event should include process_id emitted by background watcher"
);
assert!(
end_event.aggregated_output.contains("HELLO-FULL-LIFECYCLE"),
"aggregated_output should contain the full PTY transcript; got {:?}",
end_event.aggregated_output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_network_denial_emits_failed_background_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let (test, sandbox_policy) = unified_exec_network_denial_test(&server).await?;
let call_id = "uexec-network-denied";
let args = json!({
"cmd": "python3 -c \"import os, socket, time, urllib.parse; time.sleep(0.3); proxy = urllib.parse.urlparse(os.environ['HTTP_PROXY']); sock = socket.create_connection((proxy.hostname, proxy.port), timeout=2); sock.sendall(b'GET http://codex-network-denied.invalid/ HTTP/1.1\\r\\nHost: codex-network-denied.invalid\\r\\n\\r\\n'); sock.recv(1024); time.sleep(5)\"",
"yield_time_ms": 50,
});
let response_mock =
mount_unified_exec_network_denial_responses(&server, call_id, &args).await?;
submit_unified_exec_turn(&test, "exercise network denial", sandbox_policy).await?;
let (end_event, turn_completed) =
wait_for_unified_exec_end(&test, call_id, &response_mock).await;
assert_eq!(end_event.status, ExecCommandStatus::Failed);
assert_eq!(end_event.exit_code, -1);
assert!(
end_event.aggregated_output.contains("Network access"),
"expected network denial message in aggregated output: {:?}",
end_event.aggregated_output
);
assert!(
end_event.process_id.is_some(),
"background denial should end the stored unified exec process"
);
if !turn_completed {
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_short_lived_network_denial_emits_failed_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let (test, sandbox_policy) = unified_exec_network_denial_test(&server).await?;
let call_id = "uexec-short-network-denied";
let args = json!({
"cmd": "python3 -c \"import os, socket, urllib.parse; proxy = urllib.parse.urlparse(os.environ['HTTP_PROXY']); sock = socket.create_connection((proxy.hostname, proxy.port), timeout=2); sock.sendall(b'GET http://codex-short-network-denied.invalid/ HTTP/1.1\\r\\nHost: codex-short-network-denied.invalid\\r\\n\\r\\n'); sock.recv(1024)\"",
"yield_time_ms": 1000,
});
let response_mock =
mount_unified_exec_network_denial_responses(&server, call_id, &args).await?;
submit_unified_exec_turn(&test, "exercise short network denial", sandbox_policy).await?;
let (end_event, turn_completed) =
wait_for_unified_exec_end(&test, call_id, &response_mock).await;
assert_eq!(end_event.status, ExecCommandStatus::Failed);
assert_eq!(end_event.exit_code, -1);
assert!(
end_event.aggregated_output.contains("Network access"),
"expected network denial message in aggregated output: {:?}",
end_event.aggregated_output
);
assert!(
end_event.process_id.is_some(),
"short-lived denial should still emit an end event for the command"
);
if !turn_completed {
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
}
Ok(())
}
#[allow(clippy::expect_used)]
async fn unified_exec_network_denial_test(
server: &wiremock::MockServer,
) -> Result<(TestCodex, SandboxPolicy)> {
use codex_config::Constrained;
use std::sync::Arc;
use tempfile::TempDir;
let home = Arc::new(TempDir::new()?);
fs::write(
home.path().join("config.toml"),
r#"default_permissions = "workspace"
[permissions.workspace.filesystem]
":minimal" = "read"
[permissions.workspace.network]
enabled = true
mode = "limited"
allow_local_binding = true
"#,
)?;
let mut sandbox_policy = SandboxPolicy::new_workspace_write_policy();
if let SandboxPolicy::WorkspaceWrite { network_access, .. } = &mut sandbox_policy {
*network_access = true;
}
let sandbox_policy_for_config = sandbox_policy.clone();
let mut builder = test_codex()
.with_home(home)
.with_cloud_requirements(managed_network_requirements_loader())
.with_config(move |config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::Never);
config
.permissions
.set_permission_profile(PermissionProfile::from_legacy_sandbox_policy(
&sandbox_policy_for_config,
))
.expect("set permission profile");
});
let test = builder.build_with_remote_env(server).await?;
assert!(
test.config.permissions.network.is_some(),
"expected managed network proxy config to be present"
);
Ok((test, sandbox_policy))
}
async fn mount_unified_exec_network_denial_responses(
server: &wiremock::MockServer,
call_id: &str,
args: &Value,
) -> Result<core_test_support::responses::ResponseMock> {
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
Ok(mount_sse_sequence(server, responses).await)
}
async fn wait_for_unified_exec_end(
test: &TestCodex,
call_id: &str,
response_mock: &core_test_support::responses::ResponseMock,
) -> (codex_protocol::protocol::ExecCommandEndEvent, bool) {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15);
let mut observed_events = Vec::new();
let mut turn_completed = false;
let end_event = loop {
let remaining = deadline
.checked_duration_since(std::time::Instant::now())
.unwrap_or_default();
if remaining.is_zero() {
panic!(
"timed out waiting for network denial end event; observed {observed_events:?}; response requests: {}",
response_mock.requests().len()
);
}
let event = match tokio::time::timeout(remaining, test.codex.next_event()).await {
Ok(Ok(event)) => event.msg,
Ok(Err(err)) => panic!("event stream ended unexpectedly: {err}"),
Err(_) => panic!(
"timed out waiting for network denial end event; observed {observed_events:?}; response requests: {}",
response_mock.requests().len()
),
};
turn_completed |= matches!(event, EventMsg::TurnComplete(_));
observed_events.push(format!("{event:?}"));
if let EventMsg::ExecCommandEnd(ev) = event
&& ev.call_id == call_id
{
break ev;
}
};
(end_event, turn_completed)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_terminal_interaction_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let open_call_id = "uexec-open";
let open_args = json!({
"cmd": "/bin/bash -i",
"yield_time_ms": 200,
"tty": true,
});
let stdin_call_id = "uexec-stdin-delta";
let stdin_args = json!({
"chars": "echo WSTDIN-MARK\\n",
"session_id": 1000,
"yield_time_ms": 800,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
open_call_id,
"exec_command",
&serde_json::to_string(&open_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
stdin_call_id,
"write_stdin",
&serde_json::to_string(&stdin_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "stdin delta", SandboxPolicy::DangerFullAccess).await?;
let mut terminal_interaction = None;
loop {
let msg = wait_for_event(&test.codex, |_| true).await;
match msg {
EventMsg::TerminalInteraction(ev) if ev.call_id == open_call_id => {
terminal_interaction = Some(ev);
}
EventMsg::TurnComplete(_) => break,
_ => {}
}
}
let delta = terminal_interaction.expect("expected TerminalInteraction event");
assert_eq!(delta.process_id, "1000");
let expected_stdin = stdin_args
.get("chars")
.and_then(Value::as_str)
.expect("stdin chars");
assert_eq!(delta.stdin, expected_stdin);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let open_call_id = "uexec-delayed-open";
let open_args = json!({
"cmd": "sleep 3 && echo MARKER1 && sleep 3 && echo MARKER2",
"yield_time_ms": 10,
"tty": true,
});
// Poll stdin three times: first for no output, second after the first marker,
// and a final long poll to capture the second marker.
let first_poll_call_id = "uexec-delayed-poll-1";
let first_poll_args = json!({
"chars": "x",
"session_id": 1000,
"yield_time_ms": 10,
});
let second_poll_call_id = "uexec-delayed-poll-2";
let second_poll_args = json!({
"chars": "x",
"session_id": 1000,
"yield_time_ms": 4000,
});
let third_poll_call_id = "uexec-delayed-poll-3";
let third_poll_args = json!({
"chars": "x",
"session_id": 1000,
"yield_time_ms": 6000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
open_call_id,
"exec_command",
&serde_json::to_string(&open_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
first_poll_call_id,
"write_stdin",
&serde_json::to_string(&first_poll_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_function_call(
second_poll_call_id,
"write_stdin",
&serde_json::to_string(&second_poll_args)?,
),
ev_completed("resp-3"),
]),
sse(vec![
ev_response_created("resp-4"),
ev_function_call(
third_poll_call_id,
"write_stdin",
&serde_json::to_string(&third_poll_args)?,
),
ev_completed("resp-4"),
]),
sse(vec![
ev_response_created("resp-5"),
ev_assistant_message("msg-1", "complete"),
ev_completed("resp-5"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"delayed terminal interaction output",
SandboxPolicy::DangerFullAccess,
)
.await?;
let mut begin_event = None;
let mut end_event = None;
let mut task_completed = false;
let mut terminal_events = Vec::new();
let mut delta_text = String::new();
// Consume all events for this turn so we can assert on each stage.
loop {
let msg = wait_for_event(&test.codex, |_| true).await;
match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == open_call_id => {
begin_event = Some(ev);
}
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == open_call_id => {
delta_text.push_str(&String::from_utf8_lossy(&ev.chunk));
}
EventMsg::TerminalInteraction(ev) if ev.call_id == open_call_id => {
terminal_events.push(ev);
}
EventMsg::ExecCommandEnd(ev) if ev.call_id == open_call_id => {
end_event = Some(ev);
}
EventMsg::TurnComplete(_) => {
task_completed = true;
}
_ => {}
};
if task_completed && end_event.is_some() {
break;
}
}
let begin_event = begin_event.expect("expected ExecCommandBegin event");
assert!(
begin_event.process_id.is_some(),
"begin event should include process_id for a live session"
);
// We expect three terminal interactions matching the three write_stdin calls.
assert_eq!(
terminal_events.len(),
3,
"expected three terminal interactions; got {terminal_events:?}"
);
for event in &terminal_events {
assert_eq!(event.call_id, open_call_id);
assert_eq!(event.process_id, "1000");
}
assert_eq!(
terminal_events
.iter()
.map(|ev| ev.stdin.as_str())
.collect::<Vec<_>>(),
vec!["x", "x", "x"],
"terminal interactions should reflect the three stdin polls"
);
assert!(
delta_text.contains("MARKER1") && delta_text.contains("MARKER2"),
"streamed deltas should contain both markers; got {delta_text:?}"
);
let end_event = end_event.expect("expected ExecCommandEnd event");
assert_eq!(end_event.call_id, open_call_id);
assert_eq!(end_event.exit_code, 0);
assert!(
end_event.process_id.is_some(),
"end event should include the process_id"
);
assert!(
end_event.aggregated_output.contains("MARKER1")
&& end_event.aggregated_output.contains("MARKER2"),
"aggregated output should include both markers in order; got {:?}",
end_event.aggregated_output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_one_begin_and_one_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let open_call_id = "uexec-open-session";
let open_args = json!({
"shell": "bash".to_string(),
"cmd": "sleep 0.1".to_string(),
"yield_time_ms": 10,
});
let poll_call_id = "uexec-poll-empty";
let poll_args = json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 150,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
open_call_id,
"exec_command",
&serde_json::to_string(&open_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
poll_call_id,
"write_stdin",
&serde_json::to_string(&poll_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "complete"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"check poll event behavior",
SandboxPolicy::DangerFullAccess,
)
.await?;
let mut begin_events = Vec::new();
let mut end_events = Vec::new();
let mut task_completed = false;
loop {
let event_msg = wait_for_event(&test.codex, |_| true).await;
match event_msg {
EventMsg::ExecCommandBegin(event) if event.call_id == open_call_id => {
begin_events.push(event);
}
EventMsg::ExecCommandEnd(event) if event.call_id == open_call_id => {
end_events.push(event);
}
EventMsg::TurnComplete(_) => {
task_completed = true;
}
_ => {}
}
if task_completed && !end_events.is_empty() {
break;
}
}
assert_eq!(
begin_events.len(),
1,
"expected begin events for the startup command"
);
assert_eq!(
end_events.len(),
1,
"expected end event for the write_stdin call"
);
let open_event = &begin_events[0];
assert_command(&open_event.command, "-lc", "sleep 0.1");
assert!(
open_event.interaction_input.is_none(),
"startup begin events should not include interaction input"
);
assert_eq!(open_event.source, ExecCommandSource::UnifiedExecStartup);
let end_event = &end_events[0];
assert_eq!(end_event.call_id, open_call_id);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-metadata";
let args = serde_json::json!({
"cmd": "printf 'token one token two token three token four token five token six token seven'",
"yield_time_ms": 500,
"max_output_tokens": 6,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "run metadata test", SandboxPolicy::DangerFullAccess).await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let metadata = outputs
.get(call_id)
.expect("missing exec_command metadata output");
let chunk_id = metadata.chunk_id.as_ref().expect("missing chunk_id");
assert_eq!(chunk_id.len(), 6, "chunk id should be 6 hex characters");
assert!(
chunk_id.chars().all(|c| c.is_ascii_hexdigit()),
"chunk id should be hexadecimal: {chunk_id}"
);
let wall_time = metadata.wall_time_seconds;
assert!(
wall_time >= 0.0,
"wall_time_seconds should be non-negative, got {wall_time}"
);
assert!(
metadata.process_id.is_none(),
"exec_command for a completed process should not include process_id"
);
let exit_code = metadata.exit_code.expect("expected exit_code");
assert_eq!(exit_code, 0, "expected successful exit");
let output_text = &metadata.output;
assert!(
output_text.contains("tokens truncated"),
"expected truncation notice in output: {output_text:?}"
);
let original_tokens = metadata
.original_token_count
.expect("missing original_token_count") as usize;
assert!(
original_tokens > 6,
"original token count should exceed max_output_tokens"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_command_clamps_model_requested_max_output_tokens_to_policy() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_model("gpt-5.4").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config.tool_output_token_limit = Some(50);
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-clamped-max-output";
let args = serde_json::json!({
"cmd": "line_number=1; while [ \"$line_number\" -le 999 ]; do printf 'EXEC-LINE-%04d xxxxxxxxxxxxxxxxxxxx\\n' \"$line_number\"; line_number=$((line_number + 1)); done",
"yield_time_ms": 3_000,
"max_output_tokens": 70_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"run clamped max output test",
SandboxPolicy::DangerFullAccess,
)
.await?;
let output = wait_for_raw_unified_exec_output(&test, call_id).await?;
assert_eq!(output.original_token_count, Some(8_991));
let output_text = output.output.replace("\r\n", "\n");
assert_regex_match(
r"^Total output lines: 999\n\nEXEC-LINE-0001 x{20}\nEXEC-LINE-0002 x{20}\nEXEC-LINE-0003 x{13}…8941 tokens truncated…E-0997 x{20}\nEXEC-LINE-0998 x{20}\nEXEC-LINE-0999 x{20}\n$",
&output_text,
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_clamps_model_requested_max_output_tokens_to_policy() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_model("gpt-5.4").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config.tool_output_token_limit = Some(50);
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let start_call_id = "uexec-stdin-clamp-start";
let start_args = serde_json::json!({
"cmd": "printf 'READY\\n'; read trigger; line_number=1; while [ \"$line_number\" -le 999 ]; do printf 'STDIN-LINE-%04d yyyyyyyyyyyyyyyyyyyy\\n' \"$line_number\"; line_number=$((line_number + 1)); done",
"yield_time_ms": 500,
"tty": true,
});
let stdin_call_id = "uexec-stdin-clamped-max-output";
let stdin_args = serde_json::json!({
"chars": "go\n",
"session_id": 1000,
"yield_time_ms": 3_000,
"max_output_tokens": 70_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
stdin_call_id,
"write_stdin",
&serde_json::to_string(&stdin_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"run clamped write_stdin output test",
SandboxPolicy::DangerFullAccess,
)
.await?;
let start_output = wait_for_raw_unified_exec_output(&test, start_call_id).await?;
assert!(
start_output.process_id.is_some(),
"start command should leave a running process for write_stdin"
);
let stdin_output = wait_for_raw_unified_exec_output(&test, stdin_call_id).await?;
assert_eq!(stdin_output.original_token_count, Some(9_492));
let stdin_output_text = stdin_output.output.replace("\r\n", "\n");
assert_regex_match(
r"^Total output lines: 1000\n\ngo\nSTDIN-LINE-0001 y{20}\nSTDIN-LINE-0002 y{20}\nSTDIN-LINE-0003 yyyy…9442 tokens truncated…7 y{20}\nSTDIN-LINE-0998 y{20}\nSTDIN-LINE-0999 y{20}\n$",
&stdin_output_text,
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_defaults_to_pipe() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-default-pipe";
let args = serde_json::json!({
"cmd": "python3 -c \"import sys; print(sys.stdin.isatty())\"",
"yield_time_ms": 1500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"check default pipe mode",
SandboxPolicy::DangerFullAccess,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
.get(call_id)
.expect("missing default pipe unified exec output");
let normalized = output.output.replace("\r\n", "\n");
assert!(
normalized.contains("False"),
"stdin should not be a tty by default: {normalized:?}"
);
assert_eq!(output.exit_code, Some(0));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_can_enable_tty() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-tty-enabled";
let args = serde_json::json!({
"cmd": "python3 -c \"import sys; print(sys.stdin.isatty())\"",
"yield_time_ms": 1500,
"tty": true,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "check tty enabled", SandboxPolicy::DangerFullAccess).await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
.get(call_id)
.expect("missing tty-enabled unified exec output");
let normalized = output.output.replace("\r\n", "\n");
assert!(
normalized.contains("True"),
"stdin should be a tty when tty=true: {normalized:?}"
);
assert_eq!(output.exit_code, Some(0));
assert!(output.process_id.is_none(), "process should have exited");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec-early-exit";
let args = serde_json::json!({
"cmd": "sleep 0.05",
"yield_time_ms": 31415,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"watch early exit timing",
SandboxPolicy::DangerFullAccess,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
.get(call_id)
.expect("missing early exit unified_exec output");
assert!(
output.process_id.is_none(),
"short-lived process should not keep a session alive"
);
assert_eq!(
output.exit_code,
Some(0),
"short-lived process should exit successfully"
);
let wall_time = output.wall_time_seconds;
assert!(
wall_time < 0.75,
"wall_time should reflect early exit rather than the full yield time; got {wall_time}"
);
assert!(
output.output.is_empty(),
"sleep command should not emit output, got {:?}",
output.output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let start_call_id = "uexec-cat-start";
let send_call_id = "uexec-cat-send";
let exit_call_id = "uexec-cat-exit";
let start_args = serde_json::json!({
"cmd": "/bin/cat",
"yield_time_ms": 500,
"tty": true,
});
let send_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 1000,
"yield_time_ms": 500,
});
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 1000,
"yield_time_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
send_call_id,
"write_stdin",
&serde_json::to_string(&send_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_function_call(
exit_call_id,
"write_stdin",
&serde_json::to_string(&exit_args)?,
),
ev_completed("resp-3"),
]),
sse(vec![
ev_assistant_message("msg-1", "all done"),
ev_completed("resp-4"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"test write_stdin exit behavior",
SandboxPolicy::DangerFullAccess,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(start_call_id)
.expect("missing start output for exec_command");
let process_id = start_output
.process_id
.clone()
.expect("expected process id from exec_command");
assert!(
process_id.len() > 3,
"process_id should be at least 4 digits, got {process_id}"
);
assert!(
start_output.exit_code.is_none(),
"initial exec_command should not include exit_code while session is running"
);
let send_output = outputs
.get(send_call_id)
.expect("missing write_stdin echo output");
let echoed = send_output.output.as_str();
assert!(
echoed.contains("hello unified exec"),
"expected echoed output from cat, got {echoed:?}"
);
let echoed_session = send_output
.process_id
.clone()
.expect("write_stdin should return process id while process is running");
assert_eq!(
echoed_session, process_id,
"write_stdin should reuse existing process id"
);
assert!(
send_output.exit_code.is_none(),
"write_stdin should not include exit_code while process is running"
);
let exit_output = outputs
.get(exit_call_id)
.expect("missing exit metadata output");
assert!(
exit_output.process_id.is_none(),
"process_id should be omitted once the process exits"
);
let exit_code = exit_output
.exit_code
.expect("expected exit_code after sending EOF");
assert_eq!(exit_code, 0, "cat should exit cleanly after EOF");
let exit_chunk = exit_output
.chunk_id
.as_ref()
.expect("missing chunk id for exit output");
assert!(
exit_chunk.chars().all(|c| c.is_ascii_hexdigit()),
"chunk id should be hexadecimal: {exit_chunk}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let start_call_id = "uexec-end-on-exit-start";
let start_args = serde_json::json!({
"cmd": "/bin/cat",
"yield_time_ms": 200,
"tty": true,
});
let echo_call_id = "uexec-end-on-exit-echo";
let echo_args = serde_json::json!({
"chars": "bye-END\n",
"session_id": 1000,
"yield_time_ms": 300,
});
let exit_call_id = "uexec-end-on-exit";
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 1000,
"yield_time_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
echo_call_id,
"write_stdin",
&serde_json::to_string(&echo_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_function_call(
exit_call_id,
"write_stdin",
&serde_json::to_string(&exit_args)?,
),
ev_completed("resp-3"),
]),
sse(vec![
ev_response_created("resp-4"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-4"),
]),
];
mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "end on exit", SandboxPolicy::DangerFullAccess).await?;
// We expect the ExecCommandEnd event to match the initial exec_command call_id.
let end_event = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::ExecCommandEnd(ev) if ev.call_id == start_call_id => Some(ev.clone()),
_ => None,
})
.await;
assert_eq!(end_event.exit_code, 0);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_keeps_long_running_session_after_turn_end() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let temp_dir = tempfile::tempdir()?;
let pid_path = temp_dir.path().join("uexec_pid");
let pid_path_str = pid_path.to_string_lossy();
let call_id = "uexec-long-running";
let command = format!("printf '%s' $$ > '{pid_path_str}' && exec sleep 3000");
let args = json!({
"cmd": command,
"yield_time_ms": 250,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "keep unified exec process after turn end".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::DangerFullAccess,
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let begin_event = wait_for_event_match(&codex, |msg| match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == call_id => Some(ev.clone()),
_ => None,
})
.await;
let _begin_process_id = begin_event
.process_id
.clone()
.expect("expected process_id for long-running unified exec process");
let pid = wait_for_pid_file(&pid_path).await?;
assert!(
pid.chars().all(|ch| ch.is_ascii_digit()),
"expected numeric pid, got {pid:?}"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert!(
process_is_alive(&pid)?,
"expected unified exec process to remain alive after turn completion"
);
codex.submit(Op::Shutdown).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
wait_for_process_exit(&pid).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_interrupt_preserves_long_running_session() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let temp_dir = tempfile::tempdir()?;
let pid_path = temp_dir.path().join("uexec_pid_interrupt");
let pid_path_str = pid_path.to_string_lossy();
let call_id = "uexec-long-running-interrupt";
let command = format!("printf '%s' $$ > '{pid_path_str}' && exec sleep 3000");
let args = json!({
"cmd": command,
"yield_time_ms": 30000,
});
let responses = vec![sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
])];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "interrupt long-running unified exec".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::DangerFullAccess,
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let _begin_event = wait_for_event_match(&codex, |msg| match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == call_id => Some(ev.clone()),
_ => None,
})
.await;
let pid = wait_for_pid_file(&pid_path).await?;
assert!(
pid.chars().all(|ch| ch.is_ascii_digit()),
"expected numeric pid, got {pid:?}"
);
codex.submit(Op::Interrupt).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnAborted(_))).await;
assert!(
process_is_alive(&pid)?,
"expected unified exec process to remain alive after interrupt"
);
codex.submit(Op::CleanBackgroundTerminals).await?;
wait_for_process_exit(&pid).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let first_call_id = "uexec-start";
let first_args = serde_json::json!({
"cmd": "/bin/cat",
"yield_time_ms": 200,
"tty": true,
});
let second_call_id = "uexec-stdin";
let second_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 1000,
"yield_time_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"exec_command",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"write_stdin",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "all done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "run unified exec", SandboxPolicy::DangerFullAccess).await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(first_call_id)
.expect("missing first unified_exec output");
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
!process_id.is_empty(),
"expected process id in first unified_exec response"
);
assert!(start_output.output.is_empty());
let reuse_output = outputs
.get(second_call_id)
.expect("missing reused unified_exec output");
assert_eq!(
reuse_output.process_id.clone().unwrap_or_default(),
process_id
);
let echoed = reuse_output.output.as_str();
assert!(
echoed.contains("hello unified exec"),
"expected echoed output, got {echoed:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_streams_after_lagged_output() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let script = r#"python3 - <<'PY'
import sys
import time
chunk = b'long content here to trigger truncation' * (1 << 10)
for _ in range(4):
sys.stdout.buffer.write(chunk)
sys.stdout.flush()
time.sleep(0.2)
for _ in range(5):
sys.stdout.write("TAIL-MARKER\n")
sys.stdout.flush()
time.sleep(0.05)
time.sleep(0.2)
PY
"#;
let first_call_id = "uexec-lag-start";
let first_args = serde_json::json!({
"cmd": script,
"yield_time_ms": 25,
"tty": true,
});
let second_call_id = "uexec-lag-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 2_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"exec_command",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"write_stdin",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "lag handled"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"exercise lag handling",
SandboxPolicy::DangerFullAccess,
)
.await?;
// This is a worst case scenario for the truncate logic, and CI can spend a
// while draining the lagged tail before the follow-up tool call completes.
wait_for_event_with_timeout(
&test.codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
UNIFIED_EXEC_LAGGED_OUTPUT_TIMEOUT,
)
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(first_call_id)
.expect("missing initial unified_exec output");
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
!process_id.is_empty(),
"expected session id from initial unified_exec response"
);
let poll_output = outputs
.get(second_call_id)
.expect("missing poll unified_exec output");
let poll_text = poll_output.output.as_str();
assert!(
poll_text.contains("TAIL-MARKER"),
"expected poll output to contain tail marker, got {poll_text:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let first_call_id = "uexec-timeout";
let first_args = serde_json::json!({
"cmd": "sleep 0.5; echo ready",
"yield_time_ms": 10,
});
let second_call_id = "uexec-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 800,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"exec_command",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"write_stdin",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(&test, "check timeout", SandboxPolicy::DangerFullAccess).await?;
loop {
let event = test.codex.next_event().await.expect("event");
if matches!(event.msg, EventMsg::TurnComplete(_)) {
break;
}
}
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let first_output = outputs.get(first_call_id).expect("missing timeout output");
assert!(first_output.process_id.is_some());
assert!(first_output.output.is_empty());
let poll_output = outputs.get(second_call_id).expect("missing poll output");
let output_text = poll_output.output.as_str();
assert!(
output_text.contains("ready"),
"expected ready output, got {output_text:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Skipped on arm because the ctor logic to handle arg0 doesn't work on ARM
#[cfg(not(target_arch = "arm"))]
async fn unified_exec_formats_large_output_summary() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let script = r#"python3 - <<'PY'
import sys
sys.stdout.write("token token \n" * 5000)
PY
"#;
let call_id = "uexec-large-output";
let args = serde_json::json!({
"cmd": script,
"max_output_tokens": 100,
"yield_time_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"summarize large output",
SandboxPolicy::DangerFullAccess,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let large_output = outputs.get(call_id).expect("missing large output summary");
let output_text = large_output.output.replace("\r\n", "\n");
let truncated_pattern = r"(?s)^Total output lines: \d+\n\n(token token \n){5,}.*…\d+ tokens truncated….*(token token \n){5,}$";
assert_regex_match(truncated_pattern, &output_text);
let original_tokens = large_output
.original_token_count
.expect("missing original_token_count for large output summary");
assert!(original_tokens > 0);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_runs_under_sandbox() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let call_id = "uexec";
let args = serde_json::json!({
"cmd": "echo 'hello'",
"yield_time_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "summarize large output".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
// Important!
sandbox_policy: SandboxPolicy::new_read_only_policy(),
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");
assert_regex_match("hello[\r\n]+", &output.output);
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_enforces_glob_deny_read_policy() -> Result<()> {
use codex_protocol::models::PermissionProfile;
use codex_protocol::permissions::FileSystemAccessMode;
use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let read_only_policy = SandboxPolicy::new_read_only_policy();
let read_only_policy_for_config = read_only_policy.clone();
let mut builder = test_codex().with_config(move |config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
config
.set_legacy_sandbox_policy(read_only_policy_for_config)
.expect("set sandbox policy");
let mut file_system_sandbox_policy = FileSystemSandboxPolicy::default();
file_system_sandbox_policy
.entries
.push(FileSystemSandboxEntry {
path: FileSystemPath::GlobPattern {
pattern: format!("{}/**/*.env", config.cwd.as_path().display()),
},
access: FileSystemAccessMode::None,
});
config
.permissions
.set_permission_profile(PermissionProfile::from_runtime_permissions(
&file_system_sandbox_policy,
NetworkSandboxPolicy::Restricted,
))
.expect("set permission profile");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let fixture_dir = cwd.path().join("glob-deny-read");
fs::create_dir_all(&fixture_dir).context("create glob deny-read fixture directory")?;
let denied_path = fixture_dir.join("secret.env");
let allowed_path = fixture_dir.join("notes.txt");
let secret = "unified exec glob deny-read secret";
let allowed = "unified exec glob deny-read allowed";
fs::write(&denied_path, format!("{secret}\n")).context("write denied fixture")?;
fs::write(&allowed_path, format!("{allowed}\n")).context("write allowed fixture")?;
let call_id = "uexec-glob-deny-read";
let cmd = format!(
"read_status=0; cat {denied_path:?} || read_status=$?; cat {allowed_path:?}; exit $read_status"
);
let args = serde_json::json!({
"cmd": cmd,
"yield_time_ms": 5_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "read the fixture files".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: read_only_policy,
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");
assert!(
output.exit_code.is_some_and(|code| code != 0),
"glob deny-read should surface a non-zero exit code: {output:?}"
);
assert!(
output.output.contains(allowed),
"expected allowed file contents in unified exec output: {output:?}"
);
assert!(
!output.output.contains(secret),
"denied file contents leaked into unified exec output: {output:?}"
);
let output_lower = output.output.to_lowercase();
let has_denial = output_lower.contains("permission denied")
|| output_lower.contains("operation not permitted")
|| output_lower.contains("read-only file system");
assert!(
has_denial,
"expected sandbox denial details in unified exec output: {output:?}"
);
Ok(())
}
#[cfg(target_os = "macos")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> {
skip_if_no_network!(Ok(()));
let python = match which::which("python").or_else(|_| which::which("python3")) {
Ok(path) => path,
Err(_) => {
eprintln!("python not found in PATH, skipping test.");
return Ok(());
}
};
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let startup_call_id = "uexec-python-seatbelt";
let startup_args = serde_json::json!({
"cmd": format!("{} -i", python.display()),
"yield_time_ms": 1_500,
"tty": true,
});
let exit_call_id = "uexec-python-exit";
let exit_args = serde_json::json!({
"chars": "exit()\n",
"session_id": 1000,
"yield_time_ms": 1_500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
startup_call_id,
"exec_command",
&serde_json::to_string(&startup_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
exit_call_id,
"write_stdin",
&serde_json::to_string(&exit_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "start python under seatbelt".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
permission_profile: None,
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let startup_output = outputs
.get(startup_call_id)
.expect("missing python startup output");
let output_text = startup_output.output.replace("\r\n", "\n");
// This assert that we are in a TTY.
assert!(
output_text.contains(">>>"),
"python prompt missing from seatbelt output: {output_text:?}"
);
assert_eq!(
startup_output.process_id.as_deref(),
Some("1000"),
"python session should stay alive for follow-up input"
);
let exit_output = outputs
.get(exit_call_id)
.expect("missing python exit output");
assert_eq!(
exit_output.exit_code,
Some(0),
"python should exit cleanly after exit()"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_runs_on_all_platforms() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let call_id = "uexec";
let args = serde_json::json!({
"cmd": "echo 'hello crossplat'",
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"summarize large output",
SandboxPolicy::DangerFullAccess,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");
// TODO: Weaker match because windows produces control characters
assert_regex_match(".*hello crossplat.*", &output.output);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
const MAX_SESSIONS_FOR_TEST: i32 = 64;
const FILLER_SESSIONS: i32 = MAX_SESSIONS_FOR_TEST - 1;
let keep_call_id = "uexec-prune-keep";
let keep_args = serde_json::json!({
"cmd": "/bin/cat",
"yield_time_ms": 250,
"tty": true,
});
let prune_call_id = "uexec-prune-target";
// Give the sleeper time to exit before the filler sessions trigger pruning.
let prune_args = serde_json::json!({
"cmd": "sleep 1",
"yield_time_ms": 1_250,
"tty": true,
});
let mut events = vec![ev_response_created("resp-prune-1")];
events.push(ev_function_call(
keep_call_id,
"exec_command",
&serde_json::to_string(&keep_args)?,
));
events.push(ev_function_call(
prune_call_id,
"exec_command",
&serde_json::to_string(&prune_args)?,
));
for idx in 0..FILLER_SESSIONS {
let filler_args = serde_json::json!({
"cmd": format!("echo filler {idx}"),
"yield_time_ms": 250,
});
let call_id = format!("uexec-prune-fill-{idx}");
events.push(ev_function_call(
&call_id,
"exec_command",
&serde_json::to_string(&filler_args)?,
));
}
let keep_write_call_id = "uexec-prune-keep-write";
let keep_write_args = serde_json::json!({
"chars": "still alive\n",
"session_id": 1000,
"yield_time_ms": 500,
});
events.push(ev_function_call(
keep_write_call_id,
"write_stdin",
&serde_json::to_string(&keep_write_args)?,
));
let probe_call_id = "uexec-prune-probe";
let probe_args = serde_json::json!({
"chars": "should fail\n",
"session_id": 1001,
"yield_time_ms": 500,
});
events.push(ev_function_call(
probe_call_id,
"write_stdin",
&serde_json::to_string(&probe_args)?,
));
events.push(ev_completed("resp-prune-1"));
let first_response = sse(events);
let completion_response = sse(vec![
ev_response_created("resp-prune-2"),
ev_assistant_message("msg-prune", "done"),
ev_completed("resp-prune-2"),
]);
let response_mock =
mount_sse_sequence(&server, vec![first_response, completion_response]).await;
submit_unified_exec_turn(&test, "fill session cache", SandboxPolicy::DangerFullAccess).await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = response_mock.requests();
assert!(
!requests.is_empty(),
"expected at least one response request"
);
let keep_start = requests
.iter()
.find_map(|req| req.function_call_output_text(keep_call_id))
.expect("missing initial keep session output");
let keep_start_output = parse_unified_exec_output(&keep_start)?;
assert!(keep_start_output.process_id.is_some());
assert!(keep_start_output.exit_code.is_none());
let prune_start = requests
.iter()
.find_map(|req| req.function_call_output_text(prune_call_id))
.expect("missing initial prune process output");
let prune_start_output = parse_unified_exec_output(&prune_start)?;
assert!(prune_start_output.process_id.is_some());
assert!(prune_start_output.exit_code.is_none());
let keep_write = requests
.iter()
.find_map(|req| req.function_call_output_text(keep_write_call_id))
.expect("missing keep write output");
let keep_write_output = parse_unified_exec_output(&keep_write)?;
assert!(keep_write_output.process_id.is_some());
assert!(
keep_write_output.output.contains("still alive"),
"expected cat process to echo input, got {:?}",
keep_write_output.output
);
let pruned_probe = requests
.iter()
.find_map(|req| req.function_call_output_text(probe_call_id))
.expect("missing probe output");
assert!(
pruned_probe.contains("UnknownProcessId") || pruned_probe.contains("Unknown process id"),
"expected probe to fail after pruning, got {pruned_probe:?}"
);
Ok(())
}
fn assert_command(command: &[String], expected_args: &str, expected_cmd: &str) {
assert_eq!(command.len(), 3);
let shell_path = &command[0];
assert!(
shell_path == "/bin/bash"
|| shell_path == "/usr/bin/bash"
|| shell_path == "/usr/local/bin/bash"
|| shell_path.ends_with("/bash"),
"unexpected bash path: {shell_path}"
);
assert_eq!(command[1], expected_args);
assert_eq!(command[2], expected_cmd);
}