mirror of
https://github.com/openai/codex.git
synced 2026-05-17 09:43:19 +00:00
## Why

Remote exec-server now needs one executor websocket to serve multiple harness JSON-RPC sessions. Rendezvous routes by `stream_id`, and the exec-server side needs to use the same stable relay frame contract instead of a hand-rolled JSON shape. The relay protocol also needs to make ownership boundaries clear: harness and executor endpoints own sequencing, acks, retries, duplicate suppression, segmentation, and reassembly; rendezvous only routes frames.

## What Changed

- Add the checked-in `codex.exec_server.relay.v1.RelayMessageFrame` proto plus generated prost bindings for `codex-exec-server`.
- Encode remote harness/executor relay traffic as binary protobuf websocket frames while keeping local websocket JSON-RPC unchanged.
- Demux executor-side relay streams into independent `ConnectionProcessor` sessions keyed by `stream_id`.
- Add a programmatic `RemoteExecutorConfig::with_bearer_token(...)` constructor for non-CLI callers and integration tests.
- Add an integration test that starts the remote executor against a fake registry/rendezvous websocket and verifies that two virtual streams share one executor websocket without cross-talk, including per-stream reset behavior.
- Document the remote relay envelope, sequence ranges, `ack`/`ack_bits`, and endpoint responsibilities in `exec-server/README.md`.

## Verification

- `cargo test -p codex-exec-server --test relay multiplexed_remote_executor_routes_independent_virtual_streams -- --exact`
- `cargo test -p codex-exec-server --test relay`
- `cargo test -p codex-exec-server` passed outside the sandbox. The sandboxed run hit macOS `sandbox-exec: sandbox_apply: Operation not permitted` in filesystem sandbox tests.
144 lines
5.0 KiB
Rust
144 lines
5.0 KiB
Rust
use std::process::Stdio;
|
|
use tokio::io::AsyncBufReadExt;
|
|
use tokio::io::BufReader;
|
|
use tokio::process::Command;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::connect_async;
|
|
use tracing::debug;
|
|
use tracing::warn;
|
|
|
|
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
|
|
|
use crate::ExecServerClient;
|
|
use crate::ExecServerError;
|
|
use crate::client_api::RemoteExecServerConnectArgs;
|
|
use crate::client_api::StdioExecServerCommand;
|
|
use crate::client_api::StdioExecServerConnectArgs;
|
|
use crate::connection::JsonRpcConnection;
|
|
use crate::relay::harness_connection_from_websocket;
|
|
|
|
/// Client name sent as `client_name` by every connection opened through
/// `ExecServerClient::connect_for_transport` (both the websocket and stdio transports).
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
|
|
|
|
impl ExecServerClient {
|
|
pub(crate) async fn connect_for_transport(
|
|
transport_params: crate::client_api::ExecServerTransportParams,
|
|
) -> Result<Self, ExecServerError> {
|
|
match transport_params {
|
|
crate::client_api::ExecServerTransportParams::WebSocketUrl {
|
|
websocket_url,
|
|
connect_timeout,
|
|
initialize_timeout,
|
|
} => {
|
|
Self::connect_websocket(RemoteExecServerConnectArgs {
|
|
websocket_url,
|
|
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
|
|
connect_timeout,
|
|
initialize_timeout,
|
|
resume_session_id: None,
|
|
})
|
|
.await
|
|
}
|
|
crate::client_api::ExecServerTransportParams::StdioCommand {
|
|
command,
|
|
initialize_timeout,
|
|
} => {
|
|
Self::connect_stdio_command(StdioExecServerConnectArgs {
|
|
command,
|
|
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
|
|
initialize_timeout,
|
|
resume_session_id: None,
|
|
})
|
|
.await
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn connect_websocket(
|
|
args: RemoteExecServerConnectArgs,
|
|
) -> Result<Self, ExecServerError> {
|
|
ensure_rustls_crypto_provider();
|
|
let websocket_url = args.websocket_url.clone();
|
|
let connect_timeout = args.connect_timeout;
|
|
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
|
|
.await
|
|
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
|
|
url: websocket_url.clone(),
|
|
timeout: connect_timeout,
|
|
})?
|
|
.map_err(|source| ExecServerError::WebSocketConnect {
|
|
url: websocket_url.clone(),
|
|
source,
|
|
})?;
|
|
|
|
let connection_label = format!("exec-server websocket {websocket_url}");
|
|
let connection = if is_rendezvous_harness_url(&websocket_url) {
|
|
harness_connection_from_websocket(stream, connection_label)
|
|
} else {
|
|
JsonRpcConnection::from_websocket(stream, connection_label)
|
|
};
|
|
Self::connect(connection, args.into()).await
|
|
}
|
|
|
|
pub(crate) async fn connect_stdio_command(
|
|
args: StdioExecServerConnectArgs,
|
|
) -> Result<Self, ExecServerError> {
|
|
let mut child = stdio_command_process(&args.command)
|
|
.stdin(Stdio::piped())
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
.map_err(ExecServerError::Spawn)?;
|
|
|
|
let stdin = child.stdin.take().ok_or_else(|| {
|
|
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
|
|
})?;
|
|
let stdout = child.stdout.take().ok_or_else(|| {
|
|
ExecServerError::Protocol("spawned exec-server command has no stdout".to_string())
|
|
})?;
|
|
if let Some(stderr) = child.stderr.take() {
|
|
tokio::spawn(async move {
|
|
let mut lines = BufReader::new(stderr).lines();
|
|
loop {
|
|
match lines.next_line().await {
|
|
Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"),
|
|
Ok(None) => break,
|
|
Err(err) => {
|
|
warn!("failed to read exec-server stdio stderr: {err}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
Self::connect(
|
|
JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string())
|
|
.with_child_process(child),
|
|
args.into(),
|
|
)
|
|
.await
|
|
}
|
|
}
|
|
|
|
/// Returns `true` when `websocket_url` carries a `role=harness` pair in its query string.
///
/// Such URLs target the rendezvous service as a harness endpoint.
///
/// Parsing rules:
/// - Any `#fragment` suffix is removed before the query is parsed. A fragment is not part of
///   the query (RFC 3986 §3.4–3.5), so `…?role=harness#x` still matches.
/// - A `?` that appears only inside the fragment does not start a query.
/// - Pairs without an `=` are skipped.
/// - Keys and values are compared literally, with no percent-decoding.
fn is_rendezvous_harness_url(websocket_url: &str) -> bool {
    // Drop the fragment first so it can neither leak into the last query value nor supply a
    // bogus `?` of its own.
    let without_fragment = websocket_url
        .split_once('#')
        .map_or(websocket_url, |(before_fragment, _fragment)| before_fragment);
    let Some((_path, query)) = without_fragment.split_once('?') else {
        return false;
    };
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .any(|(key, value)| key == "role" && value == "harness")
}
|
|
|
|
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
|
|
let mut command = Command::new(&stdio_command.program);
|
|
command.args(&stdio_command.args);
|
|
command.envs(&stdio_command.env);
|
|
if let Some(cwd) = &stdio_command.cwd {
|
|
command.current_dir(cwd);
|
|
}
|
|
#[cfg(unix)]
|
|
command.process_group(0);
|
|
command
|
|
}
|