diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 9271778921..e753e5c29e 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -330,13 +330,21 @@ impl ExecServerClient { let process_id = params.process_id.clone(); let status = Arc::new(RemoteProcessStatus::new()); let (output_tx, output_rx) = broadcast::channel(256); - self.inner.processes.lock().await.insert( - process_id.clone(), - RegisteredProcess { - output_tx, - status: Arc::clone(&status), - }, - ); + { + let mut processes = self.inner.processes.lock().await; + if processes.contains_key(&process_id) { + return Err(ExecServerError::Protocol(format!( + "process `{process_id}` already exists" + ))); + } + processes.insert( + process_id.clone(), + RegisteredProcess { + output_tx, + status: Arc::clone(&status), + }, + ); + } let (writer_tx, mut writer_rx) = mpsc::channel::>(128); let client = self.clone(); @@ -1083,6 +1091,115 @@ mod tests { ); } + #[tokio::test] + async fn start_process_rejects_duplicate_local_ids_without_orphaning_existing_process() { + let (client_stdin, server_reader) = tokio::io::duplex(4096); + let (mut server_writer, client_stdout) = tokio::io::duplex(4096); + + tokio::spawn(async move { + let mut lines = BufReader::new(server_reader).lines(); + + let initialize = read_jsonrpc_line(&mut lines).await; + let JSONRPCMessage::Request(initialize_request) = initialize else { + panic!("expected initialize request"); + }; + write_jsonrpc_line( + &mut server_writer, + JSONRPCMessage::Response(JSONRPCResponse { + id: initialize_request.id, + result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }), + }), + ) + .await; + + let initialized = read_jsonrpc_line(&mut lines).await; + let JSONRPCMessage::Notification(notification) = initialized else { + panic!("expected initialized notification"); + }; + assert_eq!(notification.method, INITIALIZED_METHOD); + + let exec_request = read_jsonrpc_line(&mut lines).await; + let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else { + panic!("expected exec request"); + }; + assert_eq!(method, EXEC_METHOD); + write_jsonrpc_line( + &mut server_writer, + JSONRPCMessage::Response(JSONRPCResponse { + id, + result: serde_json::json!({ "processId": "proc-1" }), + }), + ) + .await; + tokio::time::sleep(Duration::from_millis(25)).await; + write_jsonrpc_line( + &mut server_writer, + JSONRPCMessage::Notification(JSONRPCNotification { + method: EXEC_OUTPUT_DELTA_METHOD.to_string(), + params: Some(serde_json::json!({ + "processId": "proc-1", + "stream": "stdout", + "chunk": "YWxpdmUK" + })), + }), + ) + .await; + }); + + let client = match ExecServerClient::connect_stdio( + client_stdin, + client_stdout, + test_options(), + ) + .await + { + Ok(client) => client, + Err(err) => panic!("failed to connect test client: {err}"), + }; + + let first_process = match client + .start_process(ExecParams { + process_id: "proc-1".to_string(), + argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()], + cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")), + env: HashMap::new(), + tty: true, + arg0: None, + }) + .await + { + Ok(process) => process, + Err(err) => panic!("failed to start first process: {err}"), + }; + + let duplicate_result = client + .start_process(ExecParams { + process_id: "proc-1".to_string(), + argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()], + cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")), + env: HashMap::new(), + tty: true, + arg0: None, + }) + .await; + + match duplicate_result { + Err(ExecServerError::Protocol(message)) => { + assert_eq!(message, "process `proc-1` already exists"); + } + Err(err) => panic!("unexpected duplicate start failure: {err}"), + Ok(_) => panic!("expected local duplicate rejection"), + } + + let mut output = first_process.output_receiver(); + let output = timeout(Duration::from_secs(1), output.recv()) + .await + .unwrap_or_else(|err| panic!("timed out waiting for process output: {err}")) + .unwrap_or_else(|err| panic!("failed to receive process output: {err}")); + assert_eq!(output.stream, ExecOutputStream::Stdout); + assert_eq!(output.chunk, b"alive\n".to_vec()); + } + #[tokio::test] async fn transport_shutdown_marks_processes_exited_without_exit_codes() { let (client_stdin, server_reader) = tokio::io::duplex(4096); diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index 1664632f89..d202b2b952 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -176,6 +176,16 @@ impl ExecServerHandler { .split_first() .ok_or_else(|| invalid_params("argv must not be empty".to_string()))?; + let process_id = params.process_id.clone(); + { + let process_map = self.processes.lock().await; + if process_map.contains_key(&process_id) { + return Err(invalid_request(format!( + "process {process_id} already exists" + ))); + } + } + let spawned = if params.tty { codex_utils_pty::spawn_pty_process( program, @@ -198,7 +208,6 @@ impl ExecServerHandler { } .map_err(|err| internal_error(err.to_string()))?; - let process_id = params.process_id.clone(); { let mut process_map = self.processes.lock().await; if process_map.contains_key(&process_id) { @@ -268,17 +277,17 @@ impl ExecServerHandler { &self, params: crate::protocol::TerminateParams, ) -> Result { - let process = { - let mut process_map = self.processes.lock().await; - process_map.remove(¶ms.process_id) + let running = { + let process_map = self.processes.lock().await; + if let Some(process) = process_map.get(¶ms.process_id) { + process.session.terminate(); + true + } else { + false + } }; - Ok(if let Some(process) = process { - process.session.terminate(); - TerminateResponse { running: true } - } else { - TerminateResponse { running: false } - }) + Ok(TerminateResponse { running }) } async fn send_request_result( @@ -568,6 +577,15 @@ mod tests { async fn duplicate_process_ids_are_rejected_per_connection() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4); let mut handler = ExecServerHandler::new(outgoing_tx); + let marker_path = std::env::temp_dir().join(format!( + "codex-exec-server-duplicate-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before unix epoch") + .as_nanos() + )); + let _ = std::fs::remove_file(&marker_path); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( @@ -626,7 +644,14 @@ mod tests { if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(3), - params, + params: crate::protocol::ExecParams { + argv: vec![ + "bash".to_string(), + "-lc".to_string(), + format!("printf duplicate > {}", marker_path.display()), + ], + ..params + }, })) .await { @@ -641,8 +666,13 @@ mod tests { assert_eq!(request_id, RequestId::Integer(3)); assert_eq!(error.code, -32600); assert_eq!(error.message, "process proc-1 already exists"); + assert!( + !marker_path.exists(), + "duplicate process ids must be rejected before spawning the command" + ); handler.shutdown().await; + let _ = std::fs::remove_file(&marker_path); } #[tokio::test] @@ -827,4 +857,82 @@ mod tests { } ); } + + #[tokio::test] + async fn terminate_keeps_process_ids_reserved_until_exit_cleanup() { + let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); + let mut handler = ExecServerHandler::new(outgoing_tx); + + if let Err(err) = handler + .handle_message(ExecServerInboundMessage::Request( + ExecServerRequest::Initialize { + request_id: RequestId::Integer(1), + params: InitializeParams { + client_name: "test".to_string(), + }, + }, + )) + .await + { + panic!("initialize should succeed: {err}"); + } + let _ = recv_outbound(&mut outgoing_rx).await; + if let Err(err) = handler + .handle_message(ExecServerInboundMessage::Notification( + ExecServerClientNotification::Initialized, + )) + .await + { + panic!("initialized should succeed: {err}"); + } + + let spawned = codex_utils_pty::spawn_pipe_process_no_stdin( + "bash", + &["-lc".to_string(), "sleep 30".to_string()], + std::env::current_dir().expect("cwd").as_path(), + &HashMap::new(), + &None, + ) + .await + .expect("spawn test process"); + { + let mut process_map = handler.processes.lock().await; + process_map.insert( + "proc-1".to_string(), + super::RunningProcess { + session: spawned.session, + tty: false, + }, + ); + } + + if let Err(err) = handler + .handle_message(ExecServerInboundMessage::Request( + ExecServerRequest::Terminate { + request_id: RequestId::Integer(2), + params: crate::protocol::TerminateParams { + process_id: "proc-1".to_string(), + }, + }, + )) + .await + { + panic!("terminate should not fail the handler: {err}"); + } + + assert_eq!( + recv_outbound(&mut outgoing_rx).await, + ExecServerOutboundMessage::Response { + request_id: RequestId::Integer(2), + response: ExecServerResponseMessage::Terminate(TerminateResponse { running: true }), + } + ); + + assert!( + handler.processes.lock().await.contains_key("proc-1"), + "terminated ids should stay reserved until exit cleanup removes them" + ); + + handler.shutdown().await; + } }