Add a high-level remote shell exec RPC

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-20 17:40:35 -07:00
parent f5ffc0373d
commit e6c9f12a40
9 changed files with 304 additions and 20 deletions

View File

@@ -35,10 +35,10 @@ use crate::text_encoding::bytes_to_string_smart;
use crate::tools::sandboxing::SandboxablePreference;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_exec_server::ExecOutputStream as ExecutorOutputStream;
use codex_exec_server::ExecParams as ExecutorExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessOutputChunk as ExecutorProcessOutputChunk;
use codex_exec_server::ReadParams as ExecutorReadParams;
use codex_exec_server::ShellExecParams as ExecutorShellExecParams;
use codex_network_proxy::NetworkProxy;
#[cfg(any(target_os = "windows", test))]
use codex_protocol::permissions::FileSystemSandboxKind;
@@ -46,8 +46,6 @@ use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::process_group::kill_child_process_group;
use uuid::Uuid;
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
// Hardcode these since it does not seem worth including the libc crate just
@@ -414,38 +412,61 @@ async fn execute_exec_request_via_environment(
arg0,
} = exec_request;
if matches!(expiration, ExecExpiration::Cancellation(_)) {
return Err(CodexErr::Io(io::Error::new(
io::ErrorKind::Unsupported,
"remote shell/exec does not yet support cancellation-backed expiration",
)));
}
if let Some(network) = network.as_ref() {
network.apply_to_env(&mut env);
}
let process_id = format!("shell-{}", Uuid::new_v4());
let params = ExecutorExecParams {
process_id: process_id.clone(),
argv: command,
let params = ExecutorShellExecParams {
command,
cwd,
env,
tty: false,
timeout_ms: expiration.timeout_ms(),
output_bytes_cap: capture_policy.retained_bytes_cap(),
arg0,
};
let executor = environment.get_executor();
let start = Instant::now();
executor
.start(params)
let response = environment
.shell_exec(params)
.await
.map_err(exec_server_error_to_codex)?;
if let Some(after_spawn) = after_spawn {
after_spawn();
}
if let Some(stream) = stdout_stream.as_ref() {
let stdout = response.stdout.clone().into_inner();
if !stdout.is_empty() {
emit_output_delta(Some(stream), /*is_stderr*/ false, stdout).await;
}
let stderr = response.stderr.clone().into_inner();
if !stderr.is_empty() {
emit_output_delta(Some(stream), /*is_stderr*/ true, stderr).await;
}
}
let raw_output_result = consume_exec_server_output(
executor,
&process_id,
expiration,
capture_policy,
stdout_stream,
)
.await;
let raw_output_result = Ok(RawExecToolCallOutput {
exit_status: synthetic_exit_status(response.exit_code),
stdout: StreamOutput {
text: response.stdout.into_inner(),
truncated_after_lines: None,
},
stderr: StreamOutput {
text: response.stderr.into_inner(),
truncated_after_lines: None,
},
aggregated_output: StreamOutput {
text: response.aggregated_output.into_inner(),
truncated_after_lines: None,
},
timed_out: response.timed_out,
});
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox, duration)
}

View File

@@ -50,6 +50,9 @@ use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SHELL_EXEC_METHOD;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -202,6 +205,17 @@ impl ExecServerClient {
.map_err(Into::into)
}
pub async fn shell_exec(
&self,
params: ShellExecParams,
) -> Result<ShellExecResponse, ExecServerError> {
self.inner
.client
.call(SHELL_EXEC_METHOD, &params)
.await
.map_err(Into::into)
}
pub async fn write(
&self,
process_id: &str,

View File

@@ -3,6 +3,8 @@ use std::sync::Arc;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::RemoteExecServerConnectArgs;
use crate::ShellExecParams;
use crate::ShellExecResponse;
use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
@@ -104,6 +106,16 @@ impl Environment {
Arc::new(LocalFileSystem)
}
}
pub async fn shell_exec(
&self,
params: ShellExecParams,
) -> Result<ShellExecResponse, ExecServerError> {
let client = self.remote_exec_server_client.clone().ok_or_else(|| {
ExecServerError::Protocol("remote exec-server client is not configured".to_string())
})?;
client.shell_exec(params).await
}
}
impl ExecutorEnvironment for Environment {

View File

@@ -50,6 +50,8 @@ pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::ShellExecParams;
pub use protocol::ShellExecResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;

View File

@@ -13,6 +13,7 @@ pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
pub const SHELL_EXEC_METHOD: &str = "shell/exec";
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
@@ -140,6 +141,27 @@ pub struct ExecExitedNotification {
pub exit_code: i32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ShellExecParams {
pub command: Vec<String>,
pub cwd: PathBuf,
pub env: HashMap<String, String>,
pub timeout_ms: Option<u64>,
pub output_bytes_cap: Option<usize>,
pub arg0: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ShellExecResponse {
pub exit_code: i32,
pub stdout: ByteChunk,
pub stderr: ByteChunk,
pub aggregated_output: ByteChunk,
pub timed_out: bool,
}
mod base64_bytes {
use super::BASE64_STANDARD;
use base64::Engine as _;

View File

@@ -3,6 +3,7 @@ mod handler;
mod process_handler;
mod processor;
mod registry;
mod shell_exec_handler;
mod transport;
pub(crate) use handler::ExecServerHandler;

View File

@@ -19,6 +19,8 @@ use crate::protocol::ExecResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -26,17 +28,21 @@ use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::server::file_system_handler::FileSystemHandler;
use crate::server::process_handler::ProcessHandler;
use crate::server::shell_exec_handler::ShellExecHandler;
#[derive(Clone)]
pub(crate) struct ExecServerHandler {
process: ProcessHandler,
shell_exec: ShellExecHandler,
file_system: FileSystemHandler,
}
impl ExecServerHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
let process = ProcessHandler::new(notifications);
Self {
process: ProcessHandler::new(notifications),
shell_exec: ShellExecHandler::new(process.clone()),
process,
file_system: FileSystemHandler::default(),
}
}
@@ -78,6 +84,13 @@ impl ExecServerHandler {
self.process.terminate(params).await
}
pub(crate) async fn shell_exec(
&self,
params: ShellExecParams,
) -> Result<ShellExecResponse, JSONRPCErrorError> {
self.shell_exec.exec(params).await
}
pub(crate) async fn fs_read_file(
&self,
params: FsReadFileParams,

View File

@@ -16,6 +16,8 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::ReadParams;
use crate::protocol::SHELL_EXEC_METHOD;
use crate::protocol::ShellExecParams;
use crate::protocol::TerminateParams;
use crate::protocol::WriteParams;
use crate::rpc::RpcRouter;
@@ -64,6 +66,12 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
handler.terminate(params).await
},
);
router.request(
SHELL_EXEC_METHOD,
|handler: Arc<ExecServerHandler>, params: ShellExecParams| async move {
handler.shell_exec(params).await
},
);
router.request(
FS_READ_FILE_METHOD,
|handler: Arc<ExecServerHandler>, params: FsReadFileParams| async move {

View File

@@ -0,0 +1,191 @@
use std::time::Duration;
use codex_app_server_protocol::JSONRPCErrorError;
use uuid::Uuid;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::ReadParams;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::rpc::invalid_params;
use crate::server::process_handler::ProcessHandler;
const DEFAULT_EXEC_TIMEOUT_MS: u64 = 10_000;
const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
const READ_CHUNK_SIZE: usize = 8192;
#[derive(Clone)]
pub(crate) struct ShellExecHandler {
process: ProcessHandler,
}
impl ShellExecHandler {
pub(crate) fn new(process: ProcessHandler) -> Self {
Self { process }
}
pub(crate) async fn exec(
&self,
params: ShellExecParams,
) -> Result<ShellExecResponse, JSONRPCErrorError> {
self.process.require_initialized_for("shell execution")?;
if params.command.is_empty() {
return Err(invalid_params("command must not be empty".to_string()));
}
let process_id = format!("shell-exec-{}", Uuid::new_v4());
self.process
.exec(ExecParams {
process_id: process_id.clone(),
argv: params.command,
cwd: params.cwd,
env: params.env,
tty: false,
arg0: params.arg0,
})
.await?;
let retained_bytes_cap = params.output_bytes_cap;
let timeout = Duration::from_millis(params.timeout_ms.unwrap_or(DEFAULT_EXEC_TIMEOUT_MS));
let expiration_wait = tokio::time::sleep(timeout);
tokio::pin!(expiration_wait);
let mut stdout = Vec::with_capacity(
retained_bytes_cap.map_or(READ_CHUNK_SIZE, |max_bytes| READ_CHUNK_SIZE.min(max_bytes)),
);
let mut stderr = Vec::with_capacity(stdout.capacity());
let mut after_seq = None;
let mut exit_code = None;
let mut timed_out = false;
loop {
let read_future = self.process.exec_read(ReadParams {
process_id: process_id.clone(),
after_seq,
max_bytes: Some(READ_CHUNK_SIZE),
wait_ms: Some(50),
});
tokio::pin!(read_future);
let read_response = tokio::select! {
response = &mut read_future => response?,
_ = &mut expiration_wait => {
timed_out = true;
let _ = self.process.terminate(TerminateParams {
process_id: process_id.clone(),
}).await;
break;
}
};
after_seq = Some(read_response.next_seq.saturating_sub(1));
append_process_output(
read_response.chunks,
&mut stdout,
&mut stderr,
retained_bytes_cap,
);
if read_response.exited {
exit_code = Some(read_response.exit_code.unwrap_or(-1));
loop {
let drain_response = self
.process
.exec_read(ReadParams {
process_id: process_id.clone(),
after_seq,
max_bytes: Some(READ_CHUNK_SIZE),
wait_ms: Some(0),
})
.await?;
if drain_response.chunks.is_empty() {
break;
}
after_seq = Some(drain_response.next_seq.saturating_sub(1));
append_process_output(
drain_response.chunks,
&mut stdout,
&mut stderr,
retained_bytes_cap,
);
}
break;
}
}
let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
Ok(ShellExecResponse {
exit_code: exit_code.unwrap_or(EXEC_TIMEOUT_EXIT_CODE),
stdout: stdout.into(),
stderr: stderr.into(),
aggregated_output: aggregated_output.into(),
timed_out,
})
}
}
fn append_process_output(
chunks: Vec<crate::protocol::ProcessOutputChunk>,
stdout: &mut Vec<u8>,
stderr: &mut Vec<u8>,
retained_bytes_cap: Option<usize>,
) {
for chunk in chunks {
let bytes = chunk.chunk.into_inner();
match chunk.stream {
ExecOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap),
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
append_with_cap(stdout, &bytes, retained_bytes_cap)
}
}
}
}
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
if let Some(max_bytes) = max_bytes {
append_capped(dst, src, max_bytes);
} else {
dst.extend_from_slice(src);
}
}
fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
if dst.len() >= max_bytes {
return;
}
let remaining = max_bytes.saturating_sub(dst.len());
let take = remaining.min(src.len());
dst.extend_from_slice(&src[..take]);
}
fn aggregate_output(stdout: &[u8], stderr: &[u8], max_bytes: Option<usize>) -> Vec<u8> {
let Some(max_bytes) = max_bytes else {
let total_len = stdout.len().saturating_add(stderr.len());
let mut aggregated = Vec::with_capacity(total_len);
aggregated.extend_from_slice(stdout);
aggregated.extend_from_slice(stderr);
return aggregated;
};
let total_len = stdout.len().saturating_add(stderr.len());
let mut aggregated = Vec::with_capacity(total_len.min(max_bytes));
if total_len <= max_bytes {
aggregated.extend_from_slice(stdout);
aggregated.extend_from_slice(stderr);
return aggregated;
}
let want_stdout = stdout.len().min(max_bytes / 3);
let want_stderr = stderr.len();
let stderr_take = want_stderr.min(max_bytes.saturating_sub(want_stdout));
let remaining = max_bytes.saturating_sub(want_stdout + stderr_take);
let stdout_take = want_stdout + remaining.min(stdout.len().saturating_sub(want_stdout));
aggregated.extend_from_slice(&stdout[..stdout_take]);
aggregated.extend_from_slice(&stderr[..stderr_take]);
aggregated
}