From f47954caefee7f406c5fd460e75928f1a7bfc653 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Tue, 5 May 2026 15:47:17 -0700 Subject: [PATCH] Remove server disconnect race test The stdio transport no longer adds a processor-side disconnect side channel, so drop the test that asserted that removed behavior. Client cleanup is covered at the RPC/client transport boundary instead. Co-authored-by: Codex --- codex-rs/exec-server/src/server/processor.rs | 238 ------------------- 1 file changed, 238 deletions(-) diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 132de63921..9aa2b715ee 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -163,241 +163,3 @@ async fn run_connection( } let _ = outbound_task.await; } - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - use std::time::Duration; - - use codex_app_server_protocol::JSONRPCMessage; - use codex_app_server_protocol::JSONRPCNotification; - use codex_app_server_protocol::JSONRPCRequest; - use codex_app_server_protocol::JSONRPCResponse; - use codex_app_server_protocol::RequestId; - use serde::Serialize; - use serde::de::DeserializeOwned; - use tokio::io::AsyncBufReadExt; - use tokio::io::AsyncWriteExt; - use tokio::io::BufReader; - use tokio::io::DuplexStream; - use tokio::io::Lines; - use tokio::io::duplex; - use tokio::task::JoinHandle; - use tokio::time::timeout; - - use super::run_connection; - use crate::ExecServerRuntimePaths; - use crate::ProcessId; - use crate::connection::JsonRpcConnection; - use crate::protocol::EXEC_METHOD; - use crate::protocol::EXEC_READ_METHOD; - use crate::protocol::EXEC_TERMINATE_METHOD; - use crate::protocol::ExecParams; - use crate::protocol::ExecResponse; - use crate::protocol::INITIALIZE_METHOD; - use crate::protocol::INITIALIZED_METHOD; - use crate::protocol::InitializeParams; - use crate::protocol::InitializeResponse; - use crate::protocol::ReadParams; - use crate::protocol::TerminateParams; - use crate::protocol::TerminateResponse; - use crate::server::session_registry::SessionRegistry; - - #[tokio::test] - async fn transport_disconnect_detaches_session_during_in_flight_read() { - let registry = SessionRegistry::new(); - let (mut first_writer, mut first_lines, first_task) = - spawn_test_connection(Arc::clone(®istry), "first"); - - send_request( - &mut first_writer, - /*id*/ 1, - INITIALIZE_METHOD, - &InitializeParams { - client_name: "exec-server-test".to_string(), - resume_session_id: None, - }, - ) - .await; - let initialize_response: InitializeResponse = - read_response(&mut first_lines, /*expected_id*/ 1).await; - send_notification(&mut first_writer, INITIALIZED_METHOD, &()).await; - - let process_id = ProcessId::from("proc-long-poll"); - send_request( - &mut first_writer, - /*id*/ 2, - EXEC_METHOD, - &exec_params(process_id.clone()), - ) - .await; - let _: ExecResponse = read_response(&mut first_lines, /*expected_id*/ 2).await; - - send_request( - &mut first_writer, - /*id*/ 3, - EXEC_READ_METHOD, - &ReadParams { - process_id: process_id.clone(), - after_seq: None, - max_bytes: None, - wait_ms: Some(5_000), - }, - ) - .await; - drop(first_writer); - tokio::time::sleep(Duration::from_millis(25)).await; - - let (mut second_writer, mut second_lines, second_task) = - spawn_test_connection(Arc::clone(®istry), "second"); - send_request( - &mut second_writer, - /*id*/ 1, - INITIALIZE_METHOD, - &InitializeParams { - client_name: "exec-server-test".to_string(), - resume_session_id: Some(initialize_response.session_id.clone()), - }, - ) - .await; - let second_initialize_response = timeout( - Duration::from_secs(1), - read_response::(&mut second_lines, /*expected_id*/ 1), - ) - .await - .expect("resume initialize should not wait for the old read to finish"); - assert_eq!( - second_initialize_response.session_id, - initialize_response.session_id - ); - timeout(Duration::from_secs(1), first_task) - .await - .expect("first processor should exit") - .expect("first processor should join"); - send_notification(&mut second_writer, INITIALIZED_METHOD, &()).await; - - send_request( - &mut second_writer, - /*id*/ 2, - EXEC_TERMINATE_METHOD, - &TerminateParams { process_id }, - ) - .await; - let _: TerminateResponse = read_response(&mut second_lines, /*expected_id*/ 2).await; - - drop(second_writer); - drop(second_lines); - timeout(Duration::from_secs(1), second_task) - .await - .expect("second processor should exit") - .expect("second processor should join"); - } - - fn spawn_test_connection( - registry: Arc, - label: &str, - ) -> (DuplexStream, Lines>, JoinHandle<()>) { - let (client_writer, server_reader) = duplex(1 << 20); - let (server_writer, client_reader) = duplex(1 << 20); - let connection = - JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string()); - let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths())); - (client_writer, BufReader::new(client_reader).lines(), task) - } - - fn test_runtime_paths() -> ExecServerRuntimePaths { - ExecServerRuntimePaths::new( - std::env::current_exe().expect("current exe"), - /*codex_linux_sandbox_exe*/ None, - ) - .expect("runtime paths") - } - - async fn send_request( - writer: &mut DuplexStream, - id: i64, - method: &str, - params: &P, - ) { - write_message( - writer, - &JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(id), - method: method.to_string(), - params: Some(serde_json::to_value(params).expect("serialize params")), - trace: None, - }), - ) - .await; - } - - async fn send_notification(writer: &mut DuplexStream, method: &str, params: &P) { - write_message( - writer, - &JSONRPCMessage::Notification(JSONRPCNotification { - method: method.to_string(), - params: Some(serde_json::to_value(params).expect("serialize params")), - }), - ) - .await; - } - - async fn write_message(writer: &mut DuplexStream, message: &JSONRPCMessage) { - let encoded = serde_json::to_vec(message).expect("serialize JSON-RPC message"); - writer.write_all(&encoded).await.expect("write request"); - writer.write_all(b"\n").await.expect("write newline"); - } - - async fn read_response( - lines: &mut Lines>, - expected_id: i64, - ) -> T { - let line = lines - .next_line() - .await - .expect("read response") - .expect("response line"); - match serde_json::from_str::(&line).expect("decode JSON-RPC response") { - JSONRPCMessage::Response(JSONRPCResponse { id, result }) => { - assert_eq!(id, RequestId::Integer(expected_id)); - serde_json::from_value(result).expect("decode response result") - } - JSONRPCMessage::Error(error) => panic!("unexpected JSON-RPC error: {error:?}"), - other => panic!("expected JSON-RPC response, got {other:?}"), - } - } - - fn exec_params(process_id: ProcessId) -> ExecParams { - let mut env = HashMap::new(); - if let Some(path) = std::env::var_os("PATH") { - env.insert("PATH".to_string(), path.to_string_lossy().into_owned()); - } - ExecParams { - process_id, - argv: sleep_then_print_argv(), - cwd: std::env::current_dir().expect("cwd"), - env_policy: None, - env, - tty: false, - pipe_stdin: false, - arg0: None, - } - } - - fn sleep_then_print_argv() -> Vec { - if cfg!(windows) { - vec![ - std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()), - "/C".to_string(), - "ping -n 3 127.0.0.1 >NUL && echo late".to_string(), - ] - } else { - vec![ - "/bin/sh".to_string(), - "-c".to_string(), - "sleep 1; printf late".to_string(), - ] - } - } -}