mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
utils/pty: add streaming spawn and terminal sizing primitives (#13695)
Enhance pty utils: * Support closing stdin * Separate stderr and stdout streams to allow consumers differentiate them * Provide compatibility helper to merge both streams back into combined one * Support specifying terminal size for pty, including on-demand resizes while process is already running * Support terminating the process while still consuming its outputs
This commit is contained in:
committed by
GitHub
parent
4e68fb96e2
commit
5b04cc657f
@@ -5,6 +5,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -47,6 +48,7 @@ pub(crate) struct OutputHandles {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct UnifiedExecProcess {
|
||||
process_handle: ExecCommandSession,
|
||||
output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
@@ -72,6 +74,7 @@ impl UnifiedExecProcess {
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let output_drained = Arc::new(Notify::new());
|
||||
let mut receiver = initial_output_rx;
|
||||
let output_rx = receiver.resubscribe();
|
||||
let buffer_clone = Arc::clone(&output_buffer);
|
||||
let notify_clone = Arc::clone(&output_notify);
|
||||
let output_closed_clone = Arc::clone(&output_closed);
|
||||
@@ -97,6 +100,7 @@ impl UnifiedExecProcess {
|
||||
|
||||
Self {
|
||||
process_handle,
|
||||
output_rx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -124,7 +128,7 @@ impl UnifiedExecProcess {
|
||||
}
|
||||
|
||||
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
|
||||
self.process_handle.output_receiver()
|
||||
self.output_rx.resubscribe()
|
||||
}
|
||||
|
||||
pub(super) fn cancellation_token(&self) -> CancellationToken {
|
||||
@@ -214,9 +218,11 @@ impl UnifiedExecProcess {
|
||||
) -> Result<Self, UnifiedExecError> {
|
||||
let SpawnedPty {
|
||||
session: process_handle,
|
||||
output_rx,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
mut exit_rx,
|
||||
} = spawned;
|
||||
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
|
||||
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
|
||||
|
||||
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));
|
||||
|
||||
@@ -543,6 +543,7 @@ impl UnifiedExecProcessManager {
|
||||
env.cwd.as_path(),
|
||||
&env.env,
|
||||
&env.arg0,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
|
||||
@@ -124,13 +124,20 @@ trust_level = "trusted"
|
||||
&repo_root,
|
||||
&env,
|
||||
&None,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut output = Vec::new();
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let mut exit_rx = spawned.exit_rx;
|
||||
let writer_tx = spawned.session.writer_sender();
|
||||
let codex_utils_pty::SpawnedProcess {
|
||||
session,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
} = spawned;
|
||||
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
|
||||
let mut exit_rx = exit_rx;
|
||||
let writer_tx = session.writer_sender();
|
||||
let interrupt_writer = writer_tx.clone();
|
||||
let interrupt_task = tokio::spawn(async move {
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
@@ -165,7 +172,7 @@ trust_level = "trusted"
|
||||
Ok(Ok(code)) => code,
|
||||
Ok(Err(err)) => return Err(err.into()),
|
||||
Err(_) => {
|
||||
spawned.session.terminate();
|
||||
session.terminate();
|
||||
anyhow::bail!("timed out waiting for codex resume to exit");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -71,12 +71,19 @@ async fn run_codex_cli(
|
||||
cwd.as_ref(),
|
||||
&env,
|
||||
&None,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let mut output = Vec::new();
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let mut exit_rx = spawned.exit_rx;
|
||||
let writer_tx = spawned.session.writer_sender();
|
||||
let codex_utils_pty::SpawnedProcess {
|
||||
session,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
} = spawned;
|
||||
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
|
||||
let mut exit_rx = exit_rx;
|
||||
let writer_tx = session.writer_sender();
|
||||
let exit_code_result = timeout(Duration::from_secs(10), async {
|
||||
// Read PTY output until the process exits while replying to cursor
|
||||
// position queries so the TUI can initialize without a real terminal.
|
||||
@@ -103,7 +110,7 @@ async fn run_codex_cli(
|
||||
Ok(Ok(code)) => code,
|
||||
Ok(Err(err)) => return Err(err.into()),
|
||||
Err(_) => {
|
||||
spawned.session.terminate();
|
||||
session.terminate();
|
||||
anyhow::bail!("timed out waiting for codex CLI to exit");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -4,22 +4,27 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
|
||||
|
||||
## API surface
|
||||
|
||||
- `spawn_pty_process(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `spawn_pty_process(program, args, cwd, env, arg0, size)` → `SpawnedProcess`
|
||||
- `spawn_pipe_process(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `combine_output_receivers(stdout_rx, stderr_rx)` → `broadcast::Receiver<Vec<u8>>`
|
||||
- `conpty_supported()` → `bool` (Windows only; always true elsewhere)
|
||||
- `TerminalSize { rows, cols }` selects PTY dimensions in character cells.
|
||||
- `ProcessHandle` exposes:
|
||||
- `writer_sender()` → `mpsc::Sender<Vec<u8>>` (stdin)
|
||||
- `output_receiver()` → `broadcast::Receiver<Vec<u8>>` (stdout/stderr merged)
|
||||
- `resize(TerminalSize)`
|
||||
- `close_stdin()`
|
||||
- `has_exited()`, `exit_code()`, `terminate()`
|
||||
- `SpawnedProcess` bundles `handle`, `output_rx`, and `exit_rx` (oneshot exit code).
|
||||
- `SpawnedProcess` bundles `session`, `stdout_rx`, `stderr_rx`, and `exit_rx` (oneshot exit code).
|
||||
|
||||
## Usage examples
|
||||
|
||||
```rust
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use codex_utils_pty::combine_output_receivers;
|
||||
use codex_utils_pty::spawn_pty_process;
|
||||
use codex_utils_pty::TerminalSize;
|
||||
|
||||
# tokio_test::block_on(async {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
@@ -29,13 +34,14 @@ let spawned = spawn_pty_process(
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
).await?;
|
||||
|
||||
let writer = spawned.session.writer_sender();
|
||||
writer.send(b"exit\n".to_vec()).await?;
|
||||
|
||||
// Collect output until the process exits.
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let mut output_rx = combine_output_receivers(spawned.stdout_rx, spawned.stderr_rx);
|
||||
let mut collected = Vec::new();
|
||||
while let Ok(chunk) = output_rx.try_recv() {
|
||||
collected.extend_from_slice(&chunk);
|
||||
|
||||
@@ -7,14 +7,20 @@ mod tests;
|
||||
#[cfg(windows)]
|
||||
mod win;
|
||||
|
||||
pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024;
|
||||
|
||||
/// Spawn a non-interactive process using regular pipes for stdin/stdout/stderr.
|
||||
pub use pipe::spawn_process as spawn_pipe_process;
|
||||
/// Spawn a non-interactive process using regular pipes, but close stdin immediately.
|
||||
pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
|
||||
/// Combine stdout/stderr receivers into a single broadcast receiver.
|
||||
pub use process::combine_output_receivers;
|
||||
/// Handle for interacting with a spawned process (PTY or pipe).
|
||||
pub use process::ProcessHandle;
|
||||
/// Bundle of process handles plus output and exit receivers returned by spawn helpers.
|
||||
/// Bundle of process handles plus split output and exit receivers returned by spawn helpers.
|
||||
pub use process::SpawnedProcess;
|
||||
/// Terminal size in character cells used for PTY spawn and resize operations.
|
||||
pub use process::TerminalSize;
|
||||
/// Backwards-compatible alias for ProcessHandle.
|
||||
pub type ExecCommandSession = ProcessHandle;
|
||||
/// Backwards-compatible alias for SpawnedProcess.
|
||||
|
||||
@@ -13,7 +13,6 @@ use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -73,7 +72,7 @@ fn kill_process(pid: u32) -> io::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_output_stream<R>(mut reader: R, output_tx: broadcast::Sender<Vec<u8>>)
|
||||
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
@@ -82,7 +81,7 @@ where
|
||||
match reader.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx.send(buf[..n].to_vec());
|
||||
let _ = output_tx.send(buf[..n].to_vec()).await;
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(_) => break,
|
||||
@@ -157,9 +156,8 @@ async fn spawn_process_with_stdin_mode(
|
||||
let stderr = child.stderr.take();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let writer_handle = if let Some(stdin) = stdin {
|
||||
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
|
||||
tokio::spawn(async move {
|
||||
@@ -175,15 +173,15 @@ async fn spawn_process_with_stdin_mode(
|
||||
};
|
||||
|
||||
let stdout_handle = stdout.map(|stdout| {
|
||||
let output_tx = output_tx.clone();
|
||||
let stdout_tx = stdout_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
read_output_stream(BufReader::new(stdout), output_tx).await;
|
||||
read_output_stream(BufReader::new(stdout), stdout_tx).await;
|
||||
})
|
||||
});
|
||||
let stderr_handle = stderr.map(|stderr| {
|
||||
let output_tx = output_tx.clone();
|
||||
let stderr_tx = stderr_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
read_output_stream(BufReader::new(stderr), output_tx).await;
|
||||
read_output_stream(BufReader::new(stderr), stderr_tx).await;
|
||||
})
|
||||
});
|
||||
let mut reader_abort_handles = Vec::new();
|
||||
@@ -219,10 +217,8 @@ async fn spawn_process_with_stdin_mode(
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
let handle = ProcessHandle::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(PipeChildTerminator {
|
||||
#[cfg(windows)]
|
||||
pid,
|
||||
@@ -240,12 +236,13 @@ async fn spawn_process_with_stdin_mode(
|
||||
|
||||
Ok(SpawnedProcess {
|
||||
session: handle,
|
||||
output_rx,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, output, and exit.
|
||||
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit.
|
||||
pub async fn spawn_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
|
||||
@@ -4,7 +4,9 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use portable_pty::MasterPty;
|
||||
use portable_pty::PtySize;
|
||||
use portable_pty::SlavePty;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -16,6 +18,29 @@ pub(crate) trait ChildTerminator: Send + Sync {
|
||||
fn kill(&mut self) -> io::Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct TerminalSize {
|
||||
pub rows: u16,
|
||||
pub cols: u16,
|
||||
}
|
||||
|
||||
impl Default for TerminalSize {
|
||||
fn default() -> Self {
|
||||
Self { rows: 24, cols: 80 }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TerminalSize> for PtySize {
|
||||
fn from(value: TerminalSize) -> Self {
|
||||
Self {
|
||||
rows: value.rows,
|
||||
cols: value.cols,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PtyHandles {
|
||||
pub _slave: Option<Box<dyn SlavePty + Send>>,
|
||||
pub _master: Box<dyn MasterPty + Send>,
|
||||
@@ -29,8 +54,7 @@ impl fmt::Debug for PtyHandles {
|
||||
|
||||
/// Handle for driving an interactive process (PTY or pipe).
|
||||
pub struct ProcessHandle {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
|
||||
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
|
||||
reader_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
|
||||
@@ -53,8 +77,6 @@ impl ProcessHandle {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
initial_output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
killer: Box<dyn ChildTerminator>,
|
||||
reader_handle: JoinHandle<()>,
|
||||
reader_abort_handles: Vec<AbortHandle>,
|
||||
@@ -63,32 +85,31 @@ impl ProcessHandle {
|
||||
exit_status: Arc<AtomicBool>,
|
||||
exit_code: Arc<StdMutex<Option<i32>>>,
|
||||
pty_handles: Option<PtyHandles>,
|
||||
) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
(
|
||||
Self {
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer: StdMutex::new(Some(killer)),
|
||||
reader_handle: StdMutex::new(Some(reader_handle)),
|
||||
reader_abort_handles: StdMutex::new(reader_abort_handles),
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
_pty_handles: StdMutex::new(pty_handles),
|
||||
},
|
||||
initial_output_rx,
|
||||
)
|
||||
) -> Self {
|
||||
Self {
|
||||
writer_tx: StdMutex::new(Some(writer_tx)),
|
||||
killer: StdMutex::new(Some(killer)),
|
||||
reader_handle: StdMutex::new(Some(reader_handle)),
|
||||
reader_abort_handles: StdMutex::new(reader_abort_handles),
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
_pty_handles: StdMutex::new(pty_handles),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a channel sender for writing raw bytes to the child stdin.
|
||||
pub fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
|
||||
self.writer_tx.clone()
|
||||
}
|
||||
if let Ok(writer_tx) = self.writer_tx.lock() {
|
||||
if let Some(writer_tx) = writer_tx.as_ref() {
|
||||
return writer_tx.clone();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a broadcast receiver that yields stdout/stderr chunks.
|
||||
pub fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
|
||||
self.output_tx.subscribe()
|
||||
let (writer_tx, writer_rx) = mpsc::channel(1);
|
||||
drop(writer_rx);
|
||||
writer_tx
|
||||
}
|
||||
|
||||
/// True if the child process has exited.
|
||||
@@ -101,13 +122,38 @@ impl ProcessHandle {
|
||||
self.exit_code.lock().ok().and_then(|guard| *guard)
|
||||
}
|
||||
|
||||
/// Attempts to kill the child and abort helper tasks.
|
||||
pub fn terminate(&self) {
|
||||
/// Resize the PTY in character cells.
|
||||
pub fn resize(&self, size: TerminalSize) -> anyhow::Result<()> {
|
||||
let handles = self
|
||||
._pty_handles
|
||||
.lock()
|
||||
.map_err(|_| anyhow!("failed to lock PTY handles"))?;
|
||||
let handles = handles
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("process is not attached to a PTY"))?;
|
||||
handles._master.resize(size.into())
|
||||
}
|
||||
|
||||
/// Close the child's stdin channel.
|
||||
pub fn close_stdin(&self) {
|
||||
if let Ok(mut writer_tx) = self.writer_tx.lock() {
|
||||
writer_tx.take();
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to kill the child while leaving the reader/writer tasks alive
|
||||
/// so callers can still drain output until EOF.
|
||||
pub fn request_terminate(&self) {
|
||||
if let Ok(mut killer_opt) = self.killer.lock() {
|
||||
if let Some(mut killer) = killer_opt.take() {
|
||||
let _ = killer.kill();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to kill the child and abort helper tasks.
|
||||
pub fn terminate(&self) {
|
||||
self.request_terminate();
|
||||
|
||||
if let Ok(mut h) = self.reader_handle.lock() {
|
||||
if let Some(handle) = h.take() {
|
||||
@@ -138,10 +184,46 @@ impl Drop for ProcessHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value from spawn helpers (PTY or pipe).
|
||||
/// Combine split stdout/stderr receivers into a single broadcast receiver.
|
||||
pub fn combine_output_receivers(
|
||||
mut stdout_rx: mpsc::Receiver<Vec<u8>>,
|
||||
mut stderr_rx: mpsc::Receiver<Vec<u8>>,
|
||||
) -> broadcast::Receiver<Vec<u8>> {
|
||||
let (combined_tx, combined_rx) = broadcast::channel(256);
|
||||
tokio::spawn(async move {
|
||||
let mut stdout_open = true;
|
||||
let mut stderr_open = true;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
stdout = stdout_rx.recv(), if stdout_open => match stdout {
|
||||
Some(chunk) => {
|
||||
let _ = combined_tx.send(chunk);
|
||||
}
|
||||
None => {
|
||||
stdout_open = false;
|
||||
}
|
||||
},
|
||||
stderr = stderr_rx.recv(), if stderr_open => match stderr {
|
||||
Some(chunk) => {
|
||||
let _ = combined_tx.send(chunk);
|
||||
}
|
||||
None => {
|
||||
stderr_open = false;
|
||||
}
|
||||
},
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
combined_rx
|
||||
}
|
||||
|
||||
/// Return value from PTY or pipe spawn helpers.
|
||||
#[derive(Debug)]
|
||||
pub struct SpawnedProcess {
|
||||
pub session: ProcessHandle,
|
||||
pub output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
|
||||
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
|
||||
pub exit_rx: oneshot::Receiver<i32>,
|
||||
}
|
||||
|
||||
@@ -10,8 +10,6 @@ use anyhow::Result;
|
||||
#[cfg(not(windows))]
|
||||
use portable_pty::native_pty_system;
|
||||
use portable_pty::CommandBuilder;
|
||||
use portable_pty::PtySize;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -20,6 +18,7 @@ use crate::process::ChildTerminator;
|
||||
use crate::process::ProcessHandle;
|
||||
use crate::process::PtyHandles;
|
||||
use crate::process::SpawnedProcess;
|
||||
use crate::process::TerminalSize;
|
||||
|
||||
/// Returns true when ConPTY support is available (Windows only).
|
||||
#[cfg(windows)]
|
||||
@@ -72,25 +71,21 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a process attached to a PTY, returning handles for stdin, output, and exit.
|
||||
/// Spawn a process attached to a PTY, returning handles for stdin, split output, and exit.
|
||||
pub async fn spawn_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
size: TerminalSize,
|
||||
) -> Result<SpawnedProcess> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for PTY spawn");
|
||||
}
|
||||
|
||||
let pty_system = platform_native_pty_system();
|
||||
let pair = pty_system.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})?;
|
||||
let pair = pty_system.openpty(size.into())?;
|
||||
|
||||
let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string()));
|
||||
command_builder.cwd(cwd);
|
||||
@@ -111,18 +106,16 @@ pub async fn spawn_process(
|
||||
let killer = child.clone_killer();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (_stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(1);
|
||||
let mut reader = pair.master.try_clone_reader()?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8_192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
let _ = stdout_tx.blocking_send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
@@ -174,10 +167,8 @@ pub async fn spawn_process(
|
||||
_master: pair.master,
|
||||
};
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
let handle = ProcessHandle::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(PtyChildTerminator {
|
||||
killer,
|
||||
#[cfg(unix)]
|
||||
@@ -194,7 +185,8 @@ pub async fn spawn_process(
|
||||
|
||||
Ok(SpawnedProcess {
|
||||
session: handle,
|
||||
output_rx,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,8 +3,12 @@ use std::path::Path;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use crate::combine_output_receivers;
|
||||
use crate::spawn_pipe_process;
|
||||
use crate::spawn_pipe_process_no_stdin;
|
||||
use crate::spawn_pty_process;
|
||||
use crate::SpawnedProcess;
|
||||
use crate::TerminalSize;
|
||||
|
||||
fn find_python() -> Option<String> {
|
||||
for candidate in ["python3", "python"] {
|
||||
@@ -51,6 +55,38 @@ fn echo_sleep_command(marker: &str) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
fn split_stdout_stderr_command() -> String {
|
||||
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
|
||||
}
|
||||
|
||||
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
|
||||
let mut collected = Vec::new();
|
||||
while let Some(chunk) = output_rx.recv().await {
|
||||
collected.extend_from_slice(&chunk);
|
||||
}
|
||||
collected
|
||||
}
|
||||
|
||||
fn combine_spawned_output(
|
||||
spawned: SpawnedProcess,
|
||||
) -> (
|
||||
crate::ProcessHandle,
|
||||
tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
tokio::sync::oneshot::Receiver<i32>,
|
||||
) {
|
||||
let SpawnedProcess {
|
||||
session,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
} = spawned;
|
||||
(
|
||||
session,
|
||||
combine_output_receivers(stdout_rx, stderr_rx),
|
||||
exit_rx,
|
||||
)
|
||||
}
|
||||
|
||||
async fn collect_output_until_exit(
|
||||
mut output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
exit_rx: tokio::sync::oneshot::Receiver<i32>,
|
||||
@@ -219,9 +255,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pty_process(&python, &[], Path::new("."), &env_map, &None).await?;
|
||||
let writer = spawned.session.writer_sender();
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let spawned = spawn_pty_process(
|
||||
&python,
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let writer = session.writer_sender();
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
|
||||
let mut output =
|
||||
@@ -232,8 +276,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
writer.send(format!("exit(){newline}").into_bytes()).await?;
|
||||
|
||||
let timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
|
||||
let (remaining_output, code) =
|
||||
collect_output_until_exit(output_rx, spawned.exit_rx, timeout_ms).await;
|
||||
let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, timeout_ms).await;
|
||||
output.extend_from_slice(&remaining_output);
|
||||
let text = String::from_utf8_lossy(&output);
|
||||
|
||||
@@ -260,10 +303,11 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
|
||||
];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
|
||||
let writer = spawned.session.writer_sender();
|
||||
let (session, output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let writer = session.writer_sender();
|
||||
writer.send(b"roundtrip\n".to_vec()).await?;
|
||||
|
||||
let (output, code) = collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 5_000).await;
|
||||
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
|
||||
let text = String::from_utf8_lossy(&output);
|
||||
|
||||
assert!(
|
||||
@@ -288,7 +332,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
|
||||
let (program, args) = shell_command(script);
|
||||
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let (_session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let pid_bytes =
|
||||
tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()).await??;
|
||||
let pid_text = String::from_utf8_lossy(&pid_bytes);
|
||||
@@ -309,7 +353,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
|
||||
"expected child to be detached from parent session"
|
||||
);
|
||||
|
||||
let exit_code = spawned.exit_rx.await.unwrap_or(-1);
|
||||
let exit_code = exit_rx.await.unwrap_or(-1);
|
||||
assert_eq!(
|
||||
exit_code, 0,
|
||||
"expected detached pipe process to exit cleanly"
|
||||
@@ -327,13 +371,23 @@ async fn pipe_and_pty_share_interface() -> anyhow::Result<()> {
|
||||
|
||||
let pipe =
|
||||
spawn_pipe_process(&pipe_program, &pipe_args, Path::new("."), &env_map, &None).await?;
|
||||
let pty = spawn_pty_process(&pty_program, &pty_args, Path::new("."), &env_map, &None).await?;
|
||||
let pty = spawn_pty_process(
|
||||
&pty_program,
|
||||
&pty_args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let (_pipe_session, pipe_output_rx, pipe_exit_rx) = combine_spawned_output(pipe);
|
||||
let (_pty_session, pty_output_rx, pty_exit_rx) = combine_spawned_output(pty);
|
||||
|
||||
let timeout_ms = if cfg!(windows) { 10_000 } else { 3_000 };
|
||||
let (pipe_out, pipe_code) =
|
||||
collect_output_until_exit(pipe.output_rx, pipe.exit_rx, timeout_ms).await;
|
||||
collect_output_until_exit(pipe_output_rx, pipe_exit_rx, timeout_ms).await;
|
||||
let (pty_out, pty_code) =
|
||||
collect_output_until_exit(pty.output_rx, pty.exit_rx, timeout_ms).await;
|
||||
collect_output_until_exit(pty_output_rx, pty_exit_rx, timeout_ms).await;
|
||||
|
||||
assert_eq!(pipe_code, 0);
|
||||
assert_eq!(pty_code, 0);
|
||||
@@ -360,9 +414,9 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
|
||||
let args = vec!["-c".to_string(), script.to_string()];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
|
||||
let (_session, output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
|
||||
let (output, code) =
|
||||
collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 10_000).await;
|
||||
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 10_000).await;
|
||||
|
||||
assert_eq!(code, 0, "expected python to exit cleanly");
|
||||
assert!(!output.is_empty(), "expected stderr output to be drained");
|
||||
@@ -370,6 +424,53 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let (program, args) = if cfg!(windows) {
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
|
||||
return Ok(());
|
||||
};
|
||||
(
|
||||
python,
|
||||
vec![
|
||||
"-c".to_string(),
|
||||
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
|
||||
],
|
||||
)
|
||||
} else {
|
||||
shell_command(&split_stdout_stderr_command())
|
||||
};
|
||||
let spawned =
|
||||
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let SpawnedProcess {
|
||||
session: _session,
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
exit_rx,
|
||||
} = spawned;
|
||||
|
||||
let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await });
|
||||
let stderr_task = tokio::spawn(async move { collect_split_output(stderr_rx).await });
|
||||
let code = tokio::time::timeout(tokio::time::Duration::from_secs(2), exit_rx)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting for split process exit"))?
|
||||
.unwrap_or(-1);
|
||||
let stdout = tokio::time::timeout(tokio::time::Duration::from_secs(2), stdout_task)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stdout"))??;
|
||||
let stderr = tokio::time::timeout(tokio::time::Duration::from_secs(2), stderr_task)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;
|
||||
|
||||
assert_eq!(stdout, b"split-out\n".to_vec());
|
||||
assert_eq!(stderr, b"split-err\n".to_vec());
|
||||
assert_eq!(code, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
|
||||
if !setsid_available() {
|
||||
@@ -381,17 +482,15 @@ async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
|
||||
let script =
|
||||
"setsid sh -c 'i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &";
|
||||
let (program, args) = shell_command(script);
|
||||
let mut spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned);
|
||||
|
||||
let _ = tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(500),
|
||||
spawned.output_rx.recv(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??;
|
||||
let _ = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv())
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??;
|
||||
|
||||
spawned.session.terminate();
|
||||
let mut post_rx = spawned.session.output_receiver();
|
||||
session.terminate();
|
||||
let mut post_rx = output_rx.resubscribe();
|
||||
|
||||
let post_terminate =
|
||||
tokio::time::timeout(tokio::time::Duration::from_millis(200), post_rx.recv()).await;
|
||||
@@ -416,12 +515,21 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
|
||||
let marker = "__codex_bg_pid:";
|
||||
let script = format!("sleep 1000 & bg=$!; echo {marker}$bg; wait");
|
||||
let (program, args) = shell_command(&script);
|
||||
let mut spawned = spawn_pty_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let spawned = spawn_pty_process(
|
||||
&program,
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned);
|
||||
|
||||
let bg_pid = match wait_for_marker_pid(&mut spawned.output_rx, marker, 2_000).await {
|
||||
let bg_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await {
|
||||
Ok(pid) => pid,
|
||||
Err(err) => {
|
||||
spawned.session.terminate();
|
||||
session.terminate();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
@@ -430,7 +538,7 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
|
||||
"expected background child pid {bg_pid} to exist before terminate"
|
||||
);
|
||||
|
||||
spawned.session.terminate();
|
||||
session.terminate();
|
||||
|
||||
let exited = wait_for_process_exit(bg_pid, 3_000).await?;
|
||||
if !exited {
|
||||
|
||||
Reference in New Issue
Block a user