mirror of
https://github.com/openai/codex.git
synced 2026-04-29 17:06:51 +00:00
Document exec-server design flow and add lifecycle tests
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -31,6 +31,8 @@ struct RunningProcess {
|
||||
|
||||
pub(crate) struct ExecServerHandler {
|
||||
outbound_tx: mpsc::Sender<ExecServerOutboundMessage>,
|
||||
// Keyed by the protocol `processId`, which is caller-assigned and scoped to
|
||||
// a single client connection rather than an OS pid.
|
||||
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
|
||||
initialize_requested: bool,
|
||||
initialized: bool,
|
||||
@@ -355,6 +357,7 @@ mod tests {
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::PROTOCOL_VERSION;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::server::routing::ExecServerClientNotification;
|
||||
use crate::server::routing::ExecServerInboundMessage;
|
||||
use crate::server::routing::ExecServerOutboundMessage;
|
||||
@@ -559,4 +562,162 @@ mod tests {
|
||||
"initialize may only be sent once per connection"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn duplicate_process_ids_are_rejected_per_connection() {
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
|
||||
let mut handler = ExecServerHandler::new(outgoing_tx);
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Initialize {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
},
|
||||
},
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialize should succeed: {err}");
|
||||
}
|
||||
let _ = recv_outbound(&mut outgoing_rx).await;
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Notification(
|
||||
ExecServerClientNotification::Initialized,
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialized should succeed: {err}");
|
||||
}
|
||||
|
||||
let params = crate::protocol::ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 30".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir().expect("cwd"),
|
||||
env: HashMap::new(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
};
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: params.clone(),
|
||||
}))
|
||||
.await
|
||||
{
|
||||
panic!("first exec should succeed: {err}");
|
||||
}
|
||||
assert_eq!(
|
||||
recv_outbound(&mut outgoing_rx).await,
|
||||
ExecServerOutboundMessage::Response {
|
||||
request_id: RequestId::Integer(2),
|
||||
response: ExecServerResponseMessage::Exec(crate::protocol::ExecResponse {
|
||||
process_id: "proc-1".to_string(),
|
||||
}),
|
||||
}
|
||||
);
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
|
||||
request_id: RequestId::Integer(3),
|
||||
params,
|
||||
}))
|
||||
.await
|
||||
{
|
||||
panic!("duplicate exec should not fail the handler: {err}");
|
||||
}
|
||||
|
||||
let ExecServerOutboundMessage::Error { request_id, error } =
|
||||
recv_outbound(&mut outgoing_rx).await
|
||||
else {
|
||||
panic!("expected duplicate-process error");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(3));
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "process proc-1 already exists");
|
||||
|
||||
handler.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn writes_to_pipe_backed_processes_are_rejected() {
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
|
||||
let mut handler = ExecServerHandler::new(outgoing_tx);
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Initialize {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
},
|
||||
},
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialize should succeed: {err}");
|
||||
}
|
||||
let _ = recv_outbound(&mut outgoing_rx).await;
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Notification(
|
||||
ExecServerClientNotification::Initialized,
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialized should succeed: {err}");
|
||||
}
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: crate::protocol::ExecParams {
|
||||
process_id: "proc-2".to_string(),
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 30".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir().expect("cwd"),
|
||||
env: HashMap::new(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
},
|
||||
}))
|
||||
.await
|
||||
{
|
||||
panic!("exec should succeed: {err}");
|
||||
}
|
||||
let _ = recv_outbound(&mut outgoing_rx).await;
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Write {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: WriteParams {
|
||||
process_id: "proc-2".to_string(),
|
||||
chunk: b"hello\n".to_vec().into(),
|
||||
},
|
||||
},
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("write should not fail the handler: {err}");
|
||||
}
|
||||
|
||||
let ExecServerOutboundMessage::Error { request_id, error } =
|
||||
recv_outbound(&mut outgoing_rx).await
|
||||
else {
|
||||
panic!("expected stdin-closed error");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(3));
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "stdin is closed for process proc-2");
|
||||
|
||||
handler.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user