codex: extract remote shell runtime path

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-05-05 12:45:14 -07:00
parent c7b4daee61
commit bf7937b4a7
3 changed files with 495 additions and 205 deletions

View File

@@ -4,15 +4,14 @@ Runtime: shell
Executes shell requests under the orchestrator: asks for approval when needed,
builds sandbox transform inputs, and runs them under the current SandboxAttempt.
*/
pub(crate) mod remote_exec;
#[cfg(unix)]
pub(crate) mod unix_escalation;
pub(crate) mod zsh_fork_backend;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec::ExecCapturePolicy;
use crate::exec::ExecExpiration;
use crate::exec::StdoutStream;
use crate::exec::is_likely_sandbox_denied;
use crate::guardian::GuardianApprovalRequest;
use crate::guardian::GuardianNetworkAccessTrigger;
use crate::guardian::review_approval_request;
@@ -40,28 +39,16 @@ use crate::tools::sandboxing::managed_network_for_sandbox_permissions;
use crate::tools::sandboxing::sandbox_override_for_first_attempt;
use crate::tools::sandboxing::with_cached_approval;
use codex_exec_server::Environment;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams as ExecServerParams;
use codex_exec_server::ProcessId;
use codex_network_proxy::NetworkProxy;
use codex_protocol::error::CodexErr;
use codex_protocol::error::SandboxErr;
use codex_protocol::exec_output::ExecToolCallOutput;
use codex_protocol::exec_output::StreamOutput;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandOutputDeltaEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_sandboxing::SandboxablePreference;
use codex_shell_command::powershell::prefix_powershell_script_with_utf8;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
#[derive(Clone, Debug)]
pub struct ShellRequest {
@@ -317,7 +304,9 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
.map_err(|err| ToolError::Codex(err.into()))?;
exec_env.exec_server_env_config = req.exec_server_env_config.clone();
let out = if req.environment.is_remote() {
run_remote_shell(req, exec_env, attempt, ctx).await?
remote_exec::RemoteShellExecutor::new(req, &exec_env, attempt, ctx)
.run(exec_env)
.await?
} else {
execute_env(exec_env, Self::stdout_stream(ctx))
.await
@@ -326,193 +315,3 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
Ok(out)
}
}
fn exec_server_env_for_request(
request: &ExecRequest,
) -> (
Option<codex_exec_server::ExecEnvPolicy>,
HashMap<String, String>,
) {
if let Some(exec_server_env_config) = &request.exec_server_env_config {
let env = exec_server_env_config.env_overlay(&request.env);
(Some(exec_server_env_config.policy.clone()), env)
} else {
(None, request.env.clone())
}
}
fn exec_server_params_for_request(request: &ExecRequest, call_id: &str) -> ExecServerParams {
let (env_policy, env) = exec_server_env_for_request(request);
ExecServerParams {
process_id: ProcessId::from(call_id.to_string()),
argv: request.command.clone(),
cwd: request.cwd.to_path_buf(),
env_policy,
env,
tty: false,
pipe_stdin: false,
arg0: request.arg0.clone(),
}
}
fn append_capped(dst: &mut Vec<u8>, src: &[u8]) {
if dst.len() >= DEFAULT_OUTPUT_BYTES_CAP {
return;
}
let remaining = DEFAULT_OUTPUT_BYTES_CAP.saturating_sub(dst.len());
dst.extend_from_slice(&src[..src.len().min(remaining)]);
}
async fn emit_output_delta(
stdout_stream: &StdoutStream,
call_stream: ExecOutputStream,
chunk: Vec<u8>,
) {
let stream = match call_stream {
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
codex_protocol::protocol::ExecOutputStream::Stdout
}
ExecOutputStream::Stderr => codex_protocol::protocol::ExecOutputStream::Stderr,
};
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stdout_stream.call_id.clone(),
stream,
chunk,
});
let event = Event {
id: stdout_stream.sub_id.clone(),
msg,
};
let _ = stdout_stream.tx_event.send(event).await;
}
fn shell_output(
stdout: Vec<u8>,
stderr: Vec<u8>,
aggregated_output: Vec<u8>,
exit_code: i32,
duration: Duration,
timed_out: bool,
) -> ExecToolCallOutput {
let stdout = StreamOutput {
text: stdout,
truncated_after_lines: None,
};
let stderr = StreamOutput {
text: stderr,
truncated_after_lines: None,
};
ExecToolCallOutput {
exit_code,
stdout: stdout.from_utf8_lossy(),
stderr: stderr.from_utf8_lossy(),
aggregated_output: StreamOutput {
text: codex_protocol::exec_output::bytes_to_string_smart(&aggregated_output),
truncated_after_lines: None,
},
duration,
timed_out,
}
}
async fn run_remote_shell(
req: &ShellRequest,
exec_env: ExecRequest,
attempt: &SandboxAttempt<'_>,
ctx: &ToolCtx,
) -> Result<ExecToolCallOutput, ToolError> {
let start = Instant::now();
let expiration: ExecExpiration = req.timeout_ms.into();
let timeout = expiration.timeout_ms().map(Duration::from_millis);
let deadline = timeout.map(|timeout| start + timeout);
let started = req
.environment
.get_exec_backend()
.start(exec_server_params_for_request(&exec_env, &ctx.call_id))
.await
.map_err(|err| ToolError::Rejected(err.to_string()))?;
let stdout_stream = ShellRuntime::stdout_stream(ctx);
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut aggregated_output = Vec::new();
let mut after_seq = None;
let mut exit_code = None;
loop {
if let Some(cancellation) = attempt.network_denial_cancellation_token.as_ref()
&& cancellation.is_cancelled()
{
let _ = started.process.terminate().await;
return Err(ToolError::Rejected(
"Network access was denied by the Codex sandbox network proxy.".to_string(),
));
}
let wait_ms = deadline
.map(|deadline| deadline.saturating_duration_since(Instant::now()))
.map(|remaining| remaining.min(Duration::from_millis(100)).as_millis() as u64)
.unwrap_or(100);
if wait_ms == 0 {
let _ = started.process.terminate().await;
let output = shell_output(
stdout,
stderr,
aggregated_output,
124,
start.elapsed(),
/*timed_out*/ true,
);
return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout {
output: Box::new(output),
})));
}
let response = started
.process
.read(after_seq, /*max_bytes*/ None, Some(wait_ms))
.await
.map_err(|err| ToolError::Rejected(err.to_string()))?;
for chunk in response.chunks {
let stream = chunk.stream;
let bytes = chunk.chunk.into_inner();
if let Some(stdout_stream) = stdout_stream.as_ref() {
emit_output_delta(stdout_stream, stream, bytes.clone()).await;
}
match stream {
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
append_capped(&mut stdout, &bytes);
append_capped(&mut aggregated_output, &bytes);
}
ExecOutputStream::Stderr => {
append_capped(&mut stderr, &bytes);
append_capped(&mut aggregated_output, &bytes);
}
}
}
if let Some(failure) = response.failure {
return Err(ToolError::Rejected(failure));
}
if response.exited {
exit_code = response.exit_code;
}
if response.closed {
break;
}
after_seq = response.next_seq.checked_sub(1);
}
let output = shell_output(
stdout,
stderr,
aggregated_output,
exit_code.unwrap_or(-1),
start.elapsed(),
/*timed_out*/ false,
);
if is_likely_sandbox_denied(exec_env.sandbox, &output) {
return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied {
output: Box::new(output),
network_policy_decision: None,
})));
}
Ok(output)
}

View File

@@ -0,0 +1,412 @@
use crate::exec::ExecExpiration;
use crate::exec::StdoutStream;
use crate::exec::is_likely_sandbox_denied;
use crate::sandboxing::ExecRequest;
use crate::tools::runtimes::shell::ShellRequest;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use codex_exec_server::Environment;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams as ExecServerParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessId;
use codex_exec_server::ReadResponse;
use codex_protocol::error::CodexErr;
use codex_protocol::error::SandboxErr;
use codex_protocol::exec_output::ExecToolCallOutput;
use codex_protocol::exec_output::StreamOutput;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandOutputDeltaEvent;
use codex_sandboxing::SandboxType;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
/// Executes the remote branch of `ShellRuntime`.
///
/// Local shell requests still go through `execute_env(...)`. Remote
/// environments have a different contract: start a process on the exec
/// backend, page output until the session closes, stream deltas to the client,
/// and then synthesize `ExecToolCallOutput`.
pub(super) struct RemoteShellExecutor {
environment: Arc<Environment>,
call_id: String,
timeout_ms: Option<u64>,
sandbox: SandboxType,
stdout_stream: Option<StdoutStream>,
network_denial_cancellation_token: Option<CancellationToken>,
}
/// Minimal remote-process contract needed by shell's remote exec path.
///
/// The shell runtime only needs retained-output reads plus termination. Keeping
/// this narrow makes the remote-only loop unit-testable without a live
/// exec-server backend.
pub(super) trait RemoteShellProcess: Send + Sync {
fn read(
&self,
after_seq: Option<u64>,
wait_ms: u64,
) -> impl std::future::Future<Output = Result<ReadResponse, String>> + Send;
fn terminate(&self) -> impl std::future::Future<Output = ()> + Send;
}
struct ExecServerRemoteShellProcess {
inner: Arc<dyn ExecProcess>,
}
impl RemoteShellExecutor {
pub(super) fn new(
req: &ShellRequest,
exec_env: &ExecRequest,
attempt: &SandboxAttempt<'_>,
ctx: &ToolCtx,
) -> Self {
Self {
environment: Arc::clone(&req.environment),
call_id: ctx.call_id.clone(),
timeout_ms: req.timeout_ms,
sandbox: exec_env.sandbox,
stdout_stream: crate::tools::runtimes::shell::ShellRuntime::stdout_stream(ctx),
network_denial_cancellation_token: attempt.network_denial_cancellation_token.clone(),
}
}
pub(super) async fn run(self, exec_env: ExecRequest) -> Result<ExecToolCallOutput, ToolError> {
let started = self
.environment
.get_exec_backend()
.start(exec_server_params_for_request(&exec_env, &self.call_id))
.await
.map_err(|err| ToolError::Rejected(err.to_string()))?;
let process = ExecServerRemoteShellProcess {
inner: started.process,
};
self.run_with_process(&process).await
}
async fn run_with_process<P: RemoteShellProcess>(
self,
process: &P,
) -> Result<ExecToolCallOutput, ToolError> {
let start = Instant::now();
let expiration: ExecExpiration = self.timeout_ms.into();
let timeout = expiration.timeout_ms().map(Duration::from_millis);
let deadline = timeout.map(|timeout| start + timeout);
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut aggregated_output = Vec::new();
let mut after_seq = None;
let mut exit_code = None;
loop {
if let Some(cancellation) = self.network_denial_cancellation_token.as_ref()
&& cancellation.is_cancelled()
{
process.terminate().await;
return Err(ToolError::Rejected(
"Network access was denied by the Codex sandbox network proxy.".to_string(),
));
}
let wait_ms = deadline
.map(|deadline| deadline.saturating_duration_since(Instant::now()))
.map(|remaining| remaining.min(Duration::from_millis(100)).as_millis() as u64)
.unwrap_or(100);
if wait_ms == 0 {
process.terminate().await;
let output = shell_output(
stdout,
stderr,
aggregated_output,
124,
start.elapsed(),
/*timed_out*/ true,
);
return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout {
output: Box::new(output),
})));
}
let response = process
.read(after_seq, wait_ms)
.await
.map_err(ToolError::Rejected)?;
for chunk in response.chunks {
let stream = chunk.stream;
let bytes = chunk.chunk.into_inner();
if let Some(stdout_stream) = self.stdout_stream.as_ref() {
emit_output_delta(stdout_stream, stream, bytes.clone()).await;
}
match stream {
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
append_capped(&mut stdout, &bytes);
append_capped(&mut aggregated_output, &bytes);
}
ExecOutputStream::Stderr => {
append_capped(&mut stderr, &bytes);
append_capped(&mut aggregated_output, &bytes);
}
}
}
if let Some(failure) = response.failure {
return Err(ToolError::Rejected(failure));
}
if response.exited {
exit_code = response.exit_code;
}
if response.closed {
break;
}
after_seq = response.next_seq.checked_sub(1);
}
let output = shell_output(
stdout,
stderr,
aggregated_output,
exit_code.unwrap_or(-1),
start.elapsed(),
/*timed_out*/ false,
);
if is_likely_sandbox_denied(self.sandbox, &output) {
return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied {
output: Box::new(output),
network_policy_decision: None,
})));
}
Ok(output)
}
}
impl RemoteShellProcess for ExecServerRemoteShellProcess {
async fn read(&self, after_seq: Option<u64>, wait_ms: u64) -> Result<ReadResponse, String> {
self.inner
.read(after_seq, /*max_bytes*/ None, Some(wait_ms))
.await
.map_err(|err| err.to_string())
}
async fn terminate(&self) {
let _ = self.inner.terminate().await;
}
}
fn exec_server_env_for_request(
request: &ExecRequest,
) -> (
Option<codex_exec_server::ExecEnvPolicy>,
HashMap<String, String>,
) {
if let Some(exec_server_env_config) = &request.exec_server_env_config {
let env = exec_server_env_config.env_overlay(&request.env);
(Some(exec_server_env_config.policy.clone()), env)
} else {
(None, request.env.clone())
}
}
fn exec_server_params_for_request(request: &ExecRequest, call_id: &str) -> ExecServerParams {
let (env_policy, env) = exec_server_env_for_request(request);
ExecServerParams {
process_id: ProcessId::from(call_id.to_string()),
argv: request.command.clone(),
cwd: request.cwd.to_path_buf(),
env_policy,
env,
tty: false,
pipe_stdin: false,
arg0: request.arg0.clone(),
}
}
fn append_capped(dst: &mut Vec<u8>, src: &[u8]) {
if dst.len() >= DEFAULT_OUTPUT_BYTES_CAP {
return;
}
let remaining = DEFAULT_OUTPUT_BYTES_CAP.saturating_sub(dst.len());
dst.extend_from_slice(&src[..src.len().min(remaining)]);
}
async fn emit_output_delta(
stdout_stream: &StdoutStream,
call_stream: ExecOutputStream,
chunk: Vec<u8>,
) {
let stream = match call_stream {
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
codex_protocol::protocol::ExecOutputStream::Stdout
}
ExecOutputStream::Stderr => codex_protocol::protocol::ExecOutputStream::Stderr,
};
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stdout_stream.call_id.clone(),
stream,
chunk,
});
let event = Event {
id: stdout_stream.sub_id.clone(),
msg,
};
let _ = stdout_stream.tx_event.send(event).await;
}
fn shell_output(
stdout: Vec<u8>,
stderr: Vec<u8>,
aggregated_output: Vec<u8>,
exit_code: i32,
duration: Duration,
timed_out: bool,
) -> ExecToolCallOutput {
let stdout = StreamOutput {
text: stdout,
truncated_after_lines: None,
};
let stderr = StreamOutput {
text: stderr,
truncated_after_lines: None,
};
ExecToolCallOutput {
exit_code,
stdout: stdout.from_utf8_lossy(),
stderr: stderr.from_utf8_lossy(),
aggregated_output: StreamOutput {
text: codex_protocol::exec_output::bytes_to_string_smart(&aggregated_output),
truncated_after_lines: None,
},
duration,
timed_out,
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_exec_server::Environment;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ProcessOutputChunk;
use pretty_assertions::assert_eq;
use std::collections::VecDeque;
use std::sync::Mutex;
struct FakeRemoteShellProcess {
responses: Mutex<VecDeque<ReadResponse>>,
terminated: Mutex<bool>,
}
impl FakeRemoteShellProcess {
fn new(responses: Vec<ReadResponse>) -> Self {
Self {
responses: Mutex::new(VecDeque::from(responses)),
terminated: Mutex::new(false),
}
}
fn terminated(&self) -> bool {
*self
.terminated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
impl RemoteShellProcess for FakeRemoteShellProcess {
async fn read(
&self,
_after_seq: Option<u64>,
_wait_ms: u64,
) -> Result<ReadResponse, String> {
self.responses
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.pop_front()
.ok_or_else(|| "no more responses".to_string())
}
async fn terminate(&self) {
*self
.terminated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = true;
}
}
fn chunk(seq: u64, stream: ExecOutputStream, bytes: &[u8]) -> ProcessOutputChunk {
ProcessOutputChunk {
seq,
stream,
chunk: bytes.to_vec().into(),
}
}
fn test_executor(timeout_ms: Option<u64>) -> RemoteShellExecutor {
RemoteShellExecutor {
environment: Arc::new(Environment::default_for_tests()),
call_id: "call-1".to_string(),
timeout_ms,
sandbox: SandboxType::None,
stdout_stream: None,
network_denial_cancellation_token: None,
}
}
#[tokio::test]
async fn remote_shell_executor_collects_output_until_closed() {
let executor = test_executor(/*timeout_ms*/ None);
let process = FakeRemoteShellProcess::new(vec![
ReadResponse {
chunks: vec![
chunk(1, ExecOutputStream::Stdout, b"hello "),
chunk(2, ExecOutputStream::Stderr, b"warn"),
],
next_seq: 3,
exited: false,
exit_code: None,
closed: false,
failure: None,
},
ReadResponse {
chunks: vec![chunk(3, ExecOutputStream::Stdout, b"world")],
next_seq: 4,
exited: true,
exit_code: Some(0),
closed: true,
failure: None,
},
]);
let output = executor
.run_with_process(&process)
.await
.expect("remote shell output should succeed");
assert_eq!(output.exit_code, 0);
assert_eq!(output.stdout, "hello world");
assert_eq!(output.stderr, "warn");
assert_eq!(output.aggregated_output.text, "hello warnworld");
assert!(!output.timed_out);
}
#[tokio::test]
async fn remote_shell_executor_times_out_and_terminates_process() {
let executor = test_executor(/*timeout_ms*/ Some(0));
let process = FakeRemoteShellProcess::new(Vec::new());
let err = executor
.run_with_process(&process)
.await
.expect_err("timeout should fail");
assert!(process.terminated());
match err {
ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { .. })) => {}
other => panic!("expected timeout error, got {other:?}"),
}
}
}

View File

@@ -380,6 +380,85 @@ async fn shell_command_routes_to_selected_remote_environment() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shell_command_resolves_relative_workdir_in_selected_remote_environment() -> Result<()> {
skip_if_no_network!(Ok(()));
let Some(_remote_env) = get_remote_test_env() else {
return Ok(());
};
let server = start_mock_server().await;
let test = shell_command_test(&server).await?;
let local_cwd = TempDir::new()?;
let local_nested = local_cwd.path().join("nested");
fs::create_dir_all(&local_nested)?;
fs::write(local_nested.join("marker.txt"), "local-routing")?;
let local_selection = TurnEnvironmentSelection {
environment_id: LOCAL_ENVIRONMENT_ID.to_string(),
cwd: local_cwd.path().abs(),
};
let remote_cwd = PathBuf::from(format!(
"/tmp/codex-shell-remote-workdir-{}",
SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis()
))
.abs();
let remote_nested = remote_cwd.join("nested");
test.fs()
.create_directory(
&remote_nested,
CreateDirectoryOptions { recursive: true },
/*sandbox*/ None,
)
.await?;
test.fs()
.write_file(
&remote_nested.join("marker.txt"),
b"remote-routing".to_vec(),
/*sandbox*/ None,
)
.await?;
let remote_selection = TurnEnvironmentSelection {
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
cwd: remote_cwd.clone(),
};
let output = shell_command_routing_output(
&test,
&server,
"call-shell-remote-workdir",
json!({
"command": "cat marker.txt",
"workdir": "nested",
"login": false,
"timeout_ms": 1_000,
"environment_id": REMOTE_ENVIRONMENT_ID,
}),
Some(vec![local_selection, remote_selection]),
)
.await?;
assert!(
output.contains("remote-routing"),
"unexpected shell_command output: {output}",
);
assert!(
!output.contains("local-routing"),
"shell_command should resolve workdir in the selected remote environment: {output}",
);
test.fs()
.remove(
&remote_cwd,
RemoveOptions {
recursive: true,
force: true,
},
/*sandbox*/ None,
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_test_env_sandboxed_read_allows_readable_root() -> Result<()> {
skip_if_no_network!(Ok(()));