Files
codex/codex-rs/exec-server/tests/exec_process.rs
Michael Bolin 491a3058f6 fix(exec-server): retain output until streams close (#18946)
## Why

A Mac Bazel run hit a flake in
`server::handler::tests::output_and_exit_are_retained_after_notification_receiver_closes`
where the read path observed process exit but lost the expected buffered
stdout (`first\nsecond\n`). See the [GitHub Actions
job](https://github.com/openai/codex/actions/runs/24758468552/job/72436716505)
and [BuildBuddy
invocation](https://app.buildbuddy.io/invocation/37475a12-4ef2-45fb-ab8a-e49a2aba1d59).

The underlying race is that process exit is not the same thing as
stdout/stderr closure. If a child or grandchild inherits the pipe write
end, or a process duplicates it with `dup2`, the watched process can
exit while the stream is still open and more output can still arrive.
The exec-server was starting exited-process retention cleanup from the
exit event, so the process entry could be removed before the output
streams had actually closed.

While stress-testing the exec-server unit suite,
`server::handler::tests::long_poll_read_fails_after_session_resume`
exposed a separate test race: it started a short-lived command that
could exit and wake the pending long-poll read before the session-resume
assertion observed the resumed-session error. That test is intended to
cover resume eviction, not process-exit delivery, so this change keeps
the process alive and quiet while the second connection resumes the
session.

## What changed

- Keep exec-server process entries retained until stdout/stderr streams
close, then start the post-exit retention timer from the closed event.
- Wake long-poll readers when the closed event is emitted.
- Add focused `local_process` unit coverage that proves late output is
still retained after the short test retention interval has elapsed, and
that closed process entries are eventually evicted.
- Add a local and remote regression test where a parent exits while a
child keeps inherited stdout open. The child waits on an explicit
release file, so the test deterministically observes exit first,
releases the child, then requires a nonzero-wait read from the exit
sequence to receive the late output.
- In `codex-rs/exec-server/src/server/handler/tests.rs`, make
`long_poll_read_fails_after_session_resume` run a long-lived silent
command instead of a short command that prints and exits. This isolates
the test to session-resume behavior and prevents a normal process exit
from satisfying the pending long-poll read first.

## Testing

- `cargo test -p codex-exec-server
exec_process_retains_output_after_exit_until_streams_close`
- `cargo test -p codex-exec-server local_process::tests`
- `cargo test -p codex-exec-server`
- `just fix -p codex-exec-server`
- `bazel test //codex-rs/exec-server:exec-server-unit-tests
//codex-rs/exec-server:exec-server-exec_process-test
//codex-rs/exec-server:exec-server-file_system-test
//codex-rs/exec-server:exec-server-http_client-test
//codex-rs/exec-server:exec-server-initialize-test
//codex-rs/exec-server:exec-server-process-test
//codex-rs/exec-server:exec-server-websocket-test`
- `bazel test --runs_per_test=25
//codex-rs/exec-server:exec-server-unit-tests`

## Documentation

No docs update needed; this is an internal exec-server correctness fix.
2026-04-23 19:49:58 +00:00

713 lines
24 KiB
Rust

#![cfg(unix)]
mod common;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use codex_exec_server::Environment;
use codex_exec_server::ExecBackend;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecProcessEvent;
use codex_exec_server::ProcessId;
use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use test_case::test_case;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use common::DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG;
use common::current_test_binary_helper_paths;
use common::exec_server::ExecServerHarness;
use common::exec_server::exec_server;
struct ProcessContext {
backend: Arc<dyn ExecBackend>,
server: Option<ExecServerHarness>,
}
#[derive(Debug, PartialEq, Eq)]
enum ProcessEventSnapshot {
Output {
seq: u64,
stream: ExecOutputStream,
text: String,
},
Exited {
seq: u64,
exit_code: i32,
},
Closed {
seq: u64,
},
}
async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
if use_remote {
let server = exec_server().await?;
let environment = Environment::create_for_tests(Some(server.websocket_url().to_string()))?;
Ok(ProcessContext {
backend: environment.get_exec_backend(),
server: Some(server),
})
} else {
let environment = Environment::create_for_tests(/*exec_server_url*/ None)?;
Ok(ProcessContext {
backend: environment.get_exec_backend(),
server: None,
})
}
}
async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-1"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), "proc-1");
let wake_rx = session.process.subscribe_wake();
let (_, exit_code, closed) =
collect_process_output_from_reads(session.process, wake_rx).await?;
assert_eq!(exit_code, Some(0));
assert!(closed);
Ok(())
}
async fn read_process_until_change(
session: Arc<dyn ExecProcess>,
wake_rx: &mut watch::Receiver<u64>,
after_seq: Option<u64>,
) -> Result<ReadResponse> {
let response = session
.read(after_seq, /*max_bytes*/ None, /*wait_ms*/ Some(0))
.await?;
if !response.chunks.is_empty() || response.closed || response.failure.is_some() {
return Ok(response);
}
timeout(Duration::from_secs(2), wake_rx.changed()).await??;
session
.read(after_seq, /*max_bytes*/ None, /*wait_ms*/ Some(0))
.await
.map_err(Into::into)
}
async fn collect_process_output_from_reads(
session: Arc<dyn ExecProcess>,
mut wake_rx: watch::Receiver<u64>,
) -> Result<(String, Option<i32>, bool)> {
let mut output = String::new();
let mut exit_code = None;
let mut after_seq = None;
loop {
let response =
read_process_until_change(Arc::clone(&session), &mut wake_rx, after_seq).await?;
if let Some(message) = response.failure {
anyhow::bail!("process failed before closed state: {message}");
}
for chunk in response.chunks {
output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
after_seq = Some(chunk.seq);
}
if response.exited {
exit_code = response.exit_code;
}
if response.closed {
break;
}
after_seq = response.next_seq.checked_sub(1).or(after_seq);
}
drop(session);
Ok((output, exit_code, true))
}
async fn collect_process_output_from_events(
session: Arc<dyn ExecProcess>,
) -> Result<(String, String, Option<i32>, bool)> {
let mut events = session.subscribe_events();
let mut stdout = String::new();
let mut stderr = String::new();
let mut exit_code = None;
loop {
match timeout(Duration::from_secs(2), events.recv()).await?? {
ExecProcessEvent::Output(chunk) => match chunk.stream {
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
stdout.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
}
ExecOutputStream::Stderr => {
stderr.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
}
},
ExecProcessEvent::Exited {
seq: _,
exit_code: code,
} => {
exit_code = Some(code);
}
ExecProcessEvent::Closed { seq: _ } => {
drop(session);
return Ok((stdout, stderr, exit_code, true));
}
ExecProcessEvent::Failed(message) => {
anyhow::bail!("process failed before closed state: {message}");
}
}
}
}
async fn collect_process_event_snapshots(
session: Arc<dyn ExecProcess>,
) -> Result<Vec<ProcessEventSnapshot>> {
let mut events = session.subscribe_events();
let mut snapshots = Vec::new();
loop {
let snapshot = match timeout(Duration::from_secs(2), events.recv()).await?? {
ExecProcessEvent::Output(chunk) => ProcessEventSnapshot::Output {
seq: chunk.seq,
stream: chunk.stream,
text: String::from_utf8_lossy(&chunk.chunk.into_inner()).into_owned(),
},
ExecProcessEvent::Exited { seq, exit_code } => {
ProcessEventSnapshot::Exited { seq, exit_code }
}
ExecProcessEvent::Closed { seq } => ProcessEventSnapshot::Closed { seq },
ExecProcessEvent::Failed(message) => {
anyhow::bail!("process failed before closed state: {message}");
}
};
let closed = matches!(snapshot, ProcessEventSnapshot::Closed { .. });
snapshots.push(snapshot);
if closed {
drop(session);
return Ok(snapshots);
}
}
}
async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stream".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.05; printf 'session output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(output, "session output\n");
assert_eq!(exit_code, Some(0));
assert!(closed);
Ok(())
}
async fn assert_exec_process_pushes_events(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-events".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'event output\\n'; sleep 0.1; printf 'event err\\n' >&2; sleep 0.1; exit 7".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let actual = collect_process_event_snapshots(process).await?;
assert_eq!(
actual,
vec![
ProcessEventSnapshot::Output {
seq: 1,
stream: ExecOutputStream::Stdout,
text: "event output\n".to_string(),
},
ProcessEventSnapshot::Output {
seq: 2,
stream: ExecOutputStream::Stderr,
text: "event err\n".to_string(),
},
ProcessEventSnapshot::Exited {
seq: 3,
exit_code: 7,
},
ProcessEventSnapshot::Closed { seq: 4 },
]
);
Ok(())
}
async fn assert_exec_process_replays_events_after_close(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-events-late".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'late one\\n'; printf 'late two\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let read_result = collect_process_output_from_reads(Arc::clone(&process), wake_rx).await?;
assert_eq!(
read_result,
("late one\nlate two\n".to_string(), Some(0), true)
);
let event_result = collect_process_output_from_events(process).await?;
assert_eq!(
event_result,
(
"late one\nlate two\n".to_string(),
String::new(),
Some(0),
true
)
);
Ok(())
}
async fn assert_exec_process_retains_output_after_exit_until_streams_close(
use_remote: bool,
) -> Result<()> {
let context = create_process_context(use_remote).await?;
let (helper_binary, _) = current_test_binary_helper_paths()?;
let release_dir = TempDir::new()?;
let release_path = release_dir.path().join("release-delayed-output");
let process_id = "proc-output-after-exit".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
helper_binary.to_string_lossy().into_owned(),
DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG.to_string(),
release_path.to_string_lossy().into_owned(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let exit_response = timeout(
Duration::from_secs(2),
process.read(
/*after_seq*/ None,
/*max_bytes*/ None,
/*wait_ms*/ Some(2_000),
),
)
.await??;
assert!(
exit_response.chunks.is_empty(),
"parent should exit before child writes delayed output"
);
assert_eq!(exit_response.exit_code, Some(0));
assert!(!exit_response.closed);
let exit_seq = exit_response
.next_seq
.checked_sub(1)
.context("exit response should advance next_seq")?;
std::fs::write(&release_path, b"go")?;
let late_response = timeout(
Duration::from_secs(2),
process.read(
/*after_seq*/ Some(exit_seq),
/*max_bytes*/ None,
/*wait_ms*/ Some(2_000),
),
)
.await??;
let mut late_output = String::new();
for chunk in late_response.chunks {
assert_eq!(chunk.stream, ExecOutputStream::Stdout);
late_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
}
assert_eq!(late_output, "late output after exit\n");
let wake_rx = process.subscribe_wake();
let actual = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(
actual,
("late output after exit\n".to_string(), Some(0), true)
);
Ok(())
}
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stdin".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
// Use `/bin/sh` instead of Python so this stdin round-trip test
// stays portable across Bazel and non-macOS runners where
// `/usr/bin/python3` is not guaranteed to exist.
"/bin/sh".to_string(),
"-c".to_string(),
"IFS= read line; printf 'from-stdin:%s\\n' \"$line\"".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: true,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
tokio::time::sleep(Duration::from_millis(200)).await;
session.process.write(b"hello\n".to_vec()).await?;
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
assert!(
output.contains("from-stdin:hello"),
"unexpected output: {output:?}"
);
assert_eq!(exit_code, Some(0));
assert!(closed);
Ok(())
}
async fn assert_exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stdin-pipe".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"IFS= read line; printf 'from-stdin:%s\\n' \"$line\"".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: true,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
tokio::time::sleep(Duration::from_millis(200)).await;
let write_response = session.process.write(b"hello\n".to_vec()).await?;
assert_eq!(write_response.status, WriteStatus::Accepted);
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let actual = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(actual, ("from-stdin:hello\n".to_string(), Some(0), true));
Ok(())
}
async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stdin-closed".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.3; if IFS= read -r line; then printf 'read:%s\\n' \"$line\"; else printf 'eof\\n'; fi".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let write_response = session.process.write(b"ignored\n".to_vec()).await?;
assert_eq!(write_response.status, WriteStatus::StdinClosed);
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(output, "eof\n");
assert_eq!(exit_code, Some(0));
assert!(closed);
Ok(())
}
async fn assert_exec_process_preserves_queued_events_before_subscribe(
use_remote: bool,
) -> Result<()> {
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-queued"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'queued output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let StartedExecProcess { process } = session;
let wake_rx = process.subscribe_wake();
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(output, "queued output\n");
assert_eq!(exit_code, Some(0));
assert!(closed);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
let mut context = create_process_context(/*use_remote*/ true).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-disconnect"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 10".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
let process = Arc::clone(&session.process);
let mut events = process.subscribe_events();
let process_for_pending_read = Arc::clone(&process);
let pending_read = tokio::spawn(async move {
process_for_pending_read
.read(
/*after_seq*/ None,
/*max_bytes*/ None,
/*wait_ms*/ Some(60_000),
)
.await
});
let server = context
.server
.as_mut()
.expect("remote context should include exec-server harness");
server.shutdown().await?;
let event = timeout(Duration::from_secs(2), events.recv()).await??;
let ExecProcessEvent::Failed(event_message) = event else {
anyhow::bail!("expected process failure event, got {event:?}");
};
assert!(
event_message.starts_with("exec-server transport disconnected"),
"unexpected failure event: {event_message}"
);
let pending_response = timeout(Duration::from_secs(2), pending_read).await???;
let pending_message = pending_response
.failure
.expect("pending read should surface disconnect as a failure");
assert!(
pending_message.starts_with("exec-server transport disconnected"),
"unexpected pending failure message: {pending_message}"
);
let mut wake_rx = process.subscribe_wake();
let response = read_process_until_change(process, &mut wake_rx, /*after_seq*/ None).await?;
let message = response
.failure
.expect("disconnect should surface as a failure");
assert!(
message.starts_with("exec-server transport disconnected"),
"unexpected failure message: {message}"
);
assert!(
response.closed,
"disconnect should close the process session"
);
let write_result = timeout(
Duration::from_secs(2),
session.process.write(b"hello".to_vec()),
)
.await
.context("timed out waiting for write after disconnect")?;
let write_error = write_result.expect_err("write after disconnect should fail");
assert!(
write_error
.to_string()
.starts_with("exec-server transport disconnected"),
"unexpected write error: {write_error}"
);
Ok(())
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
assert_exec_process_starts_and_exits(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_streams_output(use_remote: bool) -> Result<()> {
assert_exec_process_streams_output(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_pushes_events(use_remote: bool) -> Result<()> {
assert_exec_process_pushes_events(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()> {
assert_exec_process_replays_events_after_close(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_retains_output_after_exit_until_streams_close(
use_remote: bool,
) -> Result<()> {
assert_exec_process_retains_output_after_exit_until_streams_close(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_write_then_read(use_remote: bool) -> Result<()> {
assert_exec_process_write_then_read(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> {
assert_exec_process_write_then_read_without_tty(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> {
assert_exec_process_rejects_write_without_pipe_stdin(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_preserves_queued_events_before_subscribe(use_remote: bool) -> Result<()> {
assert_exec_process_preserves_queued_events_before_subscribe(use_remote).await
}