Add managed process wrappers for sync and Tokio children

This commit is contained in:
Adam Perry
2026-05-21 22:15:25 -07:00
parent b7c1f602b0
commit 724faf95da
9 changed files with 548 additions and 213 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2760,6 +2760,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-client",
"codex-file-system",
"codex-managed-process",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",
@@ -3111,6 +3112,7 @@ name = "codex-managed-process"
version = "0.0.0"
dependencies = [
"libc",
"tokio",
"tracing",
]

View File

@@ -5,19 +5,40 @@ await-holding-invalid-types = [
"tokio::sync::RwLockReadGuard",
"tokio::sync::RwLockWriteGuard",
]
disallowed-methods = [
{ path = "std::process::Command::spawn",
reason = "Don't leak processes on Drop.",
replacement = "codex_managed_process::CommandExt::spawn_managed" },
{ path = "tokio::process::Command::spawn", reason = "Don't leak processes on Drop." },
{ path = "portable_pty::SlavePty::spawn_command", reason = "Don't leak processes on Drop." },
{ path = "ratatui::style::Color::Rgb", reason = "Use ANSI colors, which work better in various terminal themes." },
{ path = "ratatui::style::Color::Indexed", reason = "Use ANSI colors, which work better in various terminal themes." },
{ path = "ratatui::style::Stylize::white", reason = "Avoid hardcoding white; prefer default fg or dim/bold. Exception: Disable this rule if rendering over a hardcoded ANSI background." },
{ path = "ratatui::style::Stylize::black", reason = "Avoid hardcoding black; prefer default fg or dim/bold. Exception: Disable this rule if rendering over a hardcoded ANSI background." },
{ path = "ratatui::style::Stylize::yellow", reason = "Avoid yellow; prefer other colors in `tui/styles.md`." },
]
# Increase the size threshold for result_large_err to accommodate
# richer error variants.
large-error-threshold = 256
[[disallowed-methods]]
path = "std::process::Command::spawn"
reason = "Don't leak processes on Drop."
replacement = "codex_managed_process::CommandExt::spawn_managed"
[[disallowed-methods]]
path = "tokio::process::Command::spawn"
reason = "Don't leak processes on Drop."
replacement = "codex_managed_process::TokioCommandExt::spawn_managed"
[[disallowed-methods]]
path = "portable_pty::SlavePty::spawn_command"
reason = "Don't leak processes on Drop."
[[disallowed-methods]]
path = "ratatui::style::Color::Rgb"
reason = "Use ANSI colors, which work better in various terminal themes."
[[disallowed-methods]]
path = "ratatui::style::Color::Indexed"
reason = "Use ANSI colors, which work better in various terminal themes."
[[disallowed-methods]]
path = "ratatui::style::Stylize::white"
reason = "Avoid hardcoding white; prefer default fg or dim/bold. Exception: Disable this rule if rendering over a hardcoded ANSI background."
[[disallowed-methods]]
path = "ratatui::style::Stylize::black"
reason = "Avoid hardcoding black; prefer default fg or dim/bold. Exception: Disable this rule if rendering over a hardcoded ANSI background."
[[disallowed-methods]]
path = "ratatui::style::Stylize::yellow"
reason = "Avoid yellow; prefer other colors in `tui/styles.md`."

View File

@@ -20,6 +20,7 @@ codex-app-server-protocol = { workspace = true }
codex-api = { workspace = true }
codex-client = { workspace = true }
codex-file-system = { workspace = true }
codex-managed-process = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_managed_process::ManagedTokioChild;
use codex_managed_process::TokioCommandExt;
use codex_protocol::models::PermissionProfile;
use codex_protocol::permissions::FileSystemAccessMode;
use codex_protocol::permissions::FileSystemPath;
@@ -17,6 +19,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_absolute_path::canonicalize_preserving_symlinks;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tracing::warn;
use crate::ExecServerRuntimePaths;
use crate::FileSystemSandboxContext;
@@ -249,12 +252,26 @@ async fn run_command(
request_json: Vec<u8>,
) -> Result<FsHelperPayload, JSONRPCErrorError> {
let mut child = spawn_command(command)?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| internal_error("failed to open fs sandbox helper stdin".to_string()))?;
stdin.write_all(&request_json).await.map_err(io_error)?;
stdin.shutdown().await.map_err(io_error)?;
let Some(mut stdin) = child.stdin.take() else {
if let Err(err) = child.kill_and_wait().await {
warn!("failed to kill fs sandbox helper after missing stdin: {err}");
}
return Err(internal_error(
"failed to open fs sandbox helper stdin".to_string(),
));
};
if let Err(err) = stdin.write_all(&request_json).await {
if let Err(kill_err) = child.kill_and_wait().await {
warn!("failed to kill fs sandbox helper after stdin write failed: {kill_err}");
}
return Err(io_error(err));
}
if let Err(err) = stdin.shutdown().await {
if let Err(kill_err) = child.kill_and_wait().await {
warn!("failed to kill fs sandbox helper after stdin shutdown failed: {kill_err}");
}
return Err(io_error(err));
}
drop(stdin);
let output = child.wait_with_output().await.map_err(io_error)?;
@@ -280,7 +297,7 @@ fn spawn_command(
arg0,
..
}: SandboxExecRequest,
) -> Result<tokio::process::Child, JSONRPCErrorError> {
) -> Result<ManagedTokioChild, JSONRPCErrorError> {
let Some((program, args)) = argv.split_first() else {
return Err(invalid_request("fs sandbox command was empty".to_string()));
};
@@ -298,8 +315,7 @@ fn spawn_command(
command.stdin(std::process::Stdio::piped());
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
#[allow(clippy::disallowed_methods, reason = "Grandfathered-in usage.")]
command.spawn().map_err(io_error)
command.spawn_managed().map_err(io_error)
}
fn io_error(err: std::io::Error) -> JSONRPCErrorError {

View File

@@ -8,10 +8,12 @@ license.workspace = true
workspace = true
[dependencies]
tokio = { workspace = true, features = ["process"] }
tracing = { workspace = true }
[target.'cfg(unix)'.dev-dependencies]
libc = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "time"] }
[lib]
doctest = false

View File

@@ -0,0 +1,74 @@
use std::ffi::OsString;
use std::panic::Location;
#[derive(Debug)]
pub(crate) struct DebugDropBomb {
armed: bool,
program: OsString,
spawn_location: &'static Location<'static>,
}
impl DebugDropBomb {
pub(crate) fn new(program: OsString, spawn_location: &'static Location<'static>) -> Self {
Self {
armed: true,
program,
spawn_location,
}
}
pub(crate) fn defuse(&mut self) {
self.armed = false;
}
}
impl Drop for DebugDropBomb {
fn drop(&mut self) {
if !self.armed {
return;
}
#[cfg(debug_assertions)]
{
panic!(
"managed Tokio child for {:?} spawned at {} dropped without explicit teardown",
self.program, self.spawn_location
);
}
#[cfg(not(debug_assertions))]
tracing::error!(
program = ?self.program,
spawn_location = %self.spawn_location,
"managed Tokio child dropped without explicit teardown"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn defused_bomb_drops() {
let mut bomb = DebugDropBomb::new("test".into(), Location::caller());
bomb.defuse();
}
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "dropped without explicit teardown")]
fn armed_bomb_panics_in_debug() {
drop(DebugDropBomb::new("test".into(), Location::caller()));
}
#[cfg(not(debug_assertions))]
#[test]
fn armed_bomb_does_not_panic_in_release() {
let result = std::panic::catch_unwind(|| {
drop(DebugDropBomb::new("test".into(), Location::caller()));
});
assert!(result.is_ok());
}
}

View File

@@ -1,194 +1,8 @@
//! Child process helpers that keep process lifetime ownership explicit.
use std::ffi::OsString;
use std::io;
use std::ops::Deref;
use std::ops::DerefMut;
use std::process::Child;
use std::process::Command;
use std::process::Output;
use std::thread;
use std::time::Duration;
use std::time::Instant;
pub(crate) mod drop_bomb;
mod sync;
mod tokio;
use tracing::trace;
use tracing::warn;
const DROP_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
const DROP_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Extends [`Command`] with Codex-owned child process spawning.
///
/// Implementations must return a child handle that makes ownership behavior explicit instead of
/// relying on [`Child`]'s no-op drop behavior.
pub trait CommandExt {
/// Spawn the command and return a managed direct-child handle.
fn spawn_managed(&mut self) -> io::Result<ManagedChild>;
}
impl CommandExt for Command {
fn spawn_managed(&mut self) -> io::Result<ManagedChild> {
let program = self.get_program().to_os_string();
#[allow(
clippy::disallowed_methods,
reason = "ManagedChild wraps the raw child handle here."
)]
self.spawn().map(|child| ManagedChild {
child: Some(child),
program,
})
}
}
/// A [`Child`] that best-effort terminates and reaps its direct child process on drop.
///
/// This handle manages only the direct process represented by [`Child`]. Transitive children need
/// their own process-group or process-tree lifetime policy.
#[derive(Debug)]
pub struct ManagedChild {
child: Option<Child>,
program: OsString,
}
impl ManagedChild {
/// Wait for this child to exit and collect its captured output.
#[expect(clippy::expect_used)]
pub fn wait_with_output(mut self) -> io::Result<Output> {
self.child
.take()
.expect("managed child is present until consumed")
.wait_with_output()
}
}
impl Deref for ManagedChild {
type Target = Child;
#[expect(clippy::expect_used)]
fn deref(&self) -> &Self::Target {
self.child
.as_ref()
.expect("managed child is present until consumed")
}
}
impl DerefMut for ManagedChild {
#[expect(clippy::expect_used)]
fn deref_mut(&mut self) -> &mut Self::Target {
self.child
.as_mut()
.expect("managed child is present until consumed")
}
}
impl Drop for ManagedChild {
fn drop(&mut self) {
let Some(child) = self.child.as_mut() else {
// `wait_with_output` takes ownership of the child before this destructor runs.
return;
};
let pid = child.id();
if let Err(error) = child.kill() {
warn!(
pid,
program = ?self.program,
reason = %error,
"failed to kill managed child process during drop"
);
return;
}
match wait_for_exit(child, DROP_WAIT_TIMEOUT) {
Ok(true) => trace!(
pid,
program = ?self.program,
"managed child process exited during drop"
),
Ok(false) => warn!(
pid,
program = ?self.program,
"timed out waiting for managed child process to exit during drop"
),
Err(error) => warn!(
pid,
program = ?self.program,
reason = %error,
"failed to wait for managed child process during drop"
),
}
}
}
/// Wait up to `timeout` for a child to exit and be reaped.
///
/// Returns `Ok(false)` if the child is still running at the deadline.
fn wait_for_exit(child: &mut Child, timeout: Duration) -> io::Result<bool> {
// `Child::try_wait` reaps a finished child but never blocks, so enforce the timeout here.
let deadline = Instant::now() + timeout;
loop {
if child.try_wait()?.is_some() {
return Ok(true);
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return Ok(false);
}
thread::sleep(DROP_WAIT_POLL_INTERVAL.min(remaining));
}
}
#[cfg(all(test, unix))]
mod tests {
use std::process::Stdio;
use super::*;
#[test]
fn waits_for_short_lived_managed_child() -> io::Result<()> {
let status = short_lived_command().spawn_managed()?.wait()?;
assert!(status.success());
Ok(())
}
#[test]
fn drop_terminates_direct_child() -> io::Result<()> {
let child = long_lived_command().spawn_managed()?;
let pid = child.id();
drop(child);
assert!(!process_exists(pid));
Ok(())
}
#[test]
fn wait_timeout_path_returns_without_hanging() -> io::Result<()> {
let mut child = long_lived_command().spawn_managed()?;
let exited = wait_for_exit(&mut child, Duration::ZERO)?;
assert!(!exited);
Ok(())
}
fn short_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "exit 0"]);
command
}
fn long_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "sleep 30"]);
command.stdin(Stdio::null());
command.stdout(Stdio::null());
command.stderr(Stdio::null());
command
}
fn process_exists(pid: u32) -> bool {
// SAFETY: `kill` with signal 0 performs existence/permission checks only.
unsafe { libc::kill(pid.cast_signed(), 0) == 0 }
}
}
pub use sync::*;
pub use tokio::*;

View File

@@ -0,0 +1,203 @@
use std::ffi::OsString;
use std::io;
use std::ops::Deref;
use std::ops::DerefMut;
use std::panic::Location;
use std::process::Child;
use std::process::Command;
use std::process::Output;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tracing::trace;
use tracing::warn;
const DROP_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
const DROP_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Extends [`Command`] with Codex-owned child process spawning.
///
/// Implementations must return a child handle that makes ownership behavior explicit instead of
/// relying on [`Child`]'s no-op drop behavior.
pub trait CommandExt {
/// Spawn the command and return a managed direct-child handle.
#[track_caller]
fn spawn_managed(&mut self) -> io::Result<ManagedChild>;
}
impl CommandExt for Command {
#[track_caller]
fn spawn_managed(&mut self) -> io::Result<ManagedChild> {
let program = self.get_program().to_os_string();
let spawn_location = Location::caller();
#[allow(
clippy::disallowed_methods,
reason = "ManagedChild wraps the raw child handle here."
)]
self.spawn().map(|child| ManagedChild {
child: Some(child),
program,
spawn_location,
})
}
}
/// A [`Child`] that best-effort terminates and reaps its direct child process on drop.
///
/// This handle manages only the direct process represented by [`Child`]. Transitive children need
/// their own process-group or process-tree lifetime policy.
#[derive(Debug)]
pub struct ManagedChild {
child: Option<Child>,
program: OsString,
spawn_location: &'static Location<'static>,
}
impl ManagedChild {
/// Wait for this child to exit and collect its captured output.
#[expect(clippy::expect_used)]
pub fn wait_with_output(mut self) -> io::Result<Output> {
self.child
.take()
.expect("managed child is present until consumed")
.wait_with_output()
}
}
impl Deref for ManagedChild {
type Target = Child;
#[expect(clippy::expect_used)]
fn deref(&self) -> &Self::Target {
self.child
.as_ref()
.expect("managed child is present until consumed")
}
}
impl DerefMut for ManagedChild {
#[expect(clippy::expect_used)]
fn deref_mut(&mut self) -> &mut Self::Target {
self.child
.as_mut()
.expect("managed child is present until consumed")
}
}
impl Drop for ManagedChild {
fn drop(&mut self) {
let Some(child) = self.child.as_mut() else {
// `wait_with_output` takes ownership of the child before this destructor runs.
return;
};
let pid = child.id();
if let Err(error) = child.kill() {
warn!(
pid,
program = ?self.program,
spawn_location = %self.spawn_location,
reason = %error,
"failed to kill managed child process during drop"
);
return;
}
match wait_for_exit(child, DROP_WAIT_TIMEOUT) {
Ok(true) => trace!(
pid,
program = ?self.program,
spawn_location = %self.spawn_location,
"managed child process exited during drop"
),
Ok(false) => warn!(
pid,
program = ?self.program,
spawn_location = %self.spawn_location,
"timed out waiting for managed child process to exit during drop"
),
Err(error) => warn!(
pid,
program = ?self.program,
spawn_location = %self.spawn_location,
reason = %error,
"failed to wait for managed child process during drop"
),
}
}
}
/// Wait up to `timeout` for a child to exit and be reaped.
///
/// Returns `Ok(false)` if the child is still running at the deadline.
fn wait_for_exit(child: &mut Child, timeout: Duration) -> io::Result<bool> {
// `Child::try_wait` reaps a finished child but never blocks, so enforce the timeout here.
let deadline = Instant::now() + timeout;
loop {
if child.try_wait()?.is_some() {
return Ok(true);
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return Ok(false);
}
thread::sleep(DROP_WAIT_POLL_INTERVAL.min(remaining));
}
}
// FIXME: Expand these process tests to cover Windows.
#[cfg(all(test, unix))]
mod tests {
use std::process::Stdio;
use super::*;
#[test]
fn waits_for_short_lived_managed_child() -> io::Result<()> {
let status = short_lived_command().spawn_managed()?.wait()?;
assert!(status.success());
Ok(())
}
#[test]
fn drop_terminates_direct_child() -> io::Result<()> {
let child = long_lived_command().spawn_managed()?;
let pid = child.id();
drop(child);
assert!(!process_exists(pid));
Ok(())
}
#[test]
fn wait_timeout_path_returns_without_hanging() -> io::Result<()> {
let mut child = long_lived_command().spawn_managed()?;
let exited = wait_for_exit(&mut child, Duration::ZERO)?;
assert!(!exited);
Ok(())
}
fn short_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "exit 0"]);
command
}
fn long_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "sleep 30"]);
command.stdin(Stdio::null());
command.stdout(Stdio::null());
command.stderr(Stdio::null());
command
}
fn process_exists(pid: u32) -> bool {
// SAFETY: `kill` with signal 0 performs existence/permission checks only.
unsafe { libc::kill(pid.cast_signed(), 0) == 0 }
}
}

View File

@@ -0,0 +1,202 @@
use std::ffi::OsString;
use std::io;
use std::ops::Deref;
use std::ops::DerefMut;
use std::panic::Location;
use std::process::ExitStatus;
use std::process::Output;
use ::tokio::process::Child;
use ::tokio::process::Command;
use crate::drop_bomb::DebugDropBomb;
/// Extends Tokio [`Command`] with Codex-owned child process spawning.
///
/// Implementations must return a child handle that requires callers to explicitly wait for or
/// terminate the spawned process.
pub trait TokioCommandExt {
/// Spawn the command and return a managed direct-child handle.
#[track_caller]
fn spawn_managed(&mut self) -> io::Result<ManagedTokioChild>;
}
impl TokioCommandExt for Command {
#[track_caller]
fn spawn_managed(&mut self) -> io::Result<ManagedTokioChild> {
let program = self.as_std().get_program().to_os_string();
let spawn_location = Location::caller();
// Prefer the explicit terminal methods below; this is only a fallback if ownership escapes.
self.kill_on_drop(true);
#[allow(
clippy::disallowed_methods,
reason = "ManagedTokioChild wraps the raw child handle here."
)]
self.spawn()
.map(|child| ManagedTokioChild::new(child, program, spawn_location))
}
}
/// A Tokio [`Child`] that requires explicit asynchronous teardown via [`Self::wait`],
/// [`Self::wait_with_output`], or [`Self::kill_and_wait`].
///
/// Violating this requirement panics in debug builds.
#[derive(Debug)]
pub struct ManagedTokioChild {
// This only becomes `None` in consuming terminal methods such as `wait_with_output`.
child: Option<Child>,
drop_bomb: DebugDropBomb,
}
impl ManagedTokioChild {
fn new(child: Child, program: OsString, spawn_location: &'static Location<'static>) -> Self {
Self {
child: Some(child),
drop_bomb: DebugDropBomb::new(program, spawn_location),
}
}
/// Wait for this child to exit.
#[expect(clippy::expect_used)]
pub async fn wait(mut self) -> io::Result<ExitStatus> {
let result = self
.child
.as_mut()
.expect("managed Tokio child is present until consumed")
.wait()
.await;
self.drop_bomb.defuse();
result
}
/// Wait for this child to exit and collect its captured output.
#[expect(clippy::expect_used)]
pub async fn wait_with_output(mut self) -> io::Result<Output> {
let result = self
.child
.take()
.expect("managed Tokio child is present until consumed")
.wait_with_output()
.await;
self.drop_bomb.defuse();
result
}
/// Kill this child and wait for it to exit.
#[expect(clippy::expect_used)]
pub async fn kill_and_wait(mut self) -> io::Result<()> {
let result = self
.child
.as_mut()
.expect("managed Tokio child is present until consumed")
.kill()
.await;
self.drop_bomb.defuse();
result
}
}
impl Deref for ManagedTokioChild {
type Target = Child;
#[expect(clippy::expect_used)]
fn deref(&self) -> &Self::Target {
self.child
.as_ref()
.expect("managed Tokio child is present until consumed")
}
}
impl DerefMut for ManagedTokioChild {
#[expect(clippy::expect_used)]
fn deref_mut(&mut self) -> &mut Self::Target {
self.child
.as_mut()
.expect("managed Tokio child is present until consumed")
}
}
// FIXME: Expand these process tests to cover Windows.
#[cfg(all(test, unix))]
mod tests {
use std::process::Stdio;
use ::tokio::time::Duration;
use ::tokio::time::sleep;
use super::*;
#[tokio::test]
async fn waits_for_short_lived_managed_child() -> io::Result<()> {
let status = short_lived_command().spawn_managed()?.wait().await?;
assert!(status.success());
Ok(())
}
#[tokio::test]
async fn waits_for_managed_child_output() -> io::Result<()> {
let output = output_command().spawn_managed()?.wait_with_output().await?;
assert_eq!(output.stdout, b"managed\n");
Ok(())
}
#[tokio::test]
async fn kill_and_wait_terminates_direct_child() -> io::Result<()> {
let child = long_lived_command().spawn_managed()?;
let pid = child.id();
child.kill_and_wait().await?;
assert!(!process_exists(pid.expect("child should have a PID")));
Ok(())
}
#[tokio::test]
#[cfg(debug_assertions)]
#[should_panic(expected = "dropped without explicit teardown")]
async fn drop_without_teardown_panics_in_debug() {
drop(long_lived_command().spawn_managed().expect("spawn child"));
}
#[tokio::test]
#[cfg(debug_assertions)]
#[should_panic(expected = "dropped without explicit teardown")]
async fn cancelled_wait_panics_in_debug() {
let child = long_lived_command().spawn_managed().expect("spawn child");
let wait = child.wait();
::tokio::pin!(wait);
::tokio::select! {
_ = sleep(Duration::from_millis(10)) => {}
_ = &mut wait => panic!("long-lived child should not exit first"),
}
}
fn short_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "exit 0"]);
command
}
fn output_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "printf 'managed\n'"]);
command.stdout(Stdio::piped());
command
}
fn long_lived_command() -> Command {
let mut command = Command::new("/bin/sh");
command.args(["-c", "sleep 30"]);
command.stdin(Stdio::null());
command.stdout(Stdio::null());
command.stderr(Stdio::null());
command
}
fn process_exists(pid: u32) -> bool {
// SAFETY: `kill` with signal 0 performs existence/permission checks only.
unsafe { libc::kill(pid.cast_signed(), 0) == 0 }
}
}