This commit is contained in:
Ruslan Nigmatullin
2026-03-06 11:43:18 -08:00
parent f59dcafb49
commit 9dbf1a14d3
9 changed files with 152 additions and 238 deletions

View File

@@ -5,6 +5,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
@@ -47,6 +48,7 @@ pub(crate) struct OutputHandles {
#[derive(Debug)]
pub(crate) struct UnifiedExecProcess {
process_handle: ExecCommandSession,
output_rx: broadcast::Receiver<Vec<u8>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
@@ -72,6 +74,7 @@ impl UnifiedExecProcess {
let cancellation_token = CancellationToken::new();
let output_drained = Arc::new(Notify::new());
let mut receiver = initial_output_rx;
let output_rx = receiver.resubscribe();
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_closed_clone = Arc::clone(&output_closed);
@@ -97,6 +100,7 @@ impl UnifiedExecProcess {
Self {
process_handle,
output_rx,
output_buffer,
output_notify,
output_closed,
@@ -124,7 +128,7 @@ impl UnifiedExecProcess {
}
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
self.process_handle.output_receiver()
self.output_rx.resubscribe()
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
@@ -214,9 +218,11 @@ impl UnifiedExecProcess {
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session: process_handle,
output_rx,
stdout_rx,
stderr_rx,
mut exit_rx,
} = spawned;
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View File

@@ -129,9 +129,15 @@ trust_level = "trusted"
.await?;
let mut output = Vec::new();
let mut output_rx = spawned.output_rx;
let mut exit_rx = spawned.exit_rx;
let writer_tx = spawned.session.writer_sender();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let interrupt_writer = writer_tx.clone();
let interrupt_task = tokio::spawn(async move {
sleep(Duration::from_secs(2)).await;
@@ -166,7 +172,7 @@ trust_level = "trusted"
Ok(Ok(code)) => code,
Ok(Err(err)) => return Err(err.into()),
Err(_) => {
spawned.session.terminate();
session.terminate();
anyhow::bail!("timed out waiting for codex resume to exit");
}
};

View File

@@ -75,9 +75,15 @@ async fn run_codex_cli(
)
.await?;
let mut output = Vec::new();
let mut output_rx = spawned.output_rx;
let mut exit_rx = spawned.exit_rx;
let writer_tx = spawned.session.writer_sender();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let exit_code_result = timeout(Duration::from_secs(10), async {
// Read PTY output until the process exits while replying to cursor
// position queries so the TUI can initialize without a real terminal.
@@ -104,7 +110,7 @@ async fn run_codex_cli(
Ok(Ok(code)) => code,
Ok(Err(err)) => return Err(err.into()),
Err(_) => {
spawned.session.terminate();
session.terminate();
anyhow::bail!("timed out waiting for codex CLI to exit");
}
};

View File

@@ -7,24 +7,22 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
- `spawn_pty_process(program, args, cwd, env, arg0, size)``SpawnedProcess`
- `spawn_pipe_process(program, args, cwd, env, arg0)``SpawnedProcess`
- `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)``SpawnedProcess`
- `pty::spawn_streaming_process(program, args, cwd, env, arg0, size, output_sink)``SpawnedStreamingProcess`
- `pipe::spawn_streaming_process(program, args, cwd, env, arg0, stdin_mode, output_sink)``SpawnedStreamingProcess`
- `combine_output_receivers(stdout_rx, stderr_rx)``broadcast::Receiver<Vec<u8>>`
- `conpty_supported()``bool` (Windows only; always true elsewhere)
- `TerminalSize { rows, cols }` selects PTY dimensions in character cells.
- `ProcessHandle` exposes:
- `writer_sender()``mpsc::Sender<Vec<u8>>` (stdin)
- `output_receiver()``broadcast::Receiver<Vec<u8>>` (stdout/stderr merged)
- `resize(TerminalSize)`
- `close_stdin()`
- `has_exited()`, `exit_code()`, `terminate()`
- `SpawnedProcess` bundles `session`, `output_rx`, and `exit_rx` (oneshot exit code).
- `SpawnedStreamingProcess` bundles `session` and `exit_rx`; callers own the output sink/receivers.
- `SpawnedProcess` bundles `session`, `stdout_rx`, `stderr_rx`, and `exit_rx` (oneshot exit code).
## Usage examples
```rust
use std::collections::HashMap;
use std::path::Path;
use codex_utils_pty::combine_output_receivers;
use codex_utils_pty::spawn_pty_process;
use codex_utils_pty::TerminalSize;
@@ -43,7 +41,7 @@ let writer = spawned.session.writer_sender();
writer.send(b"exit\n".to_vec()).await?;
// Collect output until the process exits.
let mut output_rx = spawned.output_rx;
let mut output_rx = combine_output_receivers(spawned.stdout_rx, spawned.stderr_rx);
let mut collected = Vec::new();
while let Ok(chunk) = output_rx.try_recv() {
collected.extend_from_slice(&chunk);

View File

@@ -13,14 +13,12 @@ pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024;
pub use pipe::spawn_process as spawn_pipe_process;
/// Spawn a non-interactive process using regular pipes, but close stdin immediately.
pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
/// Output routing configuration for spawned processes.
pub use process::OutputSink;
/// Combine stdout/stderr receivers into a single broadcast receiver.
pub use process::combine_output_receivers;
/// Handle for interacting with a spawned process (PTY or pipe).
pub use process::ProcessHandle;
/// Bundle of process handles plus output and exit receivers returned by spawn helpers.
/// Bundle of process handles plus split output and exit receivers returned by spawn helpers.
pub use process::SpawnedProcess;
/// Bundle of process handles and exit receiver returned by streaming spawn helpers.
pub use process::SpawnedStreamingProcess;
/// Terminal size in character cells used for PTY spawn and resize operations.
pub use process::TerminalSize;
/// Backwards-compatible alias for ProcessHandle.

View File

@@ -18,11 +18,8 @@ use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::OutputSink;
use crate::process::OutputStream;
use crate::process::ProcessHandle;
use crate::process::SpawnedProcess;
use crate::process::SpawnedStreamingProcess;
#[cfg(target_os = "linux")]
use libc;
@@ -75,7 +72,7 @@ fn kill_process(pid: u32) -> io::Result<()> {
}
}
async fn read_output_stream<R>(mut reader: R, output_sink: OutputSink, stream: OutputStream)
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
where
R: AsyncRead + Unpin,
{
@@ -84,7 +81,7 @@ where
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
output_sink.send(buf[..n].to_vec(), stream).await;
let _ = output_tx.send(buf[..n].to_vec()).await;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
@@ -93,20 +90,19 @@ where
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PipeStdinMode {
enum PipeStdinMode {
Piped,
Null,
}
pub async fn spawn_streaming_process(
async fn spawn_process_with_stdin_mode(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
stdin_mode: PipeStdinMode,
output_sink: OutputSink,
) -> Result<SpawnedStreamingProcess> {
) -> Result<SpawnedProcess> {
if program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
@@ -160,7 +156,8 @@ pub async fn spawn_streaming_process(
let stderr = child.stderr.take();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let output_tx = output_sink.combined_sender();
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(128);
let writer_handle = if let Some(stdin) = stdin {
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
tokio::spawn(async move {
@@ -176,15 +173,15 @@ pub async fn spawn_streaming_process(
};
let stdout_handle = stdout.map(|stdout| {
let output_sink = output_sink.clone();
let stdout_tx = stdout_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), output_sink, OutputStream::Stdout).await;
read_output_stream(BufReader::new(stdout), stdout_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let output_sink = output_sink.clone();
let stderr_tx = stderr_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), output_sink, OutputStream::Stderr).await;
read_output_stream(BufReader::new(stderr), stderr_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
@@ -222,7 +219,6 @@ pub async fn spawn_streaming_process(
let handle = ProcessHandle::new(
writer_tx,
output_tx,
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
@@ -238,13 +234,15 @@ pub async fn spawn_streaming_process(
None,
);
Ok(SpawnedStreamingProcess {
Ok(SpawnedProcess {
session: handle,
stdout_rx,
stderr_rx,
exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, output, and exit.
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
@@ -252,18 +250,7 @@ pub async fn spawn_process(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
let (output_sink, output_rx) = OutputSink::broadcast_combined();
let spawned = spawn_streaming_process(
program,
args,
cwd,
env,
arg0,
PipeStdinMode::Piped,
output_sink,
)
.await?;
Ok(spawned.into_spawned_process(output_rx))
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
}
/// Spawn a process using regular pipes, but close stdin immediately.
@@ -274,16 +261,5 @@ pub async fn spawn_process_no_stdin(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
let (output_sink, output_rx) = OutputSink::broadcast_combined();
let spawned = spawn_streaming_process(
program,
args,
cwd,
env,
arg0,
PipeStdinMode::Null,
output_sink,
)
.await?;
Ok(spawned.into_spawned_process(output_rx))
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await
}

View File

@@ -55,13 +55,9 @@ impl fmt::Debug for PtyHandles {
/// Handle for driving an interactive process (PTY or pipe).
pub struct ProcessHandle {
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
output_tx: Option<broadcast::Sender<Vec<u8>>>,
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
#[allow(dead_code)]
reader_handle: StdMutex<Option<JoinHandle<()>>>,
#[allow(dead_code)]
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
#[allow(dead_code)]
writer_handle: StdMutex<Option<JoinHandle<()>>>,
wait_handle: StdMutex<Option<JoinHandle<()>>>,
exit_status: Arc<AtomicBool>,
@@ -81,7 +77,6 @@ impl ProcessHandle {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: Option<broadcast::Sender<Vec<u8>>>,
killer: Box<dyn ChildTerminator>,
reader_handle: JoinHandle<()>,
reader_abort_handles: Vec<AbortHandle>,
@@ -93,7 +88,6 @@ impl ProcessHandle {
) -> Self {
Self {
writer_tx: StdMutex::new(Some(writer_tx)),
output_tx,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_abort_handles: StdMutex::new(reader_abort_handles),
@@ -118,18 +112,6 @@ impl ProcessHandle {
writer_tx
}
/// Returns a broadcast receiver that yields stdout/stderr chunks when
/// combined output routing is configured.
pub fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
if let Some(output_tx) = self.output_tx.as_ref() {
return output_tx.subscribe();
}
let (output_tx, output_rx) = broadcast::channel(1);
drop(output_tx);
output_rx
}
/// True if the child process has exited.
pub fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
@@ -202,105 +184,46 @@ impl Drop for ProcessHandle {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum OutputStream {
Stdout,
Stderr,
}
/// Combine split stdout/stderr receivers into a single broadcast receiver.
pub fn combine_output_receivers(
mut stdout_rx: mpsc::Receiver<Vec<u8>>,
mut stderr_rx: mpsc::Receiver<Vec<u8>>,
) -> broadcast::Receiver<Vec<u8>> {
let (combined_tx, combined_rx) = broadcast::channel(256);
tokio::spawn(async move {
let mut stdout_open = true;
let mut stderr_open = true;
#[derive(Clone, Debug)]
pub enum OutputSink {
BroadcastCombined(broadcast::Sender<Vec<u8>>),
GuaranteedSeparate {
stdout: mpsc::Sender<Vec<u8>>,
stderr: mpsc::Sender<Vec<u8>>,
},
}
impl OutputSink {
pub fn broadcast_combined() -> (Self, broadcast::Receiver<Vec<u8>>) {
let (tx, rx) = broadcast::channel(256);
(OutputSink::BroadcastCombined(tx), rx)
}
pub fn guaranteed_separate() -> (Self, mpsc::Receiver<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
let (stdout_tx, stdout_rx) = mpsc::channel(128);
let (stderr_tx, stderr_rx) = mpsc::channel(128);
(
OutputSink::GuaranteedSeparate {
stdout: stdout_tx,
stderr: stderr_tx,
},
stdout_rx,
stderr_rx,
)
}
pub(crate) async fn send(&self, chunk: Vec<u8>, stream: OutputStream) {
match self {
OutputSink::BroadcastCombined(tx) => {
let _ = tx.send(chunk);
loop {
tokio::select! {
stdout = stdout_rx.recv(), if stdout_open => match stdout {
Some(chunk) => {
let _ = combined_tx.send(chunk);
}
None => {
stdout_open = false;
}
},
stderr = stderr_rx.recv(), if stderr_open => match stderr {
Some(chunk) => {
let _ = combined_tx.send(chunk);
}
None => {
stderr_open = false;
}
},
else => break,
}
OutputSink::GuaranteedSeparate { stdout, stderr } => match stream {
OutputStream::Stdout => {
let _ = stdout.send(chunk).await;
}
OutputStream::Stderr => {
let _ = stderr.send(chunk).await;
}
},
}
}
pub(crate) fn send_blocking(&self, chunk: Vec<u8>, stream: OutputStream) {
match self {
OutputSink::BroadcastCombined(tx) => {
let _ = tx.send(chunk);
}
OutputSink::GuaranteedSeparate { stdout, stderr } => match stream {
OutputStream::Stdout => {
let _ = stdout.blocking_send(chunk);
}
OutputStream::Stderr => {
let _ = stderr.blocking_send(chunk);
}
},
}
}
pub(crate) fn combined_sender(&self) -> Option<broadcast::Sender<Vec<u8>>> {
match self {
OutputSink::BroadcastCombined(tx) => Some(tx.clone()),
OutputSink::GuaranteedSeparate { .. } => None,
}
}
});
combined_rx
}
/// Return value from explicit streaming spawn helpers (PTY or pipe).
#[derive(Debug)]
pub struct SpawnedStreamingProcess {
pub session: ProcessHandle,
pub exit_rx: oneshot::Receiver<i32>,
}
impl SpawnedStreamingProcess {
pub(crate) fn into_spawned_process(
self,
output_rx: broadcast::Receiver<Vec<u8>>,
) -> SpawnedProcess {
let Self { session, exit_rx } = self;
SpawnedProcess {
session,
output_rx,
exit_rx,
}
}
}
/// Return value from backwards-compatible spawn helpers (PTY or pipe).
/// Return value from PTY or pipe spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcess {
pub session: ProcessHandle,
pub output_rx: broadcast::Receiver<Vec<u8>>,
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,
}

View File

@@ -15,12 +15,9 @@ use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::OutputSink;
use crate::process::OutputStream;
use crate::process::ProcessHandle;
use crate::process::PtyHandles;
use crate::process::SpawnedProcess;
use crate::process::SpawnedStreamingProcess;
use crate::process::TerminalSize;
/// Returns true when ConPTY support is available (Windows only).
@@ -74,17 +71,15 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
}
}
/// Spawn a process attached to a PTY, returning handles for stdin and exit
/// while routing output through the caller-provided sink.
pub async fn spawn_streaming_process(
/// Spawn a process attached to a PTY, returning handles for stdin, split output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
size: TerminalSize,
output_sink: OutputSink,
) -> Result<SpawnedStreamingProcess> {
) -> Result<SpawnedProcess> {
if program.is_empty() {
anyhow::bail!("missing program for PTY spawn");
}
@@ -111,7 +106,8 @@ pub async fn spawn_streaming_process(
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let output_tx = output_sink.combined_sender();
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (_stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(1);
let mut reader = pair.master.try_clone_reader()?;
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8_192];
@@ -119,7 +115,7 @@ pub async fn spawn_streaming_process(
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
output_sink.send_blocking(buf[..n].to_vec(), OutputStream::Stdout);
let _ = stdout_tx.blocking_send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
@@ -173,7 +169,6 @@ pub async fn spawn_streaming_process(
let handle = ProcessHandle::new(
writer_tx,
output_tx,
Box::new(PtyChildTerminator {
killer,
#[cfg(unix)]
@@ -188,22 +183,10 @@ pub async fn spawn_streaming_process(
Some(handles),
);
Ok(SpawnedStreamingProcess {
Ok(SpawnedProcess {
session: handle,
stdout_rx,
stderr_rx,
exit_rx,
})
}
/// Spawn a process attached to a PTY, returning handles for stdin, output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
size: TerminalSize,
) -> Result<SpawnedProcess> {
let (output_sink, output_rx) = OutputSink::broadcast_combined();
let spawned = spawn_streaming_process(program, args, cwd, env, arg0, size, output_sink).await?;
Ok(spawned.into_spawned_process(output_rx))
}

View File

@@ -3,11 +3,11 @@ use std::path::Path;
use pretty_assertions::assert_eq;
use crate::pipe::spawn_streaming_process as spawn_pipe_streaming_process;
use crate::pipe::PipeStdinMode;
use crate::combine_output_receivers;
use crate::spawn_pipe_process;
use crate::spawn_pipe_process_no_stdin;
use crate::spawn_pty_process;
use crate::OutputSink;
use crate::SpawnedProcess;
use crate::TerminalSize;
fn find_python() -> Option<String> {
@@ -67,6 +67,26 @@ async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>
collected
}
fn combine_spawned_output(
spawned: SpawnedProcess,
) -> (
crate::ProcessHandle,
tokio::sync::broadcast::Receiver<Vec<u8>>,
tokio::sync::oneshot::Receiver<i32>,
) {
let SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
(
session,
combine_output_receivers(stdout_rx, stderr_rx),
exit_rx,
)
}
async fn collect_output_until_exit(
mut output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
exit_rx: tokio::sync::oneshot::Receiver<i32>,
@@ -244,8 +264,8 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
TerminalSize::default(),
)
.await?;
let writer = spawned.session.writer_sender();
let mut output_rx = spawned.output_rx;
let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
let writer = session.writer_sender();
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
@@ -256,8 +276,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
writer.send(format!("exit(){newline}").into_bytes()).await?;
let timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let (remaining_output, code) =
collect_output_until_exit(output_rx, spawned.exit_rx, timeout_ms).await;
let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, timeout_ms).await;
output.extend_from_slice(&remaining_output);
let text = String::from_utf8_lossy(&output);
@@ -284,10 +303,11 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
let writer = spawned.session.writer_sender();
let (session, output_rx, exit_rx) = combine_spawned_output(spawned);
let writer = session.writer_sender();
writer.send(b"roundtrip\n".to_vec()).await?;
let (output, code) = collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 5_000).await;
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
let text = String::from_utf8_lossy(&output);
assert!(
@@ -312,7 +332,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
let (program, args) = shell_command(script);
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let mut output_rx = spawned.output_rx;
let (_session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
let pid_bytes =
tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()).await??;
let pid_text = String::from_utf8_lossy(&pid_bytes);
@@ -333,7 +353,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
"expected child to be detached from parent session"
);
let exit_code = spawned.exit_rx.await.unwrap_or(-1);
let exit_code = exit_rx.await.unwrap_or(-1);
assert_eq!(
exit_code, 0,
"expected detached pipe process to exit cleanly"
@@ -360,12 +380,14 @@ async fn pipe_and_pty_share_interface() -> anyhow::Result<()> {
TerminalSize::default(),
)
.await?;
let (_pipe_session, pipe_output_rx, pipe_exit_rx) = combine_spawned_output(pipe);
let (_pty_session, pty_output_rx, pty_exit_rx) = combine_spawned_output(pty);
let timeout_ms = if cfg!(windows) { 10_000 } else { 3_000 };
let (pipe_out, pipe_code) =
collect_output_until_exit(pipe.output_rx, pipe.exit_rx, timeout_ms).await;
collect_output_until_exit(pipe_output_rx, pipe_exit_rx, timeout_ms).await;
let (pty_out, pty_code) =
collect_output_until_exit(pty.output_rx, pty.exit_rx, timeout_ms).await;
collect_output_until_exit(pty_output_rx, pty_exit_rx, timeout_ms).await;
assert_eq!(pipe_code, 0);
assert_eq!(pty_code, 0);
@@ -392,9 +414,9 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
let args = vec!["-c".to_string(), script.to_string()];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
let (_session, output_rx, exit_rx) = combine_spawned_output(spawned);
let (output, code) =
collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 10_000).await;
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 10_000).await;
assert_eq!(code, 0, "expected python to exit cleanly");
assert!(!output.is_empty(), "expected stderr output to be drained");
@@ -420,21 +442,18 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
} else {
shell_command(&split_stdout_stderr_command())
};
let (output_sink, stdout_rx, stderr_rx) = OutputSink::guaranteed_separate();
let spawned = spawn_pipe_streaming_process(
&program,
&args,
Path::new("."),
&env_map,
&None,
PipeStdinMode::Null,
output_sink,
)
.await?;
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
session: _session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await });
let stderr_task = tokio::spawn(async move { collect_split_output(stderr_rx).await });
let code = tokio::time::timeout(tokio::time::Duration::from_secs(2), spawned.exit_rx)
let code = tokio::time::timeout(tokio::time::Duration::from_secs(2), exit_rx)
.await
.map_err(|_| anyhow::anyhow!("timed out waiting for split process exit"))?
.unwrap_or(-1);
@@ -463,17 +482,15 @@ async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
let script =
"setsid sh -c 'i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &";
let (program, args) = shell_command(script);
let mut spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned);
let _ = tokio::time::timeout(
tokio::time::Duration::from_millis(500),
spawned.output_rx.recv(),
)
.await
.map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??;
let _ = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv())
.await
.map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??;
spawned.session.terminate();
let mut post_rx = spawned.session.output_receiver();
session.terminate();
let mut post_rx = output_rx.resubscribe();
let post_terminate =
tokio::time::timeout(tokio::time::Duration::from_millis(200), post_rx.recv()).await;
@@ -498,7 +515,7 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
let marker = "__codex_bg_pid:";
let script = format!("sleep 1000 & bg=$!; echo {marker}$bg; wait");
let (program, args) = shell_command(&script);
let mut spawned = spawn_pty_process(
let spawned = spawn_pty_process(
&program,
&args,
Path::new("."),
@@ -507,11 +524,12 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
TerminalSize::default(),
)
.await?;
let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned);
let bg_pid = match wait_for_marker_pid(&mut spawned.output_rx, marker, 2_000).await {
let bg_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await {
Ok(pid) => pid,
Err(err) => {
spawned.session.terminate();
session.terminate();
return Err(err);
}
};
@@ -520,7 +538,7 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
"expected background child pid {bg_pid} to exist before terminate"
);
spawned.session.terminate();
session.terminate();
let exited = wait_for_process_exit(bg_pid, 3_000).await?;
if !exited {