Remove stdio transport from exec server (#15119)

Summary
- delete the deprecated stdio transport plumbing from the exec server
stack
- add a basic `exec_server()` harness plus test utilities to start a
server, send requests, and await events
- refresh exec-server dependencies, configs, and documentation to
reflect the new flow

Testing
- Not run (not requested)

---------

Co-authored-by: starr-openai <starr@openai.com>
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
pakrym-oai
2026-03-18 18:00:35 -07:00
committed by GitHub
parent 4fd2774614
commit 903660edba
17 changed files with 418 additions and 562 deletions

View File

@@ -0,0 +1,188 @@
#![allow(dead_code)]
use std::process::Stdio;
use std::time::Duration;
use anyhow::anyhow;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use codex_utils_cargo_bin::cargo_bin;
use futures::SinkExt;
use futures::StreamExt;
use tokio::process::Child;
use tokio::process::Command;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25);
const EVENT_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) struct ExecServerHarness {
child: Child,
websocket: tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
next_request_id: i64,
}
impl Drop for ExecServerHarness {
fn drop(&mut self) {
let _ = self.child.start_kill();
}
}
pub(crate) async fn exec_server() -> anyhow::Result<ExecServerHarness> {
let binary = cargo_bin("codex-exec-server")?;
let websocket_url = reserve_websocket_url()?;
let mut child = Command::new(binary);
child.args(["--listen", &websocket_url]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::inherit());
let child = child.spawn()?;
let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
Ok(ExecServerHarness {
child,
websocket,
next_request_id: 1,
})
}
impl ExecServerHarness {
pub(crate) async fn send_request(
&mut self,
method: &str,
params: serde_json::Value,
) -> anyhow::Result<RequestId> {
let id = RequestId::Integer(self.next_request_id);
self.next_request_id += 1;
self.send_message(JSONRPCMessage::Request(JSONRPCRequest {
id: id.clone(),
method: method.to_string(),
params: Some(params),
trace: None,
}))
.await?;
Ok(id)
}
pub(crate) async fn send_notification(
&mut self,
method: &str,
params: serde_json::Value,
) -> anyhow::Result<()> {
self.send_message(JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(params),
}))
.await
}
pub(crate) async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> {
self.websocket
.send(Message::Text(text.to_string().into()))
.await?;
Ok(())
}
pub(crate) async fn next_event(&mut self) -> anyhow::Result<JSONRPCMessage> {
self.next_event_with_timeout(EVENT_TIMEOUT).await
}
pub(crate) async fn wait_for_event<F>(
&mut self,
mut predicate: F,
) -> anyhow::Result<JSONRPCMessage>
where
F: FnMut(&JSONRPCMessage) -> bool,
{
let deadline = Instant::now() + EVENT_TIMEOUT;
loop {
let now = Instant::now();
if now >= deadline {
return Err(anyhow!(
"timed out waiting for matching exec-server event after {EVENT_TIMEOUT:?}"
));
}
let remaining = deadline.duration_since(now);
let event = self.next_event_with_timeout(remaining).await?;
if predicate(&event) {
return Ok(event);
}
}
}
pub(crate) async fn shutdown(&mut self) -> anyhow::Result<()> {
self.child.start_kill()?;
Ok(())
}
async fn send_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
let encoded = serde_json::to_string(&message)?;
self.websocket.send(Message::Text(encoded.into())).await?;
Ok(())
}
async fn next_event_with_timeout(
&mut self,
timeout_duration: Duration,
) -> anyhow::Result<JSONRPCMessage> {
loop {
let frame = timeout(timeout_duration, self.websocket.next())
.await
.map_err(|_| anyhow!("timed out waiting for exec-server websocket event"))?
.ok_or_else(|| anyhow!("exec-server websocket closed"))??;
match frame {
Message::Text(text) => {
return Ok(serde_json::from_str(text.as_ref())?);
}
Message::Binary(bytes) => {
return Ok(serde_json::from_slice(bytes.as_ref())?);
}
Message::Close(_) => return Err(anyhow!("exec-server websocket closed")),
Message::Ping(_) | Message::Pong(_) => {}
_ => {}
}
}
}
}
fn reserve_websocket_url() -> anyhow::Result<String> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
let addr = listener.local_addr()?;
drop(listener);
Ok(format!("ws://{addr}"))
}
async fn connect_websocket_when_ready(
websocket_url: &str,
) -> anyhow::Result<(
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
tokio_tungstenite::tungstenite::handshake::client::Response,
)> {
let deadline = Instant::now() + CONNECT_TIMEOUT;
loop {
match connect_async(websocket_url).await {
Ok(websocket) => return Ok(websocket),
Err(err)
if Instant::now() < deadline
&& matches!(
err,
tokio_tungstenite::tungstenite::Error::Io(ref io_err)
if io_err.kind() == std::io::ErrorKind::ConnectionRefused
) =>
{
sleep(CONNECT_RETRY_INTERVAL).await;
}
Err(err) => return Err(err.into()),
}
}
}

View File

@@ -0,0 +1 @@
pub(crate) mod exec_server;

View File

@@ -0,0 +1,34 @@
#![cfg(unix)]
mod common;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_accepts_initialize() -> anyhow::Result<()> {
let mut server = exec_server().await?;
let initialize_id = server
.send_request(
"initialize",
serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?,
)
.await?;
let response = server.next_event().await?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response");
};
assert_eq!(id, initialize_id);
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
server.shutdown().await?;
Ok(())
}

View File

@@ -0,0 +1,65 @@
#![cfg(unix)]
mod common;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::InitializeParams;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> {
let mut server = exec_server().await?;
let initialize_id = server
.send_request(
"initialize",
serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?,
)
.await?;
let _ = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
)
})
.await?;
let process_start_id = server
.send_request(
"process/start",
serde_json::json!({
"processId": "proc-1",
"argv": ["true"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null
}),
)
.await?;
let response = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Error(JSONRPCError { id, .. }) if id == &process_start_id
)
})
.await?;
let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else {
panic!("expected process/start stub error");
};
assert_eq!(id, process_start_id);
assert_eq!(error.code, -32601);
assert_eq!(
error.message,
"exec-server stub does not implement `process/start` yet"
);
server.shutdown().await?;
Ok(())
}

View File

@@ -1,129 +0,0 @@
#![cfg(unix)]
use std::process::Stdio;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::time::timeout;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_accepts_initialize_over_stdio() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.args(["--listen", "stdio://"]);
child.stdin(Stdio::piped());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let mut stdin = child.stdin.take().expect("stdin");
let stdout = child.stdout.take().expect("stdout");
let mut stdout = BufReader::new(stdout).lines();
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&initialize)?).as_bytes())
.await?;
let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let response_line = response_line.expect("response line");
let response: JSONRPCMessage = serde_json::from_str(&response_line)?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response");
};
assert_eq!(id, RequestId::Integer(1));
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: Some(serde_json::json!({})),
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&initialized)?).as_bytes())
.await?;
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_stubs_process_start_over_stdio() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.args(["--listen", "stdio://"]);
child.stdin(Stdio::piped());
child.stdout(Stdio::piped());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let mut stdin = child.stdin.take().expect("stdin");
let stdout = child.stdout.take().expect("stdout");
let mut stdout = BufReader::new(stdout).lines();
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&initialize)?).as_bytes())
.await?;
let _ = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let exec = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: "process/start".to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["true"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null
})),
trace: None,
});
stdin
.write_all(format!("{}\n", serde_json::to_string(&exec)?).as_bytes())
.await?;
let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??;
let response_line = response_line.expect("exec response line");
let response: JSONRPCMessage = serde_json::from_str(&response_line)?;
let JSONRPCMessage::Error(codex_app_server_protocol::JSONRPCError { id, error }) = response
else {
panic!("expected process/start stub error");
};
assert_eq!(id, RequestId::Integer(2));
assert_eq!(error.code, -32601);
assert_eq!(
error.message,
"exec-server stub does not implement `process/start` yet"
);
child.start_kill()?;
Ok(())
}

View File

@@ -0,0 +1,60 @@
#![cfg(unix)]
mod common;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> {
let mut server = exec_server().await?;
server.send_raw_text("not-json").await?;
let response = server
.wait_for_event(|event| matches!(event, JSONRPCMessage::Error(_)))
.await?;
let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else {
panic!("expected malformed-message error response");
};
assert_eq!(id, codex_app_server_protocol::RequestId::Integer(-1));
assert_eq!(error.code, -32600);
assert!(
error
.message
.starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"),
"unexpected malformed-message error: {}",
error.message
);
let initialize_id = server
.send_request(
"initialize",
serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?,
)
.await?;
let response = server
.wait_for_event(|event| {
matches!(
event,
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
)
})
.await?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response after malformed input");
};
assert_eq!(id, initialize_id);
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
server.shutdown().await?;
Ok(())
}

View File

@@ -1,229 +0,0 @@
#![cfg(unix)]
use std::process::Stdio;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use tokio::process::Command;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_accepts_initialize_over_websocket() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let websocket_url = reserve_websocket_url()?;
let mut child = Command::new(binary);
child.args(["--listen", &websocket_url]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?),
trace: None,
});
futures::SinkExt::send(
&mut websocket,
Message::Text(serde_json::to_string(&initialize)?.into()),
)
.await?;
let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await
else {
panic!("expected initialize response");
};
let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response");
};
assert_eq!(id, RequestId::Integer(1));
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: Some(serde_json::json!({})),
});
futures::SinkExt::send(
&mut websocket,
Message::Text(serde_json::to_string(&initialized)?.into()),
)
.await?;
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let websocket_url = reserve_websocket_url()?;
let mut child = Command::new(binary);
child.args(["--listen", &websocket_url]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
futures::SinkExt::send(&mut websocket, Message::Text("not-json".to_string().into())).await?;
let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await
else {
panic!("expected malformed-message error response");
};
let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?;
let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else {
panic!("expected malformed-message error response");
};
assert_eq!(id, RequestId::Integer(-1));
assert_eq!(error.code, -32600);
assert!(
error
.message
.starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"),
"unexpected malformed-message error: {}",
error.message
);
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?),
trace: None,
});
futures::SinkExt::send(
&mut websocket,
Message::Text(serde_json::to_string(&initialize)?.into()),
)
.await?;
let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await
else {
panic!("expected initialize response after malformed input");
};
let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?;
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
panic!("expected initialize response after malformed input");
};
assert_eq!(id, RequestId::Integer(1));
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
child.start_kill()?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> {
let binary = cargo_bin("codex-exec-server")?;
let websocket_url = reserve_websocket_url()?;
let mut child = Command::new(binary);
child.args(["--listen", &websocket_url]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::inherit());
let mut child = child.spawn()?;
let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_name: "exec-server-test".to_string(),
})?),
trace: None,
});
futures::SinkExt::send(
&mut websocket,
Message::Text(serde_json::to_string(&initialize)?.into()),
)
.await?;
let _ = futures::StreamExt::next(&mut websocket).await;
let exec = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(2),
method: "process/start".to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["true"],
"cwd": std::env::current_dir()?,
"env": {},
"tty": false,
"arg0": null
})),
trace: None,
});
futures::SinkExt::send(
&mut websocket,
Message::Text(serde_json::to_string(&exec)?.into()),
)
.await?;
let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await
else {
panic!("expected process/start error");
};
let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?;
let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else {
panic!("expected process/start stub error");
};
assert_eq!(id, RequestId::Integer(2));
assert_eq!(error.code, -32601);
assert_eq!(
error.message,
"exec-server stub does not implement `process/start` yet"
);
child.start_kill()?;
Ok(())
}
fn reserve_websocket_url() -> anyhow::Result<String> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
let addr = listener.local_addr()?;
drop(listener);
Ok(format!("ws://{addr}"))
}
async fn connect_websocket_when_ready(
websocket_url: &str,
) -> anyhow::Result<(
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
tokio_tungstenite::tungstenite::handshake::client::Response,
)> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
loop {
match connect_async(websocket_url).await {
Ok(websocket) => return Ok(websocket),
Err(err)
if tokio::time::Instant::now() < deadline
&& matches!(
err,
tokio_tungstenite::tungstenite::Error::Io(ref io_err)
if io_err.kind() == std::io::ErrorKind::ConnectionRefused
) =>
{
tokio::time::sleep(Duration::from_millis(25)).await;
}
Err(err) => return Err(err.into()),
}
}
}