From bf7937b4a70720a89d9d45dc824bbcf510028e5d Mon Sep 17 00:00:00 2001 From: starr-openai Date: Tue, 5 May 2026 12:45:14 -0700 Subject: [PATCH] codex: extract remote shell runtime path Co-authored-by: Codex --- codex-rs/core/src/tools/runtimes/shell.rs | 209 +-------- .../src/tools/runtimes/shell/remote_exec.rs | 412 ++++++++++++++++++ codex-rs/core/tests/suite/remote_env.rs | 79 ++++ 3 files changed, 495 insertions(+), 205 deletions(-) create mode 100644 codex-rs/core/src/tools/runtimes/shell/remote_exec.rs diff --git a/codex-rs/core/src/tools/runtimes/shell.rs b/codex-rs/core/src/tools/runtimes/shell.rs index 546d07514e..e036a56631 100644 --- a/codex-rs/core/src/tools/runtimes/shell.rs +++ b/codex-rs/core/src/tools/runtimes/shell.rs @@ -4,15 +4,14 @@ Runtime: shell Executes shell requests under the orchestrator: asks for approval when needed, builds sandbox transform inputs, and runs them under the current SandboxAttempt. */ +pub(crate) mod remote_exec; #[cfg(unix)] pub(crate) mod unix_escalation; pub(crate) mod zsh_fork_backend; use crate::command_canonicalization::canonicalize_command_for_approval; use crate::exec::ExecCapturePolicy; -use crate::exec::ExecExpiration; use crate::exec::StdoutStream; -use crate::exec::is_likely_sandbox_denied; use crate::guardian::GuardianApprovalRequest; use crate::guardian::GuardianNetworkAccessTrigger; use crate::guardian::review_approval_request; @@ -40,28 +39,16 @@ use crate::tools::sandboxing::managed_network_for_sandbox_permissions; use crate::tools::sandboxing::sandbox_override_for_first_attempt; use crate::tools::sandboxing::with_cached_approval; use codex_exec_server::Environment; -use codex_exec_server::ExecOutputStream; -use codex_exec_server::ExecParams as ExecServerParams; -use codex_exec_server::ProcessId; use codex_network_proxy::NetworkProxy; -use codex_protocol::error::CodexErr; -use codex_protocol::error::SandboxErr; use codex_protocol::exec_output::ExecToolCallOutput; -use codex_protocol::exec_output::StreamOutput; use codex_protocol::models::AdditionalPermissionProfile; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::ExecCommandOutputDeltaEvent; use codex_protocol::protocol::ReviewDecision; use codex_sandboxing::SandboxablePreference; use codex_shell_command::powershell::prefix_powershell_script_with_utf8; use codex_utils_absolute_path::AbsolutePathBuf; -use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; use futures::future::BoxFuture; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; -use tokio::time::Instant; #[derive(Clone, Debug)] pub struct ShellRequest { @@ -317,7 +304,9 @@ impl ToolRuntime for ShellRuntime { .map_err(|err| ToolError::Codex(err.into()))?; exec_env.exec_server_env_config = req.exec_server_env_config.clone(); let out = if req.environment.is_remote() { - run_remote_shell(req, exec_env, attempt, ctx).await? + remote_exec::RemoteShellExecutor::new(req, &exec_env, attempt, ctx) + .run(exec_env) + .await? } else { execute_env(exec_env, Self::stdout_stream(ctx)) .await @@ -326,193 +315,3 @@ impl ToolRuntime for ShellRuntime { Ok(out) } } - -fn exec_server_env_for_request( - request: &ExecRequest, -) -> ( - Option, - HashMap, -) { - if let Some(exec_server_env_config) = &request.exec_server_env_config { - let env = exec_server_env_config.env_overlay(&request.env); - (Some(exec_server_env_config.policy.clone()), env) - } else { - (None, request.env.clone()) - } -} - -fn exec_server_params_for_request(request: &ExecRequest, call_id: &str) -> ExecServerParams { - let (env_policy, env) = exec_server_env_for_request(request); - ExecServerParams { - process_id: ProcessId::from(call_id.to_string()), - argv: request.command.clone(), - cwd: request.cwd.to_path_buf(), - env_policy, - env, - tty: false, - pipe_stdin: false, - arg0: request.arg0.clone(), - } -} - -fn append_capped(dst: &mut Vec, src: &[u8]) { - if dst.len() >= DEFAULT_OUTPUT_BYTES_CAP { - return; - } - let remaining = DEFAULT_OUTPUT_BYTES_CAP.saturating_sub(dst.len()); - dst.extend_from_slice(&src[..src.len().min(remaining)]); -} - -async fn emit_output_delta( - stdout_stream: &StdoutStream, - call_stream: ExecOutputStream, - chunk: Vec, -) { - let stream = match call_stream { - ExecOutputStream::Stdout | ExecOutputStream::Pty => { - codex_protocol::protocol::ExecOutputStream::Stdout - } - ExecOutputStream::Stderr => codex_protocol::protocol::ExecOutputStream::Stderr, - }; - let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { - call_id: stdout_stream.call_id.clone(), - stream, - chunk, - }); - let event = Event { - id: stdout_stream.sub_id.clone(), - msg, - }; - let _ = stdout_stream.tx_event.send(event).await; -} - -fn shell_output( - stdout: Vec, - stderr: Vec, - aggregated_output: Vec, - exit_code: i32, - duration: Duration, - timed_out: bool, -) -> ExecToolCallOutput { - let stdout = StreamOutput { - text: stdout, - truncated_after_lines: None, - }; - let stderr = StreamOutput { - text: stderr, - truncated_after_lines: None, - }; - ExecToolCallOutput { - exit_code, - stdout: stdout.from_utf8_lossy(), - stderr: stderr.from_utf8_lossy(), - aggregated_output: StreamOutput { - text: codex_protocol::exec_output::bytes_to_string_smart(&aggregated_output), - truncated_after_lines: None, - }, - duration, - timed_out, - } -} - -async fn run_remote_shell( - req: &ShellRequest, - exec_env: ExecRequest, - attempt: &SandboxAttempt<'_>, - ctx: &ToolCtx, -) -> Result { - let start = Instant::now(); - let expiration: ExecExpiration = req.timeout_ms.into(); - let timeout = expiration.timeout_ms().map(Duration::from_millis); - let deadline = timeout.map(|timeout| start + timeout); - let started = req - .environment - .get_exec_backend() - .start(exec_server_params_for_request(&exec_env, &ctx.call_id)) - .await - .map_err(|err| ToolError::Rejected(err.to_string()))?; - let stdout_stream = ShellRuntime::stdout_stream(ctx); - let mut stdout = Vec::new(); - let mut stderr = Vec::new(); - let mut aggregated_output = Vec::new(); - let mut after_seq = None; - let mut exit_code = None; - - loop { - if let Some(cancellation) = attempt.network_denial_cancellation_token.as_ref() - && cancellation.is_cancelled() - { - let _ = started.process.terminate().await; - return Err(ToolError::Rejected( - "Network access was denied by the Codex sandbox network proxy.".to_string(), - )); - } - let wait_ms = deadline - .map(|deadline| deadline.saturating_duration_since(Instant::now())) - .map(|remaining| remaining.min(Duration::from_millis(100)).as_millis() as u64) - .unwrap_or(100); - if wait_ms == 0 { - let _ = started.process.terminate().await; - let output = shell_output( - stdout, - stderr, - aggregated_output, - 124, - start.elapsed(), - /*timed_out*/ true, - ); - return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { - output: Box::new(output), - }))); - } - - let response = started - .process - .read(after_seq, /*max_bytes*/ None, Some(wait_ms)) - .await - .map_err(|err| ToolError::Rejected(err.to_string()))?; - for chunk in response.chunks { - let stream = chunk.stream; - let bytes = chunk.chunk.into_inner(); - if let Some(stdout_stream) = stdout_stream.as_ref() { - emit_output_delta(stdout_stream, stream, bytes.clone()).await; - } - match stream { - ExecOutputStream::Stdout | ExecOutputStream::Pty => { - append_capped(&mut stdout, &bytes); - append_capped(&mut aggregated_output, &bytes); - } - ExecOutputStream::Stderr => { - append_capped(&mut stderr, &bytes); - append_capped(&mut aggregated_output, &bytes); - } - } - } - if let Some(failure) = response.failure { - return Err(ToolError::Rejected(failure)); - } - if response.exited { - exit_code = response.exit_code; - } - if response.closed { - break; - } - after_seq = response.next_seq.checked_sub(1); - } - - let output = shell_output( - stdout, - stderr, - aggregated_output, - exit_code.unwrap_or(-1), - start.elapsed(), - /*timed_out*/ false, - ); - if is_likely_sandbox_denied(exec_env.sandbox, &output) { - return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied { - output: Box::new(output), - network_policy_decision: None, - }))); - } - Ok(output) -} diff --git a/codex-rs/core/src/tools/runtimes/shell/remote_exec.rs b/codex-rs/core/src/tools/runtimes/shell/remote_exec.rs new file mode 100644 index 0000000000..57baa129b2 --- /dev/null +++ b/codex-rs/core/src/tools/runtimes/shell/remote_exec.rs @@ -0,0 +1,412 @@ +use crate::exec::ExecExpiration; +use crate::exec::StdoutStream; +use crate::exec::is_likely_sandbox_denied; +use crate::sandboxing::ExecRequest; +use crate::tools::runtimes::shell::ShellRequest; +use crate::tools::sandboxing::SandboxAttempt; +use crate::tools::sandboxing::ToolCtx; +use crate::tools::sandboxing::ToolError; +use codex_exec_server::Environment; +use codex_exec_server::ExecOutputStream; +use codex_exec_server::ExecParams as ExecServerParams; +use codex_exec_server::ExecProcess; +use codex_exec_server::ProcessId; +use codex_exec_server::ReadResponse; +use codex_protocol::error::CodexErr; +use codex_protocol::error::SandboxErr; +use codex_protocol::exec_output::ExecToolCallOutput; +use codex_protocol::exec_output::StreamOutput; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExecCommandOutputDeltaEvent; +use codex_sandboxing::SandboxType; +use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::Instant; +use tokio_util::sync::CancellationToken; + +/// Executes the remote branch of `ShellRuntime`. +/// +/// Local shell requests still go through `execute_env(...)`. Remote +/// environments have a different contract: start a process on the exec +/// backend, page output until the session closes, stream deltas to the client, +/// and then synthesize `ExecToolCallOutput`. +pub(super) struct RemoteShellExecutor { + environment: Arc, + call_id: String, + timeout_ms: Option, + sandbox: SandboxType, + stdout_stream: Option, + network_denial_cancellation_token: Option, +} + +/// Minimal remote-process contract needed by shell's remote exec path. +/// +/// The shell runtime only needs retained-output reads plus termination. Keeping +/// this narrow makes the remote-only loop unit-testable without a live +/// exec-server backend. +pub(super) trait RemoteShellProcess: Send + Sync { + fn read( + &self, + after_seq: Option, + wait_ms: u64, + ) -> impl std::future::Future> + Send; + + fn terminate(&self) -> impl std::future::Future + Send; +} + +struct ExecServerRemoteShellProcess { + inner: Arc, +} + +impl RemoteShellExecutor { + pub(super) fn new( + req: &ShellRequest, + exec_env: &ExecRequest, + attempt: &SandboxAttempt<'_>, + ctx: &ToolCtx, + ) -> Self { + Self { + environment: Arc::clone(&req.environment), + call_id: ctx.call_id.clone(), + timeout_ms: req.timeout_ms, + sandbox: exec_env.sandbox, + stdout_stream: crate::tools::runtimes::shell::ShellRuntime::stdout_stream(ctx), + network_denial_cancellation_token: attempt.network_denial_cancellation_token.clone(), + } + } + + pub(super) async fn run(self, exec_env: ExecRequest) -> Result { + let started = self + .environment + .get_exec_backend() + .start(exec_server_params_for_request(&exec_env, &self.call_id)) + .await + .map_err(|err| ToolError::Rejected(err.to_string()))?; + let process = ExecServerRemoteShellProcess { + inner: started.process, + }; + self.run_with_process(&process).await + } + + async fn run_with_process( + self, + process: &P, + ) -> Result { + let start = Instant::now(); + let expiration: ExecExpiration = self.timeout_ms.into(); + let timeout = expiration.timeout_ms().map(Duration::from_millis); + let deadline = timeout.map(|timeout| start + timeout); + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + let mut aggregated_output = Vec::new(); + let mut after_seq = None; + let mut exit_code = None; + + loop { + if let Some(cancellation) = self.network_denial_cancellation_token.as_ref() + && cancellation.is_cancelled() + { + process.terminate().await; + return Err(ToolError::Rejected( + "Network access was denied by the Codex sandbox network proxy.".to_string(), + )); + } + let wait_ms = deadline + .map(|deadline| deadline.saturating_duration_since(Instant::now())) + .map(|remaining| remaining.min(Duration::from_millis(100)).as_millis() as u64) + .unwrap_or(100); + if wait_ms == 0 { + process.terminate().await; + let output = shell_output( + stdout, + stderr, + aggregated_output, + 124, + start.elapsed(), + /*timed_out*/ true, + ); + return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { + output: Box::new(output), + }))); + } + + let response = process + .read(after_seq, wait_ms) + .await + .map_err(ToolError::Rejected)?; + for chunk in response.chunks { + let stream = chunk.stream; + let bytes = chunk.chunk.into_inner(); + if let Some(stdout_stream) = self.stdout_stream.as_ref() { + emit_output_delta(stdout_stream, stream, bytes.clone()).await; + } + match stream { + ExecOutputStream::Stdout | ExecOutputStream::Pty => { + append_capped(&mut stdout, &bytes); + append_capped(&mut aggregated_output, &bytes); + } + ExecOutputStream::Stderr => { + append_capped(&mut stderr, &bytes); + append_capped(&mut aggregated_output, &bytes); + } + } + } + if let Some(failure) = response.failure { + return Err(ToolError::Rejected(failure)); + } + if response.exited { + exit_code = response.exit_code; + } + if response.closed { + break; + } + after_seq = response.next_seq.checked_sub(1); + } + + let output = shell_output( + stdout, + stderr, + aggregated_output, + exit_code.unwrap_or(-1), + start.elapsed(), + /*timed_out*/ false, + ); + if is_likely_sandbox_denied(self.sandbox, &output) { + return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied { + output: Box::new(output), + network_policy_decision: None, + }))); + } + Ok(output) + } +} + +impl RemoteShellProcess for ExecServerRemoteShellProcess { + async fn read(&self, after_seq: Option, wait_ms: u64) -> Result { + self.inner + .read(after_seq, /*max_bytes*/ None, Some(wait_ms)) + .await + .map_err(|err| err.to_string()) + } + + async fn terminate(&self) { + let _ = self.inner.terminate().await; + } +} + +fn exec_server_env_for_request( + request: &ExecRequest, +) -> ( + Option, + HashMap, +) { + if let Some(exec_server_env_config) = &request.exec_server_env_config { + let env = exec_server_env_config.env_overlay(&request.env); + (Some(exec_server_env_config.policy.clone()), env) + } else { + (None, request.env.clone()) + } +} + +fn exec_server_params_for_request(request: &ExecRequest, call_id: &str) -> ExecServerParams { + let (env_policy, env) = exec_server_env_for_request(request); + ExecServerParams { + process_id: ProcessId::from(call_id.to_string()), + argv: request.command.clone(), + cwd: request.cwd.to_path_buf(), + env_policy, + env, + tty: false, + pipe_stdin: false, + arg0: request.arg0.clone(), + } +} + +fn append_capped(dst: &mut Vec, src: &[u8]) { + if dst.len() >= DEFAULT_OUTPUT_BYTES_CAP { + return; + } + let remaining = DEFAULT_OUTPUT_BYTES_CAP.saturating_sub(dst.len()); + dst.extend_from_slice(&src[..src.len().min(remaining)]); +} + +async fn emit_output_delta( + stdout_stream: &StdoutStream, + call_stream: ExecOutputStream, + chunk: Vec, +) { + let stream = match call_stream { + ExecOutputStream::Stdout | ExecOutputStream::Pty => { + codex_protocol::protocol::ExecOutputStream::Stdout + } + ExecOutputStream::Stderr => codex_protocol::protocol::ExecOutputStream::Stderr, + }; + let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { + call_id: stdout_stream.call_id.clone(), + stream, + chunk, + }); + let event = Event { + id: stdout_stream.sub_id.clone(), + msg, + }; + let _ = stdout_stream.tx_event.send(event).await; +} + +fn shell_output( + stdout: Vec, + stderr: Vec, + aggregated_output: Vec, + exit_code: i32, + duration: Duration, + timed_out: bool, +) -> ExecToolCallOutput { + let stdout = StreamOutput { + text: stdout, + truncated_after_lines: None, + }; + let stderr = StreamOutput { + text: stderr, + truncated_after_lines: None, + }; + ExecToolCallOutput { + exit_code, + stdout: stdout.from_utf8_lossy(), + stderr: stderr.from_utf8_lossy(), + aggregated_output: StreamOutput { + text: codex_protocol::exec_output::bytes_to_string_smart(&aggregated_output), + truncated_after_lines: None, + }, + duration, + timed_out, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_exec_server::Environment; + use codex_exec_server::ExecOutputStream; + use codex_exec_server::ProcessOutputChunk; + use pretty_assertions::assert_eq; + use std::collections::VecDeque; + use std::sync::Mutex; + + struct FakeRemoteShellProcess { + responses: Mutex>, + terminated: Mutex, + } + + impl FakeRemoteShellProcess { + fn new(responses: Vec) -> Self { + Self { + responses: Mutex::new(VecDeque::from(responses)), + terminated: Mutex::new(false), + } + } + + fn terminated(&self) -> bool { + *self + .terminated + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + } + } + + impl RemoteShellProcess for FakeRemoteShellProcess { + async fn read( + &self, + _after_seq: Option, + _wait_ms: u64, + ) -> Result { + self.responses + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .pop_front() + .ok_or_else(|| "no more responses".to_string()) + } + + async fn terminate(&self) { + *self + .terminated + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = true; + } + } + + fn chunk(seq: u64, stream: ExecOutputStream, bytes: &[u8]) -> ProcessOutputChunk { + ProcessOutputChunk { + seq, + stream, + chunk: bytes.to_vec().into(), + } + } + + fn test_executor(timeout_ms: Option) -> RemoteShellExecutor { + RemoteShellExecutor { + environment: Arc::new(Environment::default_for_tests()), + call_id: "call-1".to_string(), + timeout_ms, + sandbox: SandboxType::None, + stdout_stream: None, + network_denial_cancellation_token: None, + } + } + + #[tokio::test] + async fn remote_shell_executor_collects_output_until_closed() { + let executor = test_executor(/*timeout_ms*/ None); + let process = FakeRemoteShellProcess::new(vec![ + ReadResponse { + chunks: vec![ + chunk(1, ExecOutputStream::Stdout, b"hello "), + chunk(2, ExecOutputStream::Stderr, b"warn"), + ], + next_seq: 3, + exited: false, + exit_code: None, + closed: false, + failure: None, + }, + ReadResponse { + chunks: vec![chunk(3, ExecOutputStream::Stdout, b"world")], + next_seq: 4, + exited: true, + exit_code: Some(0), + closed: true, + failure: None, + }, + ]); + + let output = executor + .run_with_process(&process) + .await + .expect("remote shell output should succeed"); + + assert_eq!(output.exit_code, 0); + assert_eq!(output.stdout, "hello world"); + assert_eq!(output.stderr, "warn"); + assert_eq!(output.aggregated_output.text, "hello warnworld"); + assert!(!output.timed_out); + } + + #[tokio::test] + async fn remote_shell_executor_times_out_and_terminates_process() { + let executor = test_executor(/*timeout_ms*/ Some(0)); + let process = FakeRemoteShellProcess::new(Vec::new()); + + let err = executor + .run_with_process(&process) + .await + .expect_err("timeout should fail"); + + assert!(process.terminated()); + match err { + ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { .. })) => {} + other => panic!("expected timeout error, got {other:?}"), + } + } +} diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index e2e451a7d0..6b99879eeb 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -380,6 +380,85 @@ async fn shell_command_routes_to_selected_remote_environment() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn shell_command_resolves_relative_workdir_in_selected_remote_environment() -> Result<()> { + skip_if_no_network!(Ok(())); + let Some(_remote_env) = get_remote_test_env() else { + return Ok(()); + }; + + let server = start_mock_server().await; + let test = shell_command_test(&server).await?; + let local_cwd = TempDir::new()?; + let local_nested = local_cwd.path().join("nested"); + fs::create_dir_all(&local_nested)?; + fs::write(local_nested.join("marker.txt"), "local-routing")?; + let local_selection = TurnEnvironmentSelection { + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), + cwd: local_cwd.path().abs(), + }; + let remote_cwd = PathBuf::from(format!( + "/tmp/codex-shell-remote-workdir-{}", + SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() + )) + .abs(); + let remote_nested = remote_cwd.join("nested"); + test.fs() + .create_directory( + &remote_nested, + CreateDirectoryOptions { recursive: true }, + /*sandbox*/ None, + ) + .await?; + test.fs() + .write_file( + &remote_nested.join("marker.txt"), + b"remote-routing".to_vec(), + /*sandbox*/ None, + ) + .await?; + let remote_selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: remote_cwd.clone(), + }; + + let output = shell_command_routing_output( + &test, + &server, + "call-shell-remote-workdir", + json!({ + "command": "cat marker.txt", + "workdir": "nested", + "login": false, + "timeout_ms": 1_000, + "environment_id": REMOTE_ENVIRONMENT_ID, + }), + Some(vec![local_selection, remote_selection]), + ) + .await?; + assert!( + output.contains("remote-routing"), + "unexpected shell_command output: {output}", + ); + assert!( + !output.contains("local-routing"), + "shell_command should resolve workdir in the selected remote environment: {output}", + ); + + test.fs() + .remove( + &remote_cwd, + RemoveOptions { + recursive: true, + force: true, + }, + /*sandbox*/ None, + ) + .await?; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_test_env_sandboxed_read_allows_readable_root() -> Result<()> { skip_if_no_network!(Ok(()));