fix(exec-server): retain output until streams close

This commit is contained in:
Michael Bolin
2026-04-21 22:04:55 -07:00
parent 34800d717e
commit a838006c97
4 changed files with 204 additions and 21 deletions

View File

@@ -323,8 +323,10 @@ impl LocalProcess {
)
};
let has_new_lifecycle_event = response.next_seq > after_seq.saturating_add(1);
if !response.chunks.is_empty()
|| response.exited
|| has_new_lifecycle_event
|| response.closed
|| tokio::time::Instant::now() >= deadline
{
let _total_bytes: usize = response
@@ -625,16 +627,7 @@ async fn watch_exit(
.await;
}
maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = inner.processes.lock().await;
if matches!(
processes.get(&process_id),
Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code)
) {
processes.remove(&process_id);
}
maybe_emit_closed(process_id, Arc::clone(&inner)).await;
}
async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
@@ -653,7 +646,7 @@ async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
}
async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
let notification = {
let closed = {
let mut processes = inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
return;
@@ -668,16 +661,33 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
process.next_seq += 1;
let _ = process.wake_tx.send(seq);
process.events.publish(ExecProcessEvent::Closed { seq });
Some(ExecClosedNotification {
process_id: process_id.clone(),
seq,
})
Some((
ExecClosedNotification {
process_id: process_id.clone(),
seq,
},
Arc::clone(&process.output_notify),
))
};
let Some(notification) = notification else {
let Some((notification, output_notify)) = closed else {
return;
};
output_notify.notify_waiters();
let cleanup_process_id = process_id.clone();
let cleanup_inner = Arc::clone(&inner);
tokio::spawn(async move {
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = cleanup_inner.processes.lock().await;
if matches!(
processes.get(&cleanup_process_id),
Some(ProcessEntry::Running(process)) if process.closed
) {
processes.remove(&cleanup_process_id);
}
});
if let Some(notifications) = notification_sender(&inner) {
let _ = notifications
.notify(EXEC_CLOSED_METHOD, &notification)

View File

@@ -173,10 +173,7 @@ async fn long_poll_read_fails_after_session_resume() {
first_handler
.exec(exec_params_with_argv(
"proc-long-poll",
shell_argv(
"sleep 0.1; printf resumed",
"ping -n 2 127.0.0.1 >NUL && echo resumed",
),
shell_argv("sleep 5", "ping -n 6 127.0.0.1 >NUL"),
))
.await
.expect("start process");

View File

@@ -1,5 +1,10 @@
use std::env;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::process::Stdio;
use std::time::Duration;
use codex_exec_server::CODEX_FS_HELPER_ARG1;
use codex_exec_server::ExecServerRuntimePaths;
@@ -11,6 +16,11 @@ use ctor::ctor;
pub(crate) mod exec_server;
pub(crate) const DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG: &str =
"--codex-test-delayed-output-after-exit-parent";
const DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG: &str = "--codex-test-delayed-output-after-exit-child";
#[ctor]
pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = {
let guard = configure_test_binary_dispatch("codex-exec-server-tests", |exe_name, argv1| {
@@ -22,6 +32,7 @@ pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = {
}
TestBinaryDispatchMode::InstallAliases
});
maybe_run_delayed_output_after_exit_from_test_binary();
maybe_run_exec_server_from_test_binary(guard.as_ref());
guard
};
@@ -39,6 +50,82 @@ pub(crate) fn current_test_binary_helper_paths() -> anyhow::Result<(PathBuf, Opt
Ok((current_exe, codex_linux_sandbox_exe))
}
fn maybe_run_delayed_output_after_exit_from_test_binary() {
let mut args = env::args();
let _program = args.next();
let Some(command) = args.next() else {
return;
};
match command.as_str() {
DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG => {
let release_path = next_release_path_arg(args);
run_delayed_output_after_exit_parent(&release_path);
}
DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG => {
let release_path = next_release_path_arg(args);
run_delayed_output_after_exit_child(&release_path);
}
_ => {}
}
}
fn next_release_path_arg(mut args: impl Iterator<Item = String>) -> PathBuf {
let Some(release_path) = args.next() else {
eprintln!("expected release path");
std::process::exit(1);
};
if args.next().is_some() {
eprintln!("unexpected extra arguments");
std::process::exit(1);
}
PathBuf::from(release_path)
}
fn run_delayed_output_after_exit_parent(release_path: &Path) {
let current_exe = match env::current_exe() {
Ok(current_exe) => current_exe,
Err(error) => {
eprintln!("failed to resolve current test binary: {error}");
std::process::exit(1);
}
};
match Command::new(current_exe)
.arg(DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG)
.arg(release_path)
.stdin(Stdio::null())
.spawn()
{
Ok(_) => std::process::exit(0),
Err(error) => {
eprintln!("failed to spawn delayed output child: {error}");
std::process::exit(1);
}
}
}
fn run_delayed_output_after_exit_child(release_path: &Path) {
for _ in 0..1_000 {
if release_path.exists() {
let mut stdout = std::io::stdout().lock();
if let Err(error) = writeln!(stdout, "late output after exit") {
eprintln!("failed to write delayed output: {error}");
std::process::exit(1);
}
if let Err(error) = stdout.flush() {
eprintln!("failed to flush delayed output: {error}");
std::process::exit(1);
}
std::process::exit(0);
}
std::thread::sleep(Duration::from_millis(10));
}
eprintln!(
"timed out waiting for release path {}",
release_path.display()
);
std::process::exit(1);
}
fn maybe_run_exec_server_from_test_binary(guard: Option<&TestBinaryDispatchGuard>) {
let mut args = env::args();
let _program = args.next();

View File

@@ -17,11 +17,14 @@ use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use test_case::test_case;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use common::DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG;
use common::current_test_binary_helper_paths;
use common::exec_server::ExecServerHarness;
use common::exec_server::exec_server;
@@ -320,6 +323,81 @@ async fn assert_exec_process_replays_events_after_close(use_remote: bool) -> Res
Ok(())
}
async fn assert_exec_process_retains_output_after_exit_until_streams_close(
use_remote: bool,
) -> Result<()> {
let context = create_process_context(use_remote).await?;
let (helper_binary, _) = current_test_binary_helper_paths()?;
let release_dir = TempDir::new()?;
let release_path = release_dir.path().join("release-delayed-output");
let process_id = "proc-output-after-exit".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
helper_binary.to_string_lossy().into_owned(),
DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG.to_string(),
release_path.to_string_lossy().into_owned(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let exit_response = timeout(
Duration::from_secs(2),
process.read(
/*after_seq*/ None,
/*max_bytes*/ None,
/*wait_ms*/ Some(2_000),
),
)
.await??;
assert!(
exit_response.chunks.is_empty(),
"parent should exit before child writes delayed output"
);
assert_eq!(exit_response.exit_code, Some(0));
assert!(!exit_response.closed);
let exit_seq = exit_response
.next_seq
.checked_sub(1)
.context("exit response should advance next_seq")?;
std::fs::write(&release_path, b"go")?;
let late_response = timeout(
Duration::from_secs(2),
process.read(
/*after_seq*/ Some(exit_seq),
/*max_bytes*/ None,
/*wait_ms*/ Some(2_000),
),
)
.await??;
let mut late_output = String::new();
for chunk in late_response.chunks {
assert_eq!(chunk.stream, ExecOutputStream::Stdout);
late_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
}
assert_eq!(late_output, "late output after exit\n");
let wake_rx = process.subscribe_wake();
let actual = collect_process_output_from_reads(process, wake_rx).await?;
assert_eq!(
actual,
("late output after exit\n".to_string(), Some(0), true)
);
Ok(())
}
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stdin".to_string();
@@ -586,6 +664,17 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()>
assert_exec_process_replays_events_after_close(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_retains_output_after_exit_until_streams_close(
use_remote: bool,
) -> Result<()> {
assert_exec_process_retains_output_after_exit_until_streams_close(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]