Compare commits

...

4 Commits

Author SHA1 Message Date
starr-openai
16f046ccaa Fix isolated remote exec verification glue
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 19:24:16 -07:00
starr-openai
1f045b824e Isolate remote exec-server shell glue
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 19:13:49 -07:00
starr-openai
51150c7616 Cap remote shell output deltas
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 19:11:54 -07:00
starr-openai
993c967df7 Route shell commands through exec environments
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 19:11:52 -07:00
6 changed files with 352 additions and 24 deletions

View File

@@ -33,6 +33,7 @@ use crate::spawn::StdioPolicy;
use crate::spawn::spawn_child_async;
use crate::text_encoding::bytes_to_string_smart;
use crate::tools::sandboxing::SandboxablePreference;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_network_proxy::NetworkProxy;
#[cfg(any(target_os = "windows", test))]
use codex_protocol::permissions::FileSystemSandboxKind;
@@ -41,6 +42,9 @@ use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::process_group::kill_child_process_group;
#[path = "exec_remote.rs"]
mod exec_remote;
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
// Hardcode these since it does not seem worth including the libc crate just
@@ -367,6 +371,27 @@ pub(crate) async fn execute_exec_request(
finalize_exec_result(raw_output_result, sandbox, duration)
}
pub(crate) async fn execute_exec_request_in_environment(
exec_request: ExecRequest,
environment: &ExecutorEnvironment,
stdout_stream: Option<StdoutStream>,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<ExecToolCallOutput> {
if environment.experimental_exec_server_url().is_none() {
let effective_policy = exec_request.sandbox_policy.clone();
return execute_exec_request(exec_request, &effective_policy, stdout_stream, after_spawn)
.await;
}
exec_remote::execute_exec_request_via_environment(
exec_request,
environment,
stdout_stream,
after_spawn,
)
.await
}
#[cfg(target_os = "windows")]
fn extract_create_process_as_user_error_code(err: &str) -> Option<String> {
let marker = "CreateProcessAsUserW failed: ";
@@ -1050,22 +1075,7 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
if let Some(stream) = &stream
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
{
let chunk = tmp[..n].to_vec();
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk,
});
let event = Event {
id: stream.sub_id.clone(),
msg,
};
#[allow(clippy::let_unit_value)]
let _ = stream.tx_event.send(event).await;
emit_output_delta(Some(stream), is_stderr, tmp[..n].to_vec()).await;
emitted_deltas += 1;
}
@@ -1083,6 +1093,28 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
})
}
async fn emit_output_delta(stream: Option<&StdoutStream>, is_stderr: bool, chunk: Vec<u8>) {
let Some(stream) = stream else {
return;
};
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk,
});
let event = Event {
id: stream.sub_id.clone(),
msg,
};
#[allow(clippy::let_unit_value)]
let _ = stream.tx_event.send(event).await;
}
#[cfg(unix)]
fn synthetic_exit_status(code: i32) -> ExitStatus {
use std::os::unix::process::ExitStatusExt;

View File

@@ -0,0 +1,230 @@
use std::io;
use std::time::Instant;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_exec_server::ExecOutputStream as ExecutorOutputStream;
use codex_exec_server::ExecParams as ExecutorExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ReadParams as ExecutorReadParams;
use uuid::Uuid;
use super::AGGREGATE_BUFFER_INITIAL_CAPACITY;
use super::CodexErr;
use super::EXIT_CODE_SIGNAL_BASE;
use super::ExecCapturePolicy;
use super::ExecExpiration;
use super::ExecRequest;
use super::ExecToolCallOutput;
use super::MAX_EXEC_OUTPUT_DELTAS_PER_CALL;
use super::READ_CHUNK_SIZE;
use super::RawExecToolCallOutput;
use super::Result;
use super::SIGKILL_CODE;
use super::StdoutStream;
use super::StreamOutput;
use super::TIMEOUT_CODE;
use super::aggregate_output;
use super::append_capped;
use super::emit_output_delta;
use super::finalize_exec_result;
use super::synthetic_exit_status;
pub(crate) async fn execute_exec_request_via_environment(
exec_request: ExecRequest,
environment: &ExecutorEnvironment,
stdout_stream: Option<StdoutStream>,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<ExecToolCallOutput> {
let ExecRequest {
command,
cwd,
mut env,
network,
expiration,
capture_policy,
sandbox,
windows_sandbox_level: _,
windows_sandbox_private_desktop: _,
sandbox_permissions: _,
sandbox_policy: _,
file_system_sandbox_policy: _,
network_sandbox_policy: _,
justification: _,
arg0,
} = exec_request;
if let Some(network) = network.as_ref() {
network.apply_to_env(&mut env);
}
let process_id = format!("shell-{}", Uuid::new_v4());
let params = ExecutorExecParams {
process_id: process_id.clone(),
argv: command,
cwd,
env,
tty: false,
arg0,
};
let executor = environment.get_executor();
let start = Instant::now();
executor
.start(params)
.await
.map_err(exec_server_error_to_codex)?;
if let Some(after_spawn) = after_spawn {
after_spawn();
}
let raw_output_result = consume_exec_server_output(
executor,
&process_id,
expiration,
capture_policy,
stdout_stream,
)
.await;
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox, duration)
}
pub(crate) async fn consume_exec_server_output(
executor: std::sync::Arc<dyn ExecProcess>,
process_id: &str,
expiration: ExecExpiration,
capture_policy: ExecCapturePolicy,
stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
let retained_bytes_cap = capture_policy.retained_bytes_cap();
let mut stdout = Vec::with_capacity(
retained_bytes_cap.map_or(AGGREGATE_BUFFER_INITIAL_CAPACITY, |max_bytes| {
AGGREGATE_BUFFER_INITIAL_CAPACITY.min(max_bytes)
}),
);
let mut stderr = Vec::with_capacity(stdout.capacity());
let mut after_seq = None;
let mut exit_status = None;
let mut timed_out = false;
let mut emitted_deltas = 0usize;
let expiration_wait = async {
if capture_policy.uses_expiration() {
expiration.wait().await;
} else {
std::future::pending::<()>().await;
}
};
tokio::pin!(expiration_wait);
loop {
let read_future = executor.read(ExecutorReadParams {
process_id: process_id.to_string(),
after_seq,
max_bytes: Some(READ_CHUNK_SIZE),
wait_ms: Some(50),
});
tokio::pin!(read_future);
let read_response = tokio::select! {
response = &mut read_future => response.map_err(exec_server_error_to_codex)?,
_ = &mut expiration_wait => {
timed_out = true;
let _ = executor.terminate(process_id).await;
break;
}
_ = tokio::signal::ctrl_c() => {
let _ = executor.terminate(process_id).await;
exit_status = Some(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE));
break;
}
};
after_seq = Some(read_response.next_seq.saturating_sub(1));
for chunk in read_response.chunks {
let bytes = chunk.chunk.into_inner();
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
if emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await;
emitted_deltas += 1;
}
match chunk.stream {
ExecutorOutputStream::Stderr => {
append_with_cap(&mut stderr, &bytes, retained_bytes_cap)
}
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
append_with_cap(&mut stdout, &bytes, retained_bytes_cap)
}
}
}
if read_response.exited {
exit_status = Some(synthetic_exit_status(read_response.exit_code.unwrap_or(-1)));
loop {
let drain_response = executor
.read(ExecutorReadParams {
process_id: process_id.to_string(),
after_seq,
max_bytes: Some(READ_CHUNK_SIZE),
wait_ms: Some(0),
})
.await
.map_err(exec_server_error_to_codex)?;
if drain_response.chunks.is_empty() {
break;
}
after_seq = Some(drain_response.next_seq.saturating_sub(1));
for chunk in drain_response.chunks {
let bytes = chunk.chunk.into_inner();
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
if emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await;
emitted_deltas += 1;
}
match chunk.stream {
ExecutorOutputStream::Stderr => {
append_with_cap(&mut stderr, &bytes, retained_bytes_cap)
}
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
append_with_cap(&mut stdout, &bytes, retained_bytes_cap)
}
}
}
}
break;
}
}
let stdout = StreamOutput {
text: stdout,
truncated_after_lines: None,
};
let stderr = StreamOutput {
text: stderr,
truncated_after_lines: None,
};
let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
Ok(RawExecToolCallOutput {
exit_status: exit_status
.unwrap_or_else(|| synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE)),
stdout,
stderr,
aggregated_output,
timed_out,
})
}
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
if let Some(max_bytes) = max_bytes {
append_capped(dst, src, max_bytes);
} else {
dst.extend_from_slice(src);
}
}
fn exec_server_error_to_codex(err: codex_exec_server::ExecServerError) -> CodexErr {
CodexErr::Io(io::Error::other(err.to_string()))
}

View File

@@ -1,4 +1,6 @@
use super::exec_remote::consume_exec_server_output;
use super::*;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_protocol::config_types::WindowsSandboxLevel;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
@@ -199,6 +201,58 @@ async fn read_output_retains_all_bytes_for_full_buffer_capture() {
assert_eq!(out.text.len(), expected_len);
}
#[tokio::test]
async fn consume_exec_server_output_collects_split_streams() -> anyhow::Result<()> {
let environment = ExecutorEnvironment::create(None).await?;
let executor = environment.get_executor();
let process_id = format!("exec-test-{}", uuid::Uuid::new_v4());
#[cfg(windows)]
let argv = vec![
"powershell.exe".to_string(),
"-NonInteractive".to_string(),
"-NoLogo".to_string(),
"-Command".to_string(),
"[Console]::Out.Write('hello'); [Console]::Error.Write('oops')".to_string(),
];
#[cfg(not(windows))]
let argv = vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf hello; printf oops >&2".to_string(),
];
executor
.start(codex_exec_server::ExecParams {
process_id: process_id.clone(),
argv,
cwd: std::env::current_dir()?,
env: HashMap::new(),
tty: false,
arg0: None,
})
.await?;
let output = consume_exec_server_output(
executor,
&process_id,
5_000.into(),
ExecCapturePolicy::ShellTool,
None,
)
.await?;
assert_eq!(output.exit_status.code(), Some(0));
assert_eq!(bytes_to_string_smart(&output.stdout.text), "hello");
assert_eq!(bytes_to_string_smart(&output.stderr.text), "oops");
assert_eq!(
bytes_to_string_smart(&output.aggregated_output.text),
"hellooops"
);
Ok(())
}
#[test]
fn aggregate_output_keeps_all_bytes_when_uncapped() {
let stdout = StreamOutput {

View File

@@ -15,7 +15,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::execute_exec_request;
use crate::exec::execute_exec_request_in_environment;
use crate::exec_env::create_env;
use crate::parse_command::parse_command;
use crate::protocol::EventMsg;
@@ -187,9 +187,9 @@ pub(crate) async fn execute_user_shell_command(
tx_event: session.get_tx_event(),
});
let exec_result = execute_exec_request(
let exec_result = execute_exec_request_in_environment(
exec_env,
&sandbox_policy,
session.services.environment.as_ref(),
stdout_stream,
/*after_spawn*/ None,
)

View File

@@ -10,12 +10,12 @@ pub(crate) mod zsh_fork_backend;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec::ExecToolCallOutput;
use crate::exec::execute_exec_request_in_environment;
use crate::guardian::GuardianApprovalRequest;
use crate::guardian::review_approval_request;
use crate::guardian::routes_approval_to_guardian;
use crate::powershell::prefix_powershell_script_with_utf8;
use crate::sandboxing::SandboxPermissions;
use crate::sandboxing::execute_env;
use crate::shell::ShellType;
use crate::tools::network_approval::NetworkApprovalMode;
use crate::tools::network_approval::NetworkApprovalSpec;
@@ -231,8 +231,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
} else {
command
};
let environment = ctx.session.services.environment.as_ref();
let remote_exec_server_enabled = environment.experimental_exec_server_url().is_some();
if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
if self.backend == ShellRuntimeBackend::ShellCommandZshFork && !remote_exec_server_enabled {
match zsh_fork_backend::maybe_run_shell_command(req, attempt, ctx, &command).await? {
Some(out) => return Ok(out),
None => {
@@ -241,6 +243,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
);
}
}
} else if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
tracing::warn!(
"ZshFork backend specified, but exec-server environments require the standard shell runtime path; falling back to normal execution",
);
}
let spec = build_command_spec(
@@ -255,9 +261,14 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
let env = attempt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
let out = execute_env(env, Self::stdout_stream(ctx))
.await
.map_err(ToolError::Codex)?;
let out = execute_exec_request_in_environment(
env,
environment,
Self::stdout_stream(ctx),
/*after_spawn*/ None,
)
.await
.map_err(ToolError::Codex)?;
Ok(out)
}
}

View File

@@ -48,6 +48,7 @@ pub use protocol::ExecParams;
pub use protocol::ExecResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ProcessOutputChunk;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::TerminateParams;