exec-server: add in-process client mode

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 20:10:22 +00:00
parent 30e094e34a
commit 238840fe08
9 changed files with 1023 additions and 347 deletions

View File

@@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
@@ -12,6 +14,8 @@ use crate::protocol::ExecOutputStream;
use crate::protocol::ExecResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
use crate::server::routing::ExecServerClientNotification;
@@ -24,15 +28,28 @@ use crate::server::routing::internal_error;
use crate::server::routing::invalid_params;
use crate::server::routing::invalid_request;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
#[derive(Clone)]
struct RetainedOutputChunk {
seq: u64,
stream: ExecOutputStream,
chunk: Vec<u8>,
}
struct RunningProcess {
session: ExecCommandSession,
tty: bool,
output: VecDeque<RetainedOutputChunk>,
retained_bytes: usize,
next_seq: u64,
exit_code: Option<i32>,
}
pub(crate) struct ExecServerHandler {
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
// Keyed by the protocol `processId`, which is caller-assigned and scoped to
// a single client connection rather than an OS pid.
// Keyed by client-chosen logical `processId` scoped to this connection.
// This is a protocol handle, not an OS pid.
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
initialize_requested: bool,
initialized: bool,
@@ -94,6 +111,19 @@ impl ExecServerHandler {
)
.await;
}
ExecServerRequest::Read { request_id, params } => {
self.send_request_result(
request_id,
match self.require_initialized() {
Ok(()) => self
.handle_read_request(params)
.await
.map(ExecServerResponseMessage::Read),
Err(err) => Err(err),
},
)
.await;
}
ExecServerRequest::Write { request_id, params } => {
self.send_request_result(
request_id,
@@ -177,14 +207,6 @@ impl ExecServerHandler {
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
let process_id = params.process_id.clone();
{
let process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
return Err(invalid_request(format!(
"process {process_id} already exists"
)));
}
}
let spawned = if params.tty {
codex_utils_pty::spawn_pty_process(
@@ -221,21 +243,35 @@ impl ExecServerHandler {
RunningProcess {
session: spawned.session,
tty: params.tty,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
},
);
}
tokio::spawn(stream_output(
process_id.clone(),
ExecOutputStream::Stdout,
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
));
tokio::spawn(stream_output(
process_id.clone(),
ExecOutputStream::Stderr,
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
self.outbound_tx.clone(),
Arc::clone(&self.processes),
));
tokio::spawn(watch_exit(
process_id.clone(),
@@ -247,6 +283,59 @@ impl ExecServerHandler {
Ok(ExecResponse { process_id })
}
async fn handle_read_request(
&self,
params: crate::protocol::ReadParams,
) -> Result<ReadResponse, codex_app_server_protocol::JSONRPCErrorError> {
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
let deadline = tokio::time::Instant::now() + wait;
loop {
let response = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let mut chunks = Vec::new();
let mut total_bytes = 0;
for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) {
let chunk_len = retained.chunk.len();
if !chunks.is_empty() && total_bytes + chunk_len > max_bytes {
break;
}
total_bytes += chunk_len;
chunks.push(ProcessOutputChunk {
seq: retained.seq,
stream: retained.stream,
chunk: retained.chunk.clone().into(),
});
if total_bytes >= max_bytes {
break;
}
}
ReadResponse {
chunks,
next_seq: process.next_seq,
exited: process.exit_code.is_some(),
exit_code: process.exit_code,
}
};
if !response.chunks.is_empty()
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
return Ok(response);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
async fn handle_write_request(
&self,
params: crate::protocol::WriteParams,
@@ -315,15 +404,38 @@ async fn stream_output(
stream: ExecOutputStream,
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
) {
while let Some(chunk) = receiver.recv().await {
let notification = {
let mut processes = processes.lock().await;
let Some(process) = processes.get_mut(&process_id) else {
break;
};
let seq = process.next_seq;
process.next_seq += 1;
process.retained_bytes += chunk.len();
process.output.push_back(RetainedOutputChunk {
seq,
stream,
chunk: chunk.clone(),
});
while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS {
let Some(evicted) = process.output.pop_front() else {
break;
};
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
}
ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}
};
if outbound_tx
.send(ExecServerOutboundMessage::Notification(
ExecServerServerNotification::OutputDelta(ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}),
ExecServerServerNotification::OutputDelta(notification),
))
.await
.is_err()
@@ -342,7 +454,9 @@ async fn watch_exit(
let exit_code = exit_rx.await.unwrap_or(-1);
{
let mut processes = processes.lock().await;
processes.remove(&process_id);
if let Some(process) = processes.get_mut(&process_id) {
process.exit_code = Some(exit_code);
}
}
let _ = outbound_tx
.send(ExecServerOutboundMessage::Notification(
@@ -574,18 +688,9 @@ mod tests {
}
#[tokio::test]
async fn duplicate_process_ids_are_rejected_per_connection() {
async fn exec_echoes_client_process_ids() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
let mut handler = ExecServerHandler::new(outgoing_tx);
let marker_path = std::env::temp_dir().join(format!(
"codex-exec-server-duplicate-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock before unix epoch")
.as_nanos()
));
let _ = std::fs::remove_file(&marker_path);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
@@ -631,48 +736,41 @@ mod tests {
{
panic!("first exec should succeed: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(2),
response: ExecServerResponseMessage::Exec(crate::protocol::ExecResponse {
process_id: "proc-1".to_string(),
}),
}
);
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(first_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected first exec response");
};
assert_eq!(request_id, RequestId::Integer(2));
assert_eq!(first_exec.process_id, "proc-1");
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(3),
params: crate::protocol::ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
format!("printf duplicate > {}", marker_path.display()),
],
process_id: "proc-2".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
..params
},
}))
.await
{
panic!("duplicate exec should not fail the handler: {err}");
panic!("second exec should succeed: {err}");
}
let ExecServerOutboundMessage::Error { request_id, error } =
recv_outbound(&mut outgoing_rx).await
let ExecServerOutboundMessage::Response {
request_id,
response: ExecServerResponseMessage::Exec(second_exec),
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected duplicate-process error");
panic!("expected second exec response");
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "process proc-1 already exists");
assert!(
!marker_path.exists(),
"duplicate process ids must be rejected before spawning the command"
);
assert_eq!(second_exec.process_id, "proc-2");
handler.shutdown().await;
let _ = std::fs::remove_file(&marker_path);
}
#[tokio::test]
@@ -707,7 +805,7 @@ mod tests {
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(2),
params: crate::protocol::ExecParams {
process_id: "proc-2".to_string(),
process_id: "proc-1".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
@@ -723,14 +821,20 @@ mod tests {
{
panic!("exec should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
let ExecServerOutboundMessage::Response {
response: ExecServerResponseMessage::Exec(exec_response),
..
} = recv_outbound(&mut outgoing_rx).await
else {
panic!("expected exec response");
};
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Write {
request_id: RequestId::Integer(3),
params: WriteParams {
process_id: "proc-2".to_string(),
process_id: exec_response.process_id,
chunk: b"hello\n".to_vec().into(),
},
},
@@ -747,7 +851,7 @@ mod tests {
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "stdin is closed for process proc-2");
assert_eq!(error.message, "stdin is closed for process proc-1");
handler.shutdown().await;
}
@@ -859,7 +963,7 @@ mod tests {
}
#[tokio::test]
async fn terminate_keeps_process_ids_reserved_until_exit_cleanup() {
async fn terminate_keeps_process_ids_reserved() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let mut handler = ExecServerHandler::new(outgoing_tx);
@@ -902,6 +1006,10 @@ mod tests {
super::RunningProcess {
session: spawned.session,
tty: false,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
},
);
}