Compare commits

...

4 Commits

Author SHA1 Message Date
Adam Perry
cad7d50b90 codex: address PR review feedback (#26206) 2026-06-04 00:52:34 +00:00
Adam Perry
d7343c2ff8 codex: address PR review feedback (#26206) 2026-06-03 18:39:57 +00:00
Adam Perry
89bcdd96ed codex: fix CI failure on PR #26206 2026-06-03 17:37:15 +00:00
Adam Perry
d41783ce54 Add managed process spawning crate 2026-06-03 16:12:09 +00:00
13 changed files with 763 additions and 0 deletions

10
codex-rs/Cargo.lock generated
View File

@@ -3456,6 +3456,16 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "codex-process"
version = "0.0.0"
dependencies = [
"either",
"tokio",
"tracing",
"tracing-test",
]
[[package]]
name = "codex-process-hardening"
version = "0.0.0"

View File

@@ -67,6 +67,7 @@ members = [
"models-manager",
"network-proxy",
"ollama",
"process",
"process-hardening",
"protocol",
"realtime-webrtc",
@@ -198,6 +199,7 @@ codex-ollama = { path = "ollama" }
codex-otel = { path = "otel" }
codex-plugin = { path = "plugin" }
codex-model-provider = { path = "model-provider" }
codex-process = { path = "process" }
codex-process-hardening = { path = "process-hardening" }
codex-protocol = { path = "protocol" }
codex-realtime-webrtc = { path = "realtime-webrtc" }
@@ -287,6 +289,7 @@ dns-lookup = "3.0.1"
dotenvy = "0.15.7"
dunce = "1.0.4"
ed25519-dalek = { version = "2.2.0", features = ["pkcs8"] }
either = "1.15.0"
encoding_rs = "0.8.35"
env_logger = "0.11.9"
eventsource-stream = "0.2.3"
@@ -483,6 +486,7 @@ unwrap_used = "deny"
ignored = [
"codex-agent-graph-store",
"codex-goal-extension",
"codex-process", # Registered before callers migrate to spawn_managed().
"icu_provider",
"openssl-sys",
"codex-v8-poc",

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "process",
crate_name = "codex_process",
)

View File

@@ -0,0 +1,22 @@
[package]
name = "codex-process"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
name = "codex_process"
path = "src/lib.rs"
doctest = false
[lints]
workspace = true
[dependencies]
either = { workspace = true }
tokio = { workspace = true, features = ["process"] }
tracing = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["io-util", "macros", "rt", "time"] }
tracing-test = { workspace = true }

View File

@@ -0,0 +1,13 @@
use std::io;
/// Extends process command types with Codex-specific process spawning.
///
/// Callers should use this trait to guarantee that child processes are joined.
/// Implementations return the corresponding managed child handle.
pub trait CommandExt {
/// The managed child handle returned after spawning.
type Child;
/// Spawns this command and returns a child handle that must be joined.
fn spawn_managed(&mut self) -> io::Result<Self::Child>;
}

View File

@@ -0,0 +1,39 @@
#[derive(Debug)]
pub(crate) struct DropBomb {
armed: bool,
}
impl DropBomb {
pub(crate) fn new() -> Self {
Self { armed: true }
}
pub(crate) fn disarm(&mut self) {
self.armed = false;
}
#[cfg(test)]
pub(crate) fn is_armed(&self) -> bool {
self.armed
}
}
impl Drop for DropBomb {
fn drop(&mut self) {
if !self.armed {
return;
}
const UNJOINED_CHILD_MESSAGE: &str = "managed child process dropped without being joined";
if cfg!(debug_assertions) && !std::thread::panicking() {
panic!("{UNJOINED_CHILD_MESSAGE}");
}
tracing::error!("{UNJOINED_CHILD_MESSAGE}");
}
}
#[cfg(test)]
#[path = "drop_bomb_tests.rs"]
mod tests;

View File

@@ -0,0 +1,41 @@
use super::DropBomb;
use crate::test_support::UNJOINED_CHILD_MESSAGE;
use crate::test_support::panic_message;
use std::panic::AssertUnwindSafe;
use std::panic::catch_unwind;
use tracing_test::traced_test;
#[test]
fn disarmed_drop_bomb_does_not_report_an_error() {
let mut bomb = DropBomb::new();
bomb.disarm();
}
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "managed child process dropped without being joined")]
fn armed_drop_bomb_panics() {
drop(DropBomb::new());
}
#[cfg(not(debug_assertions))]
#[test]
#[traced_test]
fn armed_drop_bomb_logs_an_error() {
drop(DropBomb::new());
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}
#[test]
#[traced_test]
fn armed_drop_bomb_logs_instead_of_panicking_during_unwind() {
let panic = catch_unwind(AssertUnwindSafe(|| {
let _bomb = DropBomb::new();
panic!("outer panic");
}))
.expect_err("outer panic should propagate");
assert_eq!(panic_message(panic.as_ref()), "outer panic");
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}

View File

@@ -0,0 +1,16 @@
//! Codex-specific process spawning helpers.
//!
//! Spawned children must be explicitly joined before their managed handle is
//! dropped. Debug builds enforce this with a drop bomb, while release builds
//! log an error.
mod command_ext;
mod drop_bomb;
pub use command_ext::CommandExt;
pub mod sync;
pub mod tokio;
#[cfg(test)]
mod test_support;

View File

@@ -0,0 +1,110 @@
//! Managed wrappers for [`std::process`].
pub use crate::command_ext::CommandExt;
use crate::drop_bomb::DropBomb;
use either::Either;
use std::io;
use std::ops::Deref;
use std::ops::DerefMut;
use std::process::Child as StdChild;
use std::process::Command;
use std::process::ExitStatus;
use std::process::Output;
impl CommandExt for Command {
type Child = Child;
fn spawn_managed(&mut self) -> io::Result<Self::Child> {
self.spawn().map(Child::new)
}
}
/// A synchronous child process handle that must be explicitly joined.
#[derive(Debug)]
pub struct Child {
// This is an Option only so DropBomb still runs after wait_with_output()
// moves the native child into its consuming join.
child: Option<StdChild>,
bomb: DropBomb,
}
impl Child {
fn new(child: StdChild) -> Self {
Self {
child: Some(child),
bomb: DropBomb::new(),
}
}
/// Waits for the child to exit and disarms the drop bomb on success.
pub fn wait(mut self) -> io::Result<ExitStatus> {
let result = self.child_mut().wait();
if result.is_ok() {
self.bomb.disarm();
}
result
}
/// Returns the child's exit status without blocking.
///
/// Returns the still-armed child handle when an exit status is not yet
/// available.
pub fn try_wait(mut self) -> io::Result<Either<ExitStatus, Self>> {
match self.child_mut().try_wait()? {
Some(status) => {
self.bomb.disarm();
Ok(Either::Left(status))
}
None => Ok(Either::Right(self)),
}
}
/// Waits for the child to exit and collects its output.
///
/// The drop bomb remains armed until this consuming operation returns.
pub fn wait_with_output(mut self) -> io::Result<Output> {
let child = self.take_child();
let result = child.wait_with_output();
self.bomb.disarm();
result
}
fn child(&self) -> &StdChild {
match self.child.as_ref() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
fn child_mut(&mut self) -> &mut StdChild {
match self.child.as_mut() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
fn take_child(&mut self) -> StdChild {
match self.child.take() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
}
impl Deref for Child {
type Target = StdChild;
fn deref(&self) -> &Self::Target {
self.child()
}
}
impl DerefMut for Child {
fn deref_mut(&mut self) -> &mut Self::Target {
self.child_mut()
}
}
#[cfg(test)]
#[path = "sync_tests.rs"]
mod tests;

View File

@@ -0,0 +1,123 @@
use super::Child;
use super::CommandExt as _;
use crate::test_support;
use crate::test_support::STDERR_TEXT;
use crate::test_support::STDOUT_TEXT;
#[cfg(not(debug_assertions))]
use crate::test_support::UNJOINED_CHILD_MESSAGE;
use either::Either;
use std::io::Read;
use std::ops::DerefMut;
use std::process::Stdio;
use std::time::Duration;
use std::time::Instant;
#[test]
fn wait_disarms_bomb() {
let child = test_support::command("exit-success")
.spawn_managed()
.expect("spawn helper");
assert!(child.bomb.is_armed());
let status = child.wait().expect("wait for helper");
assert!(status.success());
}
#[test]
fn stdio_is_available_through_deref_mut() {
let mut child = test_support::command("output")
.stdout(Stdio::piped())
.spawn_managed()
.expect("spawn helper");
let mut stdout = child.stdout.take().expect("piped stdout");
let mut output = String::new();
stdout.read_to_string(&mut output).expect("read stdout");
assert!(child.wait().expect("wait for helper").success());
assert!(output.contains(STDOUT_TEXT));
}
#[test]
fn try_wait_keeps_bomb_armed_until_status_is_available() {
let child = test_support::command("sleep")
.spawn_managed()
.expect("spawn helper");
let mut child = match child.try_wait().expect("poll sleeping helper") {
Either::Left(status) => panic!("sleeping helper exited unexpectedly: {status}"),
Either::Right(child) => child,
};
assert!(child.bomb.is_armed());
child.kill().expect("kill sleeping helper");
assert!(child.bomb.is_armed());
assert!(!child.wait().expect("wait for killed helper").success());
}
#[test]
fn try_wait_disarms_bomb_when_status_is_available() {
let mut child = test_support::command("exit-success")
.spawn_managed()
.expect("spawn helper");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
child = match child.try_wait().expect("poll helper") {
Either::Left(status) => {
assert!(status.success());
return;
}
Either::Right(child) => child,
};
assert!(child.bomb.is_armed());
assert!(Instant::now() < deadline, "helper did not exit");
std::thread::sleep(Duration::from_millis(10));
}
}
#[test]
fn wait_with_output_collects_output() {
let output = test_support::command("output")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn_managed()
.expect("spawn helper")
.wait_with_output()
.expect("collect helper output");
assert!(output.status.success());
assert!(String::from_utf8_lossy(&output.stdout).contains(STDOUT_TEXT));
assert!(String::from_utf8_lossy(&output.stderr).contains(STDERR_TEXT));
}
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "managed child process dropped without being joined")]
fn dropping_unjoined_child_panics() {
let mut child = test_support::command("sleep")
.spawn_managed()
.expect("spawn helper");
clean_up_without_disarming(&mut child);
drop(child);
}
#[cfg(not(debug_assertions))]
#[test]
#[tracing_test::traced_test]
fn dropping_unjoined_child_logs_an_error() {
let mut child = test_support::command("sleep")
.spawn_managed()
.expect("spawn helper");
clean_up_without_disarming(&mut child);
drop(child);
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}
fn clean_up_without_disarming(child: &mut Child) {
let child = DerefMut::deref_mut(child);
child.kill().expect("kill sleeping helper");
child.wait().expect("reap sleeping helper");
}

View File

@@ -0,0 +1,46 @@
use std::any::Any;
use std::process::Command;
use std::time::Duration;
const SUBPROCESS_MODE_ENV: &str = "CODEX_PROCESS_TEST_SUBPROCESS_MODE";
pub(crate) const STDOUT_TEXT: &str = "managed stdout";
pub(crate) const STDERR_TEXT: &str = "managed stderr";
pub(crate) const UNJOINED_CHILD_MESSAGE: &str =
"managed child process dropped without being joined";
pub(crate) fn command(mode: &str) -> Command {
let mut command = Command::new(std::env::current_exe().expect("current test binary"));
command
.arg("--exact")
.arg("test_support::subprocess_helper")
.arg("--ignored")
.arg("--nocapture")
.env(SUBPROCESS_MODE_ENV, mode);
command
}
pub(crate) fn panic_message(payload: &(dyn Any + Send)) -> &str {
if let Some(message) = payload.downcast_ref::<&str>() {
message
} else if let Some(message) = payload.downcast_ref::<String>() {
message
} else {
"non-string panic payload"
}
}
#[test]
#[ignore]
fn subprocess_helper() {
match std::env::var(SUBPROCESS_MODE_ENV).as_deref() {
Ok("exit-success") => {}
Ok("output") => {
println!("{STDOUT_TEXT}");
eprintln!("{STDERR_TEXT}");
}
Ok("sleep") => std::thread::sleep(Duration::from_secs(60)),
Ok(mode) => panic!("unsupported subprocess mode: {mode}"),
Err(error) => panic!("missing subprocess mode: {error}"),
}
}

View File

@@ -0,0 +1,110 @@
//! Managed wrappers for [`tokio::process`].
pub use crate::command_ext::CommandExt;
use crate::drop_bomb::DropBomb;
use ::tokio::process::Child as TokioChild;
use ::tokio::process::Command;
use either::Either;
use std::io;
use std::ops::Deref;
use std::ops::DerefMut;
use std::process::ExitStatus;
use std::process::Output;
impl CommandExt for Command {
type Child = Child;
fn spawn_managed(&mut self) -> io::Result<Self::Child> {
self.spawn().map(Child::new)
}
}
/// An asynchronous child process handle that must be explicitly joined.
#[derive(Debug)]
pub struct Child {
// This is an Option only so DropBomb still runs after wait_with_output()
// moves the native child into its consuming join.
child: Option<TokioChild>,
bomb: DropBomb,
}
impl Child {
fn new(child: TokioChild) -> Self {
Self {
child: Some(child),
bomb: DropBomb::new(),
}
}
/// Waits for the child to exit and disarms the drop bomb on success.
pub async fn wait(mut self) -> io::Result<ExitStatus> {
let result = self.child_mut().wait().await;
if result.is_ok() {
self.bomb.disarm();
}
result
}
/// Returns the child's exit status without blocking.
///
/// Returns the still-armed child handle when an exit status is not yet
/// available.
pub fn try_wait(mut self) -> io::Result<Either<ExitStatus, Self>> {
match self.child_mut().try_wait()? {
Some(status) => {
self.bomb.disarm();
Ok(Either::Left(status))
}
None => Ok(Either::Right(self)),
}
}
/// Waits for the child to exit and collects its output.
///
/// The drop bomb remains armed until this consuming operation returns.
pub async fn wait_with_output(mut self) -> io::Result<Output> {
let child = self.take_child();
let result = child.wait_with_output().await;
self.bomb.disarm();
result
}
fn child(&self) -> &TokioChild {
match self.child.as_ref() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
fn child_mut(&mut self) -> &mut TokioChild {
match self.child.as_mut() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
fn take_child(&mut self) -> TokioChild {
match self.child.take() {
Some(child) => child,
None => panic!("managed child was made None before its wrapper was dropped"),
}
}
}
impl Deref for Child {
type Target = TokioChild;
fn deref(&self) -> &Self::Target {
self.child()
}
}
impl DerefMut for Child {
fn deref_mut(&mut self) -> &mut Self::Target {
self.child_mut()
}
}
#[cfg(test)]
#[path = "tokio_tests.rs"]
mod tests;

View File

@@ -0,0 +1,223 @@
use super::Child;
use super::CommandExt as _;
use crate::test_support;
use crate::test_support::STDERR_TEXT;
use crate::test_support::STDOUT_TEXT;
#[cfg(not(debug_assertions))]
use crate::test_support::UNJOINED_CHILD_MESSAGE;
use ::tokio as tokio_crate;
use either::Either;
use std::ops::DerefMut;
use std::process::Stdio;
use std::time::Duration;
use tokio_crate::io::AsyncReadExt;
use tokio_crate::process::Command;
use tokio_crate::time::Instant;
use tokio_crate::time::sleep;
use tokio_crate::time::timeout;
#[tokio_crate::test]
async fn wait_disarms_bomb() {
let child = command("exit-success")
.spawn_managed()
.expect("spawn helper");
assert!(child.bomb.is_armed());
let status = child.wait().await.expect("wait for helper");
assert!(status.success());
}
#[tokio_crate::test]
async fn stdio_is_available_through_deref_mut() {
let mut child = command("output")
.stdout(Stdio::piped())
.spawn_managed()
.expect("spawn helper");
let mut stdout = child.stdout.take().expect("piped stdout");
let mut output = String::new();
stdout
.read_to_string(&mut output)
.await
.expect("read stdout");
assert!(child.wait().await.expect("wait for helper").success());
assert!(output.contains(STDOUT_TEXT));
}
#[tokio_crate::test]
async fn try_wait_keeps_bomb_armed_until_status_is_available() {
let child = command("sleep").spawn_managed().expect("spawn helper");
let mut child = match child.try_wait().expect("poll sleeping helper") {
Either::Left(status) => panic!("sleeping helper exited unexpectedly: {status}"),
Either::Right(child) => child,
};
assert!(child.bomb.is_armed());
child.start_kill().expect("kill sleeping helper");
assert!(child.bomb.is_armed());
assert!(
!child
.wait()
.await
.expect("wait for killed helper")
.success()
);
}
#[tokio_crate::test]
async fn try_wait_disarms_bomb_when_status_is_available() {
let mut child = command("exit-success")
.spawn_managed()
.expect("spawn helper");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
child = match child.try_wait().expect("poll helper") {
Either::Left(status) => {
assert!(status.success());
return;
}
Either::Right(child) => child,
};
assert!(child.bomb.is_armed());
assert!(Instant::now() < deadline, "helper did not exit");
sleep(Duration::from_millis(10)).await;
}
}
#[tokio_crate::test]
async fn wait_with_output_collects_output() {
let output = command("output")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn_managed()
.expect("spawn helper")
.wait_with_output()
.await
.expect("collect helper output");
assert!(output.status.success());
assert!(String::from_utf8_lossy(&output.stdout).contains(STDOUT_TEXT));
assert!(String::from_utf8_lossy(&output.stderr).contains(STDERR_TEXT));
}
#[tokio_crate::test]
async fn kill_keeps_bomb_armed_until_explicit_wait() {
let mut child = command("sleep").spawn_managed().expect("spawn helper");
child.kill().await.expect("kill sleeping helper");
assert!(child.bomb.is_armed());
assert!(
!child
.wait()
.await
.expect("wait for killed helper")
.success()
);
}
#[cfg(debug_assertions)]
#[tokio_crate::test]
#[should_panic(expected = "managed child process dropped without being joined")]
async fn cancelled_wait_panics_when_dropped() {
let mut command = command("sleep");
command.kill_on_drop(true);
let child = command.spawn_managed().expect("spawn helper");
let mut wait = Box::pin(child.wait());
assert!(
timeout(Duration::from_millis(10), wait.as_mut())
.await
.is_err()
);
drop(wait);
}
#[cfg(not(debug_assertions))]
#[tokio_crate::test]
#[tracing_test::traced_test]
async fn cancelled_wait_logs_an_error_when_dropped() {
let mut command = command("sleep");
command.kill_on_drop(true);
let child = command.spawn_managed().expect("spawn helper");
let mut wait = Box::pin(child.wait());
assert!(
timeout(Duration::from_millis(10), wait.as_mut())
.await
.is_err()
);
drop(wait);
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}
#[cfg(debug_assertions)]
#[tokio_crate::test]
#[should_panic(expected = "managed child process dropped without being joined")]
async fn cancelled_wait_with_output_panics_when_dropped() {
let mut command = command("sleep");
command.kill_on_drop(true);
let child = command.spawn_managed().expect("spawn helper");
let mut wait = Box::pin(child.wait_with_output());
assert!(
timeout(Duration::from_millis(10), wait.as_mut())
.await
.is_err()
);
drop(wait);
}
#[cfg(not(debug_assertions))]
#[tokio_crate::test]
#[tracing_test::traced_test]
async fn cancelled_wait_with_output_logs_an_error_when_dropped() {
let mut command = command("sleep");
command.kill_on_drop(true);
let child = command.spawn_managed().expect("spawn helper");
let mut wait = Box::pin(child.wait_with_output());
assert!(
timeout(Duration::from_millis(10), wait.as_mut())
.await
.is_err()
);
drop(wait);
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}
#[cfg(debug_assertions)]
#[tokio_crate::test]
#[should_panic(expected = "managed child process dropped without being joined")]
async fn dropping_unjoined_child_panics() {
let mut child = command("sleep").spawn_managed().expect("spawn helper");
clean_up_without_disarming(&mut child).await;
drop(child);
}
#[cfg(not(debug_assertions))]
#[tokio_crate::test]
#[tracing_test::traced_test]
async fn dropping_unjoined_child_logs_an_error() {
let mut child = command("sleep").spawn_managed().expect("spawn helper");
clean_up_without_disarming(&mut child).await;
drop(child);
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
}
fn command(mode: &str) -> Command {
Command::from(test_support::command(mode))
}
async fn clean_up_without_disarming(child: &mut Child) {
let child = DerefMut::deref_mut(child);
child.start_kill().expect("kill sleeping helper");
child.wait().await.expect("reap sleeping helper");
}