mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
4 Commits
cconger/co
...
codex/add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cad7d50b90 | ||
|
|
d7343c2ff8 | ||
|
|
89bcdd96ed | ||
|
|
d41783ce54 |
10
codex-rs/Cargo.lock
generated
10
codex-rs/Cargo.lock
generated
@@ -3456,6 +3456,16 @@ dependencies = [
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-process"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"either",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-test",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-process-hardening"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -67,6 +67,7 @@ members = [
|
||||
"models-manager",
|
||||
"network-proxy",
|
||||
"ollama",
|
||||
"process",
|
||||
"process-hardening",
|
||||
"protocol",
|
||||
"realtime-webrtc",
|
||||
@@ -198,6 +199,7 @@ codex-ollama = { path = "ollama" }
|
||||
codex-otel = { path = "otel" }
|
||||
codex-plugin = { path = "plugin" }
|
||||
codex-model-provider = { path = "model-provider" }
|
||||
codex-process = { path = "process" }
|
||||
codex-process-hardening = { path = "process-hardening" }
|
||||
codex-protocol = { path = "protocol" }
|
||||
codex-realtime-webrtc = { path = "realtime-webrtc" }
|
||||
@@ -287,6 +289,7 @@ dns-lookup = "3.0.1"
|
||||
dotenvy = "0.15.7"
|
||||
dunce = "1.0.4"
|
||||
ed25519-dalek = { version = "2.2.0", features = ["pkcs8"] }
|
||||
either = "1.15.0"
|
||||
encoding_rs = "0.8.35"
|
||||
env_logger = "0.11.9"
|
||||
eventsource-stream = "0.2.3"
|
||||
@@ -483,6 +486,7 @@ unwrap_used = "deny"
|
||||
ignored = [
|
||||
"codex-agent-graph-store",
|
||||
"codex-goal-extension",
|
||||
"codex-process", # Registered before callers migrate to spawn_managed().
|
||||
"icu_provider",
|
||||
"openssl-sys",
|
||||
"codex-v8-poc",
|
||||
|
||||
6
codex-rs/process/BUILD.bazel
Normal file
6
codex-rs/process/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "process",
|
||||
crate_name = "codex_process",
|
||||
)
|
||||
22
codex-rs/process/Cargo.toml
Normal file
22
codex-rs/process/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "codex-process"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "codex_process"
|
||||
path = "src/lib.rs"
|
||||
doctest = false
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
either = { workspace = true }
|
||||
tokio = { workspace = true, features = ["process"] }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "rt", "time"] }
|
||||
tracing-test = { workspace = true }
|
||||
13
codex-rs/process/src/command_ext.rs
Normal file
13
codex-rs/process/src/command_ext.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use std::io;
|
||||
|
||||
/// Extends process command types with Codex-specific process spawning.
|
||||
///
|
||||
/// Callers should use this trait to guarantee that child processes are joined.
|
||||
/// Implementations return the corresponding managed child handle.
|
||||
pub trait CommandExt {
|
||||
/// The managed child handle returned after spawning.
|
||||
type Child;
|
||||
|
||||
/// Spawns this command and returns a child handle that must be joined.
|
||||
fn spawn_managed(&mut self) -> io::Result<Self::Child>;
|
||||
}
|
||||
39
codex-rs/process/src/drop_bomb.rs
Normal file
39
codex-rs/process/src/drop_bomb.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DropBomb {
|
||||
armed: bool,
|
||||
}
|
||||
|
||||
impl DropBomb {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self { armed: true }
|
||||
}
|
||||
|
||||
pub(crate) fn disarm(&mut self) {
|
||||
self.armed = false;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn is_armed(&self) -> bool {
|
||||
self.armed
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DropBomb {
|
||||
fn drop(&mut self) {
|
||||
if !self.armed {
|
||||
return;
|
||||
}
|
||||
|
||||
const UNJOINED_CHILD_MESSAGE: &str = "managed child process dropped without being joined";
|
||||
|
||||
if cfg!(debug_assertions) && !std::thread::panicking() {
|
||||
panic!("{UNJOINED_CHILD_MESSAGE}");
|
||||
}
|
||||
|
||||
tracing::error!("{UNJOINED_CHILD_MESSAGE}");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "drop_bomb_tests.rs"]
|
||||
mod tests;
|
||||
41
codex-rs/process/src/drop_bomb_tests.rs
Normal file
41
codex-rs/process/src/drop_bomb_tests.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use super::DropBomb;
|
||||
use crate::test_support::UNJOINED_CHILD_MESSAGE;
|
||||
use crate::test_support::panic_message;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::panic::catch_unwind;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
#[test]
|
||||
fn disarmed_drop_bomb_does_not_report_an_error() {
|
||||
let mut bomb = DropBomb::new();
|
||||
bomb.disarm();
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[test]
|
||||
#[should_panic(expected = "managed child process dropped without being joined")]
|
||||
fn armed_drop_bomb_panics() {
|
||||
drop(DropBomb::new());
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[test]
|
||||
#[traced_test]
|
||||
fn armed_drop_bomb_logs_an_error() {
|
||||
drop(DropBomb::new());
|
||||
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[traced_test]
|
||||
fn armed_drop_bomb_logs_instead_of_panicking_during_unwind() {
|
||||
let panic = catch_unwind(AssertUnwindSafe(|| {
|
||||
let _bomb = DropBomb::new();
|
||||
panic!("outer panic");
|
||||
}))
|
||||
.expect_err("outer panic should propagate");
|
||||
|
||||
assert_eq!(panic_message(panic.as_ref()), "outer panic");
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
16
codex-rs/process/src/lib.rs
Normal file
16
codex-rs/process/src/lib.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
//! Codex-specific process spawning helpers.
|
||||
//!
|
||||
//! Spawned children must be explicitly joined before their managed handle is
|
||||
//! dropped. Debug builds enforce this with a drop bomb, while release builds
|
||||
//! log an error.
|
||||
|
||||
mod command_ext;
|
||||
mod drop_bomb;
|
||||
|
||||
pub use command_ext::CommandExt;
|
||||
|
||||
pub mod sync;
|
||||
pub mod tokio;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test_support;
|
||||
110
codex-rs/process/src/sync.rs
Normal file
110
codex-rs/process/src/sync.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
//! Managed wrappers for [`std::process`].
|
||||
|
||||
pub use crate::command_ext::CommandExt;
|
||||
use crate::drop_bomb::DropBomb;
|
||||
use either::Either;
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::process::Child as StdChild;
|
||||
use std::process::Command;
|
||||
use std::process::ExitStatus;
|
||||
use std::process::Output;
|
||||
|
||||
impl CommandExt for Command {
|
||||
type Child = Child;
|
||||
|
||||
fn spawn_managed(&mut self) -> io::Result<Self::Child> {
|
||||
self.spawn().map(Child::new)
|
||||
}
|
||||
}
|
||||
|
||||
/// A synchronous child process handle that must be explicitly joined.
|
||||
#[derive(Debug)]
|
||||
pub struct Child {
|
||||
// This is an Option only so DropBomb still runs after wait_with_output()
|
||||
// moves the native child into its consuming join.
|
||||
child: Option<StdChild>,
|
||||
bomb: DropBomb,
|
||||
}
|
||||
|
||||
impl Child {
|
||||
fn new(child: StdChild) -> Self {
|
||||
Self {
|
||||
child: Some(child),
|
||||
bomb: DropBomb::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the child to exit and disarms the drop bomb on success.
|
||||
pub fn wait(mut self) -> io::Result<ExitStatus> {
|
||||
let result = self.child_mut().wait();
|
||||
if result.is_ok() {
|
||||
self.bomb.disarm();
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the child's exit status without blocking.
|
||||
///
|
||||
/// Returns the still-armed child handle when an exit status is not yet
|
||||
/// available.
|
||||
pub fn try_wait(mut self) -> io::Result<Either<ExitStatus, Self>> {
|
||||
match self.child_mut().try_wait()? {
|
||||
Some(status) => {
|
||||
self.bomb.disarm();
|
||||
Ok(Either::Left(status))
|
||||
}
|
||||
None => Ok(Either::Right(self)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the child to exit and collects its output.
|
||||
///
|
||||
/// The drop bomb remains armed until this consuming operation returns.
|
||||
pub fn wait_with_output(mut self) -> io::Result<Output> {
|
||||
let child = self.take_child();
|
||||
let result = child.wait_with_output();
|
||||
self.bomb.disarm();
|
||||
result
|
||||
}
|
||||
|
||||
fn child(&self) -> &StdChild {
|
||||
match self.child.as_ref() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
|
||||
fn child_mut(&mut self) -> &mut StdChild {
|
||||
match self.child.as_mut() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
|
||||
fn take_child(&mut self) -> StdChild {
|
||||
match self.child.take() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Child {
|
||||
type Target = StdChild;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.child()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Child {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.child_mut()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "sync_tests.rs"]
|
||||
mod tests;
|
||||
123
codex-rs/process/src/sync_tests.rs
Normal file
123
codex-rs/process/src/sync_tests.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use super::Child;
|
||||
use super::CommandExt as _;
|
||||
use crate::test_support;
|
||||
use crate::test_support::STDERR_TEXT;
|
||||
use crate::test_support::STDOUT_TEXT;
|
||||
#[cfg(not(debug_assertions))]
|
||||
use crate::test_support::UNJOINED_CHILD_MESSAGE;
|
||||
use either::Either;
|
||||
use std::io::Read;
|
||||
use std::ops::DerefMut;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn wait_disarms_bomb() {
|
||||
let child = test_support::command("exit-success")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
assert!(child.bomb.is_armed());
|
||||
|
||||
let status = child.wait().expect("wait for helper");
|
||||
assert!(status.success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stdio_is_available_through_deref_mut() {
|
||||
let mut child = test_support::command("output")
|
||||
.stdout(Stdio::piped())
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
let mut stdout = child.stdout.take().expect("piped stdout");
|
||||
let mut output = String::new();
|
||||
stdout.read_to_string(&mut output).expect("read stdout");
|
||||
|
||||
assert!(child.wait().expect("wait for helper").success());
|
||||
assert!(output.contains(STDOUT_TEXT));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_wait_keeps_bomb_armed_until_status_is_available() {
|
||||
let child = test_support::command("sleep")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
|
||||
let mut child = match child.try_wait().expect("poll sleeping helper") {
|
||||
Either::Left(status) => panic!("sleeping helper exited unexpectedly: {status}"),
|
||||
Either::Right(child) => child,
|
||||
};
|
||||
assert!(child.bomb.is_armed());
|
||||
|
||||
child.kill().expect("kill sleeping helper");
|
||||
assert!(child.bomb.is_armed());
|
||||
assert!(!child.wait().expect("wait for killed helper").success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_wait_disarms_bomb_when_status_is_available() {
|
||||
let mut child = test_support::command("exit-success")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
child = match child.try_wait().expect("poll helper") {
|
||||
Either::Left(status) => {
|
||||
assert!(status.success());
|
||||
return;
|
||||
}
|
||||
Either::Right(child) => child,
|
||||
};
|
||||
assert!(child.bomb.is_armed());
|
||||
assert!(Instant::now() < deadline, "helper did not exit");
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_with_output_collects_output() {
|
||||
let output = test_support::command("output")
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn_managed()
|
||||
.expect("spawn helper")
|
||||
.wait_with_output()
|
||||
.expect("collect helper output");
|
||||
|
||||
assert!(output.status.success());
|
||||
assert!(String::from_utf8_lossy(&output.stdout).contains(STDOUT_TEXT));
|
||||
assert!(String::from_utf8_lossy(&output.stderr).contains(STDERR_TEXT));
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[test]
|
||||
#[should_panic(expected = "managed child process dropped without being joined")]
|
||||
fn dropping_unjoined_child_panics() {
|
||||
let mut child = test_support::command("sleep")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
clean_up_without_disarming(&mut child);
|
||||
|
||||
drop(child);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[test]
|
||||
#[tracing_test::traced_test]
|
||||
fn dropping_unjoined_child_logs_an_error() {
|
||||
let mut child = test_support::command("sleep")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
clean_up_without_disarming(&mut child);
|
||||
|
||||
drop(child);
|
||||
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
|
||||
fn clean_up_without_disarming(child: &mut Child) {
|
||||
let child = DerefMut::deref_mut(child);
|
||||
child.kill().expect("kill sleeping helper");
|
||||
child.wait().expect("reap sleeping helper");
|
||||
}
|
||||
46
codex-rs/process/src/test_support.rs
Normal file
46
codex-rs/process/src/test_support.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use std::any::Any;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
|
||||
const SUBPROCESS_MODE_ENV: &str = "CODEX_PROCESS_TEST_SUBPROCESS_MODE";
|
||||
|
||||
pub(crate) const STDOUT_TEXT: &str = "managed stdout";
|
||||
pub(crate) const STDERR_TEXT: &str = "managed stderr";
|
||||
pub(crate) const UNJOINED_CHILD_MESSAGE: &str =
|
||||
"managed child process dropped without being joined";
|
||||
|
||||
pub(crate) fn command(mode: &str) -> Command {
|
||||
let mut command = Command::new(std::env::current_exe().expect("current test binary"));
|
||||
command
|
||||
.arg("--exact")
|
||||
.arg("test_support::subprocess_helper")
|
||||
.arg("--ignored")
|
||||
.arg("--nocapture")
|
||||
.env(SUBPROCESS_MODE_ENV, mode);
|
||||
command
|
||||
}
|
||||
|
||||
pub(crate) fn panic_message(payload: &(dyn Any + Send)) -> &str {
|
||||
if let Some(message) = payload.downcast_ref::<&str>() {
|
||||
message
|
||||
} else if let Some(message) = payload.downcast_ref::<String>() {
|
||||
message
|
||||
} else {
|
||||
"non-string panic payload"
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn subprocess_helper() {
|
||||
match std::env::var(SUBPROCESS_MODE_ENV).as_deref() {
|
||||
Ok("exit-success") => {}
|
||||
Ok("output") => {
|
||||
println!("{STDOUT_TEXT}");
|
||||
eprintln!("{STDERR_TEXT}");
|
||||
}
|
||||
Ok("sleep") => std::thread::sleep(Duration::from_secs(60)),
|
||||
Ok(mode) => panic!("unsupported subprocess mode: {mode}"),
|
||||
Err(error) => panic!("missing subprocess mode: {error}"),
|
||||
}
|
||||
}
|
||||
110
codex-rs/process/src/tokio.rs
Normal file
110
codex-rs/process/src/tokio.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
//! Managed wrappers for [`tokio::process`].
|
||||
|
||||
pub use crate::command_ext::CommandExt;
|
||||
use crate::drop_bomb::DropBomb;
|
||||
use ::tokio::process::Child as TokioChild;
|
||||
use ::tokio::process::Command;
|
||||
use either::Either;
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::process::ExitStatus;
|
||||
use std::process::Output;
|
||||
|
||||
impl CommandExt for Command {
|
||||
type Child = Child;
|
||||
|
||||
fn spawn_managed(&mut self) -> io::Result<Self::Child> {
|
||||
self.spawn().map(Child::new)
|
||||
}
|
||||
}
|
||||
|
||||
/// An asynchronous child process handle that must be explicitly joined.
|
||||
#[derive(Debug)]
|
||||
pub struct Child {
|
||||
// This is an Option only so DropBomb still runs after wait_with_output()
|
||||
// moves the native child into its consuming join.
|
||||
child: Option<TokioChild>,
|
||||
bomb: DropBomb,
|
||||
}
|
||||
|
||||
impl Child {
|
||||
fn new(child: TokioChild) -> Self {
|
||||
Self {
|
||||
child: Some(child),
|
||||
bomb: DropBomb::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the child to exit and disarms the drop bomb on success.
|
||||
pub async fn wait(mut self) -> io::Result<ExitStatus> {
|
||||
let result = self.child_mut().wait().await;
|
||||
if result.is_ok() {
|
||||
self.bomb.disarm();
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the child's exit status without blocking.
|
||||
///
|
||||
/// Returns the still-armed child handle when an exit status is not yet
|
||||
/// available.
|
||||
pub fn try_wait(mut self) -> io::Result<Either<ExitStatus, Self>> {
|
||||
match self.child_mut().try_wait()? {
|
||||
Some(status) => {
|
||||
self.bomb.disarm();
|
||||
Ok(Either::Left(status))
|
||||
}
|
||||
None => Ok(Either::Right(self)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the child to exit and collects its output.
|
||||
///
|
||||
/// The drop bomb remains armed until this consuming operation returns.
|
||||
pub async fn wait_with_output(mut self) -> io::Result<Output> {
|
||||
let child = self.take_child();
|
||||
let result = child.wait_with_output().await;
|
||||
self.bomb.disarm();
|
||||
result
|
||||
}
|
||||
|
||||
fn child(&self) -> &TokioChild {
|
||||
match self.child.as_ref() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
|
||||
fn child_mut(&mut self) -> &mut TokioChild {
|
||||
match self.child.as_mut() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
|
||||
fn take_child(&mut self) -> TokioChild {
|
||||
match self.child.take() {
|
||||
Some(child) => child,
|
||||
None => panic!("managed child was made None before its wrapper was dropped"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Child {
|
||||
type Target = TokioChild;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.child()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Child {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.child_mut()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tokio_tests.rs"]
|
||||
mod tests;
|
||||
223
codex-rs/process/src/tokio_tests.rs
Normal file
223
codex-rs/process/src/tokio_tests.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
use super::Child;
|
||||
use super::CommandExt as _;
|
||||
use crate::test_support;
|
||||
use crate::test_support::STDERR_TEXT;
|
||||
use crate::test_support::STDOUT_TEXT;
|
||||
#[cfg(not(debug_assertions))]
|
||||
use crate::test_support::UNJOINED_CHILD_MESSAGE;
|
||||
use ::tokio as tokio_crate;
|
||||
use either::Either;
|
||||
use std::ops::DerefMut;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio_crate::io::AsyncReadExt;
|
||||
use tokio_crate::process::Command;
|
||||
use tokio_crate::time::Instant;
|
||||
use tokio_crate::time::sleep;
|
||||
use tokio_crate::time::timeout;
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn wait_disarms_bomb() {
|
||||
let child = command("exit-success")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
assert!(child.bomb.is_armed());
|
||||
|
||||
let status = child.wait().await.expect("wait for helper");
|
||||
assert!(status.success());
|
||||
}
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn stdio_is_available_through_deref_mut() {
|
||||
let mut child = command("output")
|
||||
.stdout(Stdio::piped())
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
let mut stdout = child.stdout.take().expect("piped stdout");
|
||||
let mut output = String::new();
|
||||
stdout
|
||||
.read_to_string(&mut output)
|
||||
.await
|
||||
.expect("read stdout");
|
||||
|
||||
assert!(child.wait().await.expect("wait for helper").success());
|
||||
assert!(output.contains(STDOUT_TEXT));
|
||||
}
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn try_wait_keeps_bomb_armed_until_status_is_available() {
|
||||
let child = command("sleep").spawn_managed().expect("spawn helper");
|
||||
|
||||
let mut child = match child.try_wait().expect("poll sleeping helper") {
|
||||
Either::Left(status) => panic!("sleeping helper exited unexpectedly: {status}"),
|
||||
Either::Right(child) => child,
|
||||
};
|
||||
assert!(child.bomb.is_armed());
|
||||
|
||||
child.start_kill().expect("kill sleeping helper");
|
||||
assert!(child.bomb.is_armed());
|
||||
assert!(
|
||||
!child
|
||||
.wait()
|
||||
.await
|
||||
.expect("wait for killed helper")
|
||||
.success()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn try_wait_disarms_bomb_when_status_is_available() {
|
||||
let mut child = command("exit-success")
|
||||
.spawn_managed()
|
||||
.expect("spawn helper");
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
child = match child.try_wait().expect("poll helper") {
|
||||
Either::Left(status) => {
|
||||
assert!(status.success());
|
||||
return;
|
||||
}
|
||||
Either::Right(child) => child,
|
||||
};
|
||||
assert!(child.bomb.is_armed());
|
||||
assert!(Instant::now() < deadline, "helper did not exit");
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn wait_with_output_collects_output() {
|
||||
let output = command("output")
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn_managed()
|
||||
.expect("spawn helper")
|
||||
.wait_with_output()
|
||||
.await
|
||||
.expect("collect helper output");
|
||||
|
||||
assert!(output.status.success());
|
||||
assert!(String::from_utf8_lossy(&output.stdout).contains(STDOUT_TEXT));
|
||||
assert!(String::from_utf8_lossy(&output.stderr).contains(STDERR_TEXT));
|
||||
}
|
||||
|
||||
#[tokio_crate::test]
|
||||
async fn kill_keeps_bomb_armed_until_explicit_wait() {
|
||||
let mut child = command("sleep").spawn_managed().expect("spawn helper");
|
||||
|
||||
child.kill().await.expect("kill sleeping helper");
|
||||
assert!(child.bomb.is_armed());
|
||||
|
||||
assert!(
|
||||
!child
|
||||
.wait()
|
||||
.await
|
||||
.expect("wait for killed helper")
|
||||
.success()
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[tokio_crate::test]
|
||||
#[should_panic(expected = "managed child process dropped without being joined")]
|
||||
async fn cancelled_wait_panics_when_dropped() {
|
||||
let mut command = command("sleep");
|
||||
command.kill_on_drop(true);
|
||||
let child = command.spawn_managed().expect("spawn helper");
|
||||
let mut wait = Box::pin(child.wait());
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), wait.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
drop(wait);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[tokio_crate::test]
|
||||
#[tracing_test::traced_test]
|
||||
async fn cancelled_wait_logs_an_error_when_dropped() {
|
||||
let mut command = command("sleep");
|
||||
command.kill_on_drop(true);
|
||||
let child = command.spawn_managed().expect("spawn helper");
|
||||
let mut wait = Box::pin(child.wait());
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), wait.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
drop(wait);
|
||||
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[tokio_crate::test]
|
||||
#[should_panic(expected = "managed child process dropped without being joined")]
|
||||
async fn cancelled_wait_with_output_panics_when_dropped() {
|
||||
let mut command = command("sleep");
|
||||
command.kill_on_drop(true);
|
||||
let child = command.spawn_managed().expect("spawn helper");
|
||||
let mut wait = Box::pin(child.wait_with_output());
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), wait.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
drop(wait);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[tokio_crate::test]
|
||||
#[tracing_test::traced_test]
|
||||
async fn cancelled_wait_with_output_logs_an_error_when_dropped() {
|
||||
let mut command = command("sleep");
|
||||
command.kill_on_drop(true);
|
||||
let child = command.spawn_managed().expect("spawn helper");
|
||||
let mut wait = Box::pin(child.wait_with_output());
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), wait.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
drop(wait);
|
||||
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[tokio_crate::test]
|
||||
#[should_panic(expected = "managed child process dropped without being joined")]
|
||||
async fn dropping_unjoined_child_panics() {
|
||||
let mut child = command("sleep").spawn_managed().expect("spawn helper");
|
||||
clean_up_without_disarming(&mut child).await;
|
||||
|
||||
drop(child);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[tokio_crate::test]
|
||||
#[tracing_test::traced_test]
|
||||
async fn dropping_unjoined_child_logs_an_error() {
|
||||
let mut child = command("sleep").spawn_managed().expect("spawn helper");
|
||||
clean_up_without_disarming(&mut child).await;
|
||||
|
||||
drop(child);
|
||||
|
||||
assert!(logs_contain(UNJOINED_CHILD_MESSAGE));
|
||||
}
|
||||
|
||||
fn command(mode: &str) -> Command {
|
||||
Command::from(test_support::command(mode))
|
||||
}
|
||||
|
||||
async fn clean_up_without_disarming(child: &mut Child) {
|
||||
let child = DerefMut::deref_mut(child);
|
||||
child.start_kill().expect("kill sleeping helper");
|
||||
child.wait().await.expect("reap sleeping helper");
|
||||
}
|
||||
Reference in New Issue
Block a user