From 881e7b5ddf693a2591349a98a2a32b33930d8a97 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Mon, 4 May 2026 12:03:46 -0700 Subject: [PATCH] Clean up stdio client process groups Use the existing process-group cleanup pattern for stdio command transports so wrapper shell children are terminated with the client lifetime. Add a regression test that drops the client after spawning a background shell child through the command-backed transport. Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 76 +++++++++++++++++++ codex-rs/exec-server/src/client_transport.rs | 77 ++++++++++++++++---- 2 files changed, 140 insertions(+), 13 deletions(-) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index c91902f3f9..c04c57eb25 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -872,6 +872,10 @@ mod tests { use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use pretty_assertions::assert_eq; + #[cfg(unix)] + use std::path::Path; + #[cfg(unix)] + use std::process::Command; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; @@ -879,6 +883,8 @@ mod tests { use tokio::io::duplex; use tokio::sync::mpsc; use tokio::time::Duration; + #[cfg(unix)] + use tokio::time::sleep; use tokio::time::timeout; use super::ExecServerClient; @@ -938,6 +944,76 @@ mod tests { assert_eq!(client.session_id().as_deref(), Some("stdio-test")); } + #[cfg(unix)] + #[tokio::test] + async fn dropping_stdio_client_terminates_shell_process_group() { + let tempdir = tempfile::tempdir().expect("tempdir should be created"); + let pid_file = tempdir.path().join("child.pid"); + let shell_command = format!( + "read _line; \ + (trap 'exit 0' TERM; while true; do sleep 1; done) & \ + child=$!; \ + echo \"$child\" > {}; \ + printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \ + read _line; \ + wait \"$child\"", + shell_quote(pid_file.as_path()), + ); + + let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + shell_command, + client_name: "stdio-test-client".to_string(), + initialize_timeout: Duration::from_secs(1), + resume_session_id: None, + }) + .await + .expect("stdio client should connect"); + let child_pid = read_pid_file(pid_file.as_path()).await; + assert!( + process_exists(child_pid), + "wrapper child process should be running before client drop" + ); + + drop(client); + + for _ in 0..20 { + if !process_exists(child_pid) { + return; + } + sleep(Duration::from_millis(100)).await; + } + panic!("wrapper child process {child_pid} should exit after client drop"); + } + + #[cfg(unix)] + async fn read_pid_file(path: &Path) -> u32 { + for _ in 0..20 { + if let Ok(contents) = std::fs::read_to_string(path) { + return contents + .trim() + .parse() + .expect("pid file should contain a pid"); + } + sleep(Duration::from_millis(50)).await; + } + panic!("pid file {} should be written", path.display()); + } + + #[cfg(unix)] + fn process_exists(pid: u32) -> bool { + Command::new("kill") + .arg("-0") + .arg(pid.to_string()) + .status() + .is_ok_and(|status| status.success()) + } + + #[cfg(unix)] + fn shell_quote(path: &Path) -> String { + let value = path.to_string_lossy(); + format!("'{}'", value.replace('\'', "'\\''")) + } + #[tokio::test] async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() { let (client_stdin, server_reader) = duplex(1 << 20); diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs index c49e9cdb98..d877aae3df 100644 --- a/codex-rs/exec-server/src/client_transport.rs +++ b/codex-rs/exec-server/src/client_transport.rs @@ -1,6 +1,14 @@ use std::process::Stdio; +#[cfg(unix)] +use std::thread::sleep; +#[cfg(unix)] +use std::thread::spawn; use std::time::Duration; +#[cfg(unix)] +use codex_utils_pty::process_group::kill_process_group; +#[cfg(unix)] +use codex_utils_pty::process_group::terminate_process_group; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio::process::Child; @@ -21,6 +29,8 @@ use crate::connection::JsonRpcConnection; const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment"; const ENVIRONMENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5); +#[cfg(unix)] +const STDIO_CHILD_TERM_GRACE_PERIOD: Duration = Duration::from_millis(500); impl ExecServerTransport { pub(crate) async fn connect_for_environment(self) -> Result { @@ -80,11 +90,13 @@ impl ExecServerClient { ) -> Result { let shell_command = args.shell_command.clone(); let mut child = shell_command_process(&shell_command) + .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .map_err(ExecServerError::Spawn)?; + let process_id = child.id(); let stdin = child.stdin.take().ok_or_else(|| { ExecServerError::Protocol("spawned exec-server command has no stdin".to_string()) @@ -114,7 +126,10 @@ impl ExecServerClient { stdin, format!("exec-server stdio command `{shell_command}`"), ) - .with_transport_lifetime(Box::new(StdioChildGuard { child: Some(child) })), + .with_transport_lifetime(Box::new(StdioChildGuard { + child: Some(child), + process_id, + })), args.into(), ) .await @@ -123,34 +138,69 @@ impl ExecServerClient { struct StdioChildGuard { child: Option, + process_id: Option, } impl Drop for StdioChildGuard { fn drop(&mut self) { - let Some(child) = self.child.take() else { + let Some(mut child) = self.child.take() else { return; }; - match Handle::try_current() { - Ok(handle) => { - let _terminate_task = handle.spawn(terminate_stdio_child(child)); - } - Err(_) => { - terminate_stdio_child_now(child); - } + terminate_stdio_child_process(self.process_id, &mut child); + + if let Ok(handle) = Handle::try_current() { + let _wait_task = handle.spawn(wait_stdio_child(child)); } } } -async fn terminate_stdio_child(mut child: Child) { - kill_stdio_child(&mut child); +async fn wait_stdio_child(mut child: Child) { if let Err(err) = child.wait().await { debug!("failed to wait for exec-server stdio child: {err}"); } } -fn terminate_stdio_child_now(mut child: Child) { - kill_stdio_child(&mut child); +#[cfg(unix)] +fn terminate_stdio_child_process(process_group_id: Option, child: &mut Child) { + let Some(process_group_id) = process_group_id else { + kill_stdio_child(child); + return; + }; + + let should_escalate = match terminate_process_group(process_group_id) { + Ok(exists) => exists, + Err(err) => { + debug!("failed to terminate exec-server stdio process group {process_group_id}: {err}"); + false + } + }; + if should_escalate { + spawn(move || { + sleep(STDIO_CHILD_TERM_GRACE_PERIOD); + if let Err(err) = kill_process_group(process_group_id) { + debug!("failed to kill exec-server stdio process group {process_group_id}: {err}"); + } + }); + } +} + +#[cfg(windows)] +fn terminate_stdio_child_process(process_id: Option, child: &mut Child) { + if let Some(process_id) = process_id { + let _ = std::process::Command::new("taskkill") + .arg("/PID") + .arg(process_id.to_string()) + .arg("/T") + .arg("/F") + .output(); + } + kill_stdio_child(child); +} + +#[cfg(not(any(unix, windows)))] +fn terminate_stdio_child_process(_process_id: Option, child: &mut Child) { + kill_stdio_child(child); } fn kill_stdio_child(child: &mut Child) { @@ -171,6 +221,7 @@ fn shell_command_process(shell_command: &str) -> Command { { let mut command = Command::new("sh"); command.arg("-lc").arg(shell_command); + command.process_group(0); command } }