Clean up stdio client process groups

Use the existing process-group cleanup pattern for stdio command transports so wrapper shell children are terminated with the client lifetime. Add a regression test that drops the client after spawning a background shell child through the command-backed transport.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-05-04 12:03:46 -07:00
parent 7834efe652
commit 881e7b5ddf
2 changed files with 140 additions and 13 deletions

View File

@@ -872,6 +872,10 @@ mod tests {
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use pretty_assertions::assert_eq;
#[cfg(unix)]
use std::path::Path;
#[cfg(unix)]
use std::process::Command;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
@@ -879,6 +883,8 @@ mod tests {
use tokio::io::duplex;
use tokio::sync::mpsc;
use tokio::time::Duration;
#[cfg(unix)]
use tokio::time::sleep;
use tokio::time::timeout;
use super::ExecServerClient;
@@ -938,6 +944,76 @@ mod tests {
assert_eq!(client.session_id().as_deref(), Some("stdio-test"));
}
#[cfg(unix)]
#[tokio::test]
async fn dropping_stdio_client_terminates_shell_process_group() {
let tempdir = tempfile::tempdir().expect("tempdir should be created");
let pid_file = tempdir.path().join("child.pid");
let shell_command = format!(
"read _line; \
(trap 'exit 0' TERM; while true; do sleep 1; done) & \
child=$!; \
echo \"$child\" > {}; \
printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \
read _line; \
wait \"$child\"",
shell_quote(pid_file.as_path()),
);
let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
shell_command,
client_name: "stdio-test-client".to_string(),
initialize_timeout: Duration::from_secs(1),
resume_session_id: None,
})
.await
.expect("stdio client should connect");
let child_pid = read_pid_file(pid_file.as_path()).await;
assert!(
process_exists(child_pid),
"wrapper child process should be running before client drop"
);
drop(client);
for _ in 0..20 {
if !process_exists(child_pid) {
return;
}
sleep(Duration::from_millis(100)).await;
}
panic!("wrapper child process {child_pid} should exit after client drop");
}
#[cfg(unix)]
async fn read_pid_file(path: &Path) -> u32 {
for _ in 0..20 {
if let Ok(contents) = std::fs::read_to_string(path) {
return contents
.trim()
.parse()
.expect("pid file should contain a pid");
}
sleep(Duration::from_millis(50)).await;
}
panic!("pid file {} should be written", path.display());
}
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.status()
.is_ok_and(|status| status.success())
}
#[cfg(unix)]
fn shell_quote(path: &Path) -> String {
let value = path.to_string_lossy();
format!("'{}'", value.replace('\'', "'\\''"))
}
#[tokio::test]
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
let (client_stdin, server_reader) = duplex(1 << 20);

View File

@@ -1,6 +1,14 @@
use std::process::Stdio;
#[cfg(unix)]
use std::thread::sleep;
#[cfg(unix)]
use std::thread::spawn;
use std::time::Duration;
#[cfg(unix)]
use codex_utils_pty::process_group::kill_process_group;
#[cfg(unix)]
use codex_utils_pty::process_group::terminate_process_group;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Child;
@@ -21,6 +29,8 @@ use crate::connection::JsonRpcConnection;
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
const ENVIRONMENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(unix)]
const STDIO_CHILD_TERM_GRACE_PERIOD: Duration = Duration::from_millis(500);
impl ExecServerTransport {
pub(crate) async fn connect_for_environment(self) -> Result<ExecServerClient, ExecServerError> {
@@ -80,11 +90,13 @@ impl ExecServerClient {
) -> Result<Self, ExecServerError> {
let shell_command = args.shell_command.clone();
let mut child = shell_command_process(&shell_command)
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(ExecServerError::Spawn)?;
let process_id = child.id();
let stdin = child.stdin.take().ok_or_else(|| {
ExecServerError::Protocol("spawned exec-server command has no stdin".to_string())
@@ -114,7 +126,10 @@ impl ExecServerClient {
stdin,
format!("exec-server stdio command `{shell_command}`"),
)
.with_transport_lifetime(Box::new(StdioChildGuard { child: Some(child) })),
.with_transport_lifetime(Box::new(StdioChildGuard {
child: Some(child),
process_id,
})),
args.into(),
)
.await
@@ -123,34 +138,69 @@ impl ExecServerClient {
struct StdioChildGuard {
child: Option<Child>,
process_id: Option<u32>,
}
impl Drop for StdioChildGuard {
fn drop(&mut self) {
let Some(child) = self.child.take() else {
let Some(mut child) = self.child.take() else {
return;
};
match Handle::try_current() {
Ok(handle) => {
let _terminate_task = handle.spawn(terminate_stdio_child(child));
}
Err(_) => {
terminate_stdio_child_now(child);
}
terminate_stdio_child_process(self.process_id, &mut child);
if let Ok(handle) = Handle::try_current() {
let _wait_task = handle.spawn(wait_stdio_child(child));
}
}
}
async fn terminate_stdio_child(mut child: Child) {
kill_stdio_child(&mut child);
async fn wait_stdio_child(mut child: Child) {
if let Err(err) = child.wait().await {
debug!("failed to wait for exec-server stdio child: {err}");
}
}
fn terminate_stdio_child_now(mut child: Child) {
kill_stdio_child(&mut child);
#[cfg(unix)]
fn terminate_stdio_child_process(process_group_id: Option<u32>, child: &mut Child) {
let Some(process_group_id) = process_group_id else {
kill_stdio_child(child);
return;
};
let should_escalate = match terminate_process_group(process_group_id) {
Ok(exists) => exists,
Err(err) => {
debug!("failed to terminate exec-server stdio process group {process_group_id}: {err}");
false
}
};
if should_escalate {
spawn(move || {
sleep(STDIO_CHILD_TERM_GRACE_PERIOD);
if let Err(err) = kill_process_group(process_group_id) {
debug!("failed to kill exec-server stdio process group {process_group_id}: {err}");
}
});
}
}
#[cfg(windows)]
fn terminate_stdio_child_process(process_id: Option<u32>, child: &mut Child) {
if let Some(process_id) = process_id {
let _ = std::process::Command::new("taskkill")
.arg("/PID")
.arg(process_id.to_string())
.arg("/T")
.arg("/F")
.output();
}
kill_stdio_child(child);
}
#[cfg(not(any(unix, windows)))]
fn terminate_stdio_child_process(_process_id: Option<u32>, child: &mut Child) {
kill_stdio_child(child);
}
fn kill_stdio_child(child: &mut Child) {
@@ -171,6 +221,7 @@ fn shell_command_process(shell_command: &str) -> Command {
{
let mut command = Command::new("sh");
command.arg("-lc").arg(shell_command);
command.process_group(0);
command
}
}