From a838006c971a437a4ebea94cb82f3dfdeac8a7d8 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Tue, 21 Apr 2026 22:04:55 -0700 Subject: [PATCH] fix(exec-server): retain output until streams close --- codex-rs/exec-server/src/local_process.rs | 44 +++++---- .../exec-server/src/server/handler/tests.rs | 5 +- codex-rs/exec-server/tests/common/mod.rs | 87 ++++++++++++++++++ codex-rs/exec-server/tests/exec_process.rs | 89 +++++++++++++++++++ 4 files changed, 204 insertions(+), 21 deletions(-) diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index aa88e37fdd..3496d3f2d6 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -323,8 +323,10 @@ impl LocalProcess { ) }; + let has_new_lifecycle_event = response.next_seq > after_seq.saturating_add(1); if !response.chunks.is_empty() - || response.exited + || has_new_lifecycle_event + || response.closed || tokio::time::Instant::now() >= deadline { let _total_bytes: usize = response @@ -625,16 +627,7 @@ async fn watch_exit( .await; } - maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await; - - tokio::time::sleep(EXITED_PROCESS_RETENTION).await; - let mut processes = inner.processes.lock().await; - if matches!( - processes.get(&process_id), - Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code) - ) { - processes.remove(&process_id); - } + maybe_emit_closed(process_id, Arc::clone(&inner)).await; } async fn finish_output_stream(process_id: ProcessId, inner: Arc) { @@ -653,7 +646,7 @@ async fn finish_output_stream(process_id: ProcessId, inner: Arc) { } async fn maybe_emit_closed(process_id: ProcessId, inner: Arc) { - let notification = { + let closed = { let mut processes = inner.processes.lock().await; let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else { return; @@ -668,16 +661,33 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc) { process.next_seq += 1; 
let _ = process.wake_tx.send(seq); process.events.publish(ExecProcessEvent::Closed { seq }); - Some(ExecClosedNotification { - process_id: process_id.clone(), - seq, - }) + Some(( + ExecClosedNotification { + process_id: process_id.clone(), + seq, + }, + Arc::clone(&process.output_notify), + )) }; - let Some(notification) = notification else { + let Some((notification, output_notify)) = closed else { return; }; + output_notify.notify_waiters(); + let cleanup_process_id = process_id.clone(); + let cleanup_inner = Arc::clone(&inner); + tokio::spawn(async move { + tokio::time::sleep(EXITED_PROCESS_RETENTION).await; + let mut processes = cleanup_inner.processes.lock().await; + if matches!( + processes.get(&cleanup_process_id), + Some(ProcessEntry::Running(process)) if process.closed + ) { + processes.remove(&cleanup_process_id); + } + }); + if let Some(notifications) = notification_sender(&inner) { let _ = notifications .notify(EXEC_CLOSED_METHOD, &notification) diff --git a/codex-rs/exec-server/src/server/handler/tests.rs b/codex-rs/exec-server/src/server/handler/tests.rs index 9d1e4c470c..f1769bfe82 100644 --- a/codex-rs/exec-server/src/server/handler/tests.rs +++ b/codex-rs/exec-server/src/server/handler/tests.rs @@ -173,10 +173,7 @@ async fn long_poll_read_fails_after_session_resume() { first_handler .exec(exec_params_with_argv( "proc-long-poll", - shell_argv( - "sleep 0.1; printf resumed", - "ping -n 2 127.0.0.1 >NUL && echo resumed", - ), + shell_argv("sleep 5", "ping -n 6 127.0.0.1 >NUL"), )) .await .expect("start process"); diff --git a/codex-rs/exec-server/tests/common/mod.rs b/codex-rs/exec-server/tests/common/mod.rs index c206d8b972..387edf36db 100644 --- a/codex-rs/exec-server/tests/common/mod.rs +++ b/codex-rs/exec-server/tests/common/mod.rs @@ -1,5 +1,10 @@ use std::env; +use std::io::Write; +use std::path::Path; use std::path::PathBuf; +use std::process::Command; +use std::process::Stdio; +use std::time::Duration; use 
codex_exec_server::CODEX_FS_HELPER_ARG1; use codex_exec_server::ExecServerRuntimePaths; @@ -11,6 +16,11 @@ use ctor::ctor; pub(crate) mod exec_server; +pub(crate) const DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG: &str = + "--codex-test-delayed-output-after-exit-parent"; + +const DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG: &str = "--codex-test-delayed-output-after-exit-child"; + #[ctor] pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = { let guard = configure_test_binary_dispatch("codex-exec-server-tests", |exe_name, argv1| { @@ -22,6 +32,7 @@ pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = { } TestBinaryDispatchMode::InstallAliases }); + maybe_run_delayed_output_after_exit_from_test_binary(); maybe_run_exec_server_from_test_binary(guard.as_ref()); guard }; @@ -39,6 +50,82 @@ pub(crate) fn current_test_binary_helper_paths() -> anyhow::Result<(PathBuf, Opt Ok((current_exe, codex_linux_sandbox_exe)) } +fn maybe_run_delayed_output_after_exit_from_test_binary() { + let mut args = env::args(); + let _program = args.next(); + let Some(command) = args.next() else { + return; + }; + match command.as_str() { + DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG => { + let release_path = next_release_path_arg(args); + run_delayed_output_after_exit_parent(&release_path); + } + DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG => { + let release_path = next_release_path_arg(args); + run_delayed_output_after_exit_child(&release_path); + } + _ => {} + } +} + +fn next_release_path_arg(mut args: impl Iterator<Item = String>) -> PathBuf { + let Some(release_path) = args.next() else { + eprintln!("expected release path"); + std::process::exit(1); + }; + if args.next().is_some() { + eprintln!("unexpected extra arguments"); + std::process::exit(1); + } + PathBuf::from(release_path) +} + +fn run_delayed_output_after_exit_parent(release_path: &Path) { + let current_exe = match env::current_exe() { + Ok(current_exe) => current_exe, + Err(error) => { + eprintln!("failed to resolve current test binary: {error}"); + std::process::exit(1); + } + }; + 
match Command::new(current_exe) + .arg(DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG) + .arg(release_path) + .stdin(Stdio::null()) + .spawn() + { + Ok(_) => std::process::exit(0), + Err(error) => { + eprintln!("failed to spawn delayed output child: {error}"); + std::process::exit(1); + } + } +} + +fn run_delayed_output_after_exit_child(release_path: &Path) { + for _ in 0..1_000 { + if release_path.exists() { + let mut stdout = std::io::stdout().lock(); + if let Err(error) = writeln!(stdout, "late output after exit") { + eprintln!("failed to write delayed output: {error}"); + std::process::exit(1); + } + if let Err(error) = stdout.flush() { + eprintln!("failed to flush delayed output: {error}"); + std::process::exit(1); + } + std::process::exit(0); + } + std::thread::sleep(Duration::from_millis(10)); + } + eprintln!( + "timed out waiting for release path {}", + release_path.display() + ); + std::process::exit(1); +} + fn maybe_run_exec_server_from_test_binary(guard: Option<&TestBinaryDispatchGuard>) { let mut args = env::args(); let _program = args.next(); diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 9972cc004a..e1f330fc4e 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -17,11 +17,14 @@ use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; use pretty_assertions::assert_eq; +use tempfile::TempDir; use test_case::test_case; use tokio::sync::watch; use tokio::time::Duration; use tokio::time::timeout; +use common::DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG; +use common::current_test_binary_helper_paths; use common::exec_server::ExecServerHarness; use common::exec_server::exec_server; @@ -320,6 +323,81 @@ async fn assert_exec_process_replays_events_after_close(use_remote: bool) -> Res Ok(()) } +async fn assert_exec_process_retains_output_after_exit_until_streams_close( + use_remote: bool, +) -> Result<()> 
{ + let context = create_process_context(use_remote).await?; + let (helper_binary, _) = current_test_binary_helper_paths()?; + let release_dir = TempDir::new()?; + let release_path = release_dir.path().join("release-delayed-output"); + let process_id = "proc-output-after-exit".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + helper_binary.to_string_lossy().into_owned(), + DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG.to_string(), + release_path.to_string_lossy().into_owned(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let StartedExecProcess { process } = session; + + let exit_response = timeout( + Duration::from_secs(2), + process.read( + /*after_seq*/ None, + /*max_bytes*/ None, + /*wait_ms*/ Some(2_000), + ), + ) + .await??; + assert!( + exit_response.chunks.is_empty(), + "parent should exit before child writes delayed output" + ); + assert_eq!(exit_response.exit_code, Some(0)); + assert!(!exit_response.closed); + let exit_seq = exit_response + .next_seq + .checked_sub(1) + .context("exit response should advance next_seq")?; + std::fs::write(&release_path, b"go")?; + + let late_response = timeout( + Duration::from_secs(2), + process.read( + /*after_seq*/ Some(exit_seq), + /*max_bytes*/ None, + /*wait_ms*/ Some(2_000), + ), + ) + .await??; + let mut late_output = String::new(); + for chunk in late_response.chunks { + assert_eq!(chunk.stream, ExecOutputStream::Stdout); + late_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + } + assert_eq!(late_output, "late output after exit\n"); + + let wake_rx = process.subscribe_wake(); + let actual = collect_process_output_from_reads(process, wake_rx).await?; + assert_eq!( + actual, + ("late output after exit\n".to_string(), Some(0), true) + ); + 
Ok(()) +} + async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> { let context = create_process_context(use_remote).await?; let process_id = "proc-stdin".to_string(); @@ -586,6 +664,17 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()> assert_exec_process_replays_events_after_close(use_remote).await } +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_retains_output_after_exit_until_streams_close( + use_remote: bool, +) -> Result<()> { + assert_exec_process_retains_output_after_exit_until_streams_close(use_remote).await +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]