mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
utils/pty: add streaming spawn and terminal sizing primitives
Split stdout and stderr routing into reusable output sinks, expose explicit streaming spawn helpers, and thread terminal sizing through the shared PTY layer. This keeps the legacy spawn helpers intact so existing callers only need mechanical call-site updates while app-server gains the lower-level primitives it needs for interactive command execution.
This commit is contained in:
@@ -543,6 +543,7 @@ impl UnifiedExecProcessManager {
|
||||
env.cwd.as_path(),
|
||||
&env.env,
|
||||
&env.arg0,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
|
||||
@@ -124,6 +124,7 @@ trust_level = "trusted"
|
||||
&repo_root,
|
||||
&env,
|
||||
&None,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ async fn run_codex_cli(
|
||||
cwd.as_ref(),
|
||||
&env,
|
||||
&None,
|
||||
codex_utils_pty::TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let mut output = Vec::new();
|
||||
|
||||
@@ -4,15 +4,21 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
|
||||
|
||||
## API surface
|
||||
|
||||
- `spawn_pty_process(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `spawn_pty_process(program, args, cwd, env, arg0, size)` → `SpawnedProcess`
|
||||
- `spawn_pipe_process(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)` → `SpawnedProcess`
|
||||
- `pty::spawn_streaming_process(program, args, cwd, env, arg0, size, output_sink)` → `SpawnedStreamingProcess`
|
||||
- `pipe::spawn_streaming_process(program, args, cwd, env, arg0, stdin_mode, output_sink)` → `SpawnedStreamingProcess`
|
||||
- `conpty_supported()` → `bool` (Windows only; always true elsewhere)
|
||||
- `TerminalSize { rows, cols }` selects PTY dimensions in character cells.
|
||||
- `ProcessHandle` exposes:
|
||||
- `writer_sender()` → `mpsc::Sender<Vec<u8>>` (stdin)
|
||||
- `output_receiver()` → `broadcast::Receiver<Vec<u8>>` (stdout/stderr merged)
|
||||
- `resize(TerminalSize)`
|
||||
- `close_stdin()`
|
||||
- `has_exited()`, `exit_code()`, `terminate()`
|
||||
- `SpawnedProcess` bundles `handle`, `output_rx`, and `exit_rx` (oneshot exit code).
|
||||
- `SpawnedProcess` bundles `session`, `output_rx`, and `exit_rx` (oneshot exit code).
|
||||
- `SpawnedStreamingProcess` bundles `session` and `exit_rx`; callers own the output sink/receivers.
|
||||
|
||||
## Usage examples
|
||||
|
||||
@@ -20,6 +26,7 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use codex_utils_pty::spawn_pty_process;
|
||||
use codex_utils_pty::TerminalSize;
|
||||
|
||||
# tokio_test::block_on(async {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
@@ -29,6 +36,7 @@ let spawned = spawn_pty_process(
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
).await?;
|
||||
|
||||
let writer = spawned.session.writer_sender();
|
||||
|
||||
@@ -7,14 +7,22 @@ mod tests;
|
||||
#[cfg(windows)]
|
||||
mod win;
|
||||
|
||||
pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024;
|
||||
|
||||
/// Spawn a non-interactive process using regular pipes for stdin/stdout/stderr.
|
||||
pub use pipe::spawn_process as spawn_pipe_process;
|
||||
/// Spawn a non-interactive process using regular pipes, but close stdin immediately.
|
||||
pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
|
||||
/// Output routing configuration for spawned processes.
|
||||
pub use process::OutputSink;
|
||||
/// Handle for interacting with a spawned process (PTY or pipe).
|
||||
pub use process::ProcessHandle;
|
||||
/// Bundle of process handles plus output and exit receivers returned by spawn helpers.
|
||||
pub use process::SpawnedProcess;
|
||||
/// Bundle of process handles and exit receiver returned by streaming spawn helpers.
|
||||
pub use process::SpawnedStreamingProcess;
|
||||
/// Terminal size in character cells used for PTY spawn and resize operations.
|
||||
pub use process::TerminalSize;
|
||||
/// Backwards-compatible alias for ProcessHandle.
|
||||
pub type ExecCommandSession = ProcessHandle;
|
||||
/// Backwards-compatible alias for SpawnedProcess.
|
||||
|
||||
@@ -13,14 +13,16 @@ use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::process::ChildTerminator;
|
||||
use crate::process::OutputSink;
|
||||
use crate::process::OutputStream;
|
||||
use crate::process::ProcessHandle;
|
||||
use crate::process::SpawnedProcess;
|
||||
use crate::process::SpawnedStreamingProcess;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use libc;
|
||||
@@ -73,7 +75,7 @@ fn kill_process(pid: u32) -> io::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_output_stream<R>(mut reader: R, output_tx: broadcast::Sender<Vec<u8>>)
|
||||
async fn read_output_stream<R>(mut reader: R, output_sink: OutputSink, stream: OutputStream)
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
@@ -82,7 +84,7 @@ where
|
||||
match reader.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx.send(buf[..n].to_vec());
|
||||
output_sink.send(buf[..n].to_vec(), stream).await;
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(_) => break,
|
||||
@@ -90,20 +92,21 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum PipeStdinMode {
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum PipeStdinMode {
|
||||
Piped,
|
||||
Null,
|
||||
}
|
||||
|
||||
async fn spawn_process_with_stdin_mode(
|
||||
pub async fn spawn_streaming_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
stdin_mode: PipeStdinMode,
|
||||
) -> Result<SpawnedProcess> {
|
||||
output_sink: OutputSink,
|
||||
) -> Result<SpawnedStreamingProcess> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for pipe spawn");
|
||||
}
|
||||
@@ -157,9 +160,7 @@ async fn spawn_process_with_stdin_mode(
|
||||
let stderr = child.stderr.take();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
let output_tx = output_sink.combined_sender();
|
||||
let writer_handle = if let Some(stdin) = stdin {
|
||||
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
|
||||
tokio::spawn(async move {
|
||||
@@ -175,15 +176,15 @@ async fn spawn_process_with_stdin_mode(
|
||||
};
|
||||
|
||||
let stdout_handle = stdout.map(|stdout| {
|
||||
let output_tx = output_tx.clone();
|
||||
let output_sink = output_sink.clone();
|
||||
tokio::spawn(async move {
|
||||
read_output_stream(BufReader::new(stdout), output_tx).await;
|
||||
read_output_stream(BufReader::new(stdout), output_sink, OutputStream::Stdout).await;
|
||||
})
|
||||
});
|
||||
let stderr_handle = stderr.map(|stderr| {
|
||||
let output_tx = output_tx.clone();
|
||||
let output_sink = output_sink.clone();
|
||||
tokio::spawn(async move {
|
||||
read_output_stream(BufReader::new(stderr), output_tx).await;
|
||||
read_output_stream(BufReader::new(stderr), output_sink, OutputStream::Stderr).await;
|
||||
})
|
||||
});
|
||||
let mut reader_abort_handles = Vec::new();
|
||||
@@ -219,10 +220,9 @@ async fn spawn_process_with_stdin_mode(
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
let handle = ProcessHandle::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(PipeChildTerminator {
|
||||
#[cfg(windows)]
|
||||
pid,
|
||||
@@ -238,9 +238,8 @@ async fn spawn_process_with_stdin_mode(
|
||||
None,
|
||||
);
|
||||
|
||||
Ok(SpawnedProcess {
|
||||
Ok(SpawnedStreamingProcess {
|
||||
session: handle,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
@@ -253,7 +252,18 @@ pub async fn spawn_process(
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedProcess> {
|
||||
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
|
||||
let (output_sink, output_rx) = OutputSink::broadcast_combined();
|
||||
let spawned = spawn_streaming_process(
|
||||
program,
|
||||
args,
|
||||
cwd,
|
||||
env,
|
||||
arg0,
|
||||
PipeStdinMode::Piped,
|
||||
output_sink,
|
||||
)
|
||||
.await?;
|
||||
Ok(spawned.into_spawned_process(output_rx))
|
||||
}
|
||||
|
||||
/// Spawn a process using regular pipes, but close stdin immediately.
|
||||
@@ -264,5 +274,16 @@ pub async fn spawn_process_no_stdin(
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedProcess> {
|
||||
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await
|
||||
let (output_sink, output_rx) = OutputSink::broadcast_combined();
|
||||
let spawned = spawn_streaming_process(
|
||||
program,
|
||||
args,
|
||||
cwd,
|
||||
env,
|
||||
arg0,
|
||||
PipeStdinMode::Null,
|
||||
output_sink,
|
||||
)
|
||||
.await?;
|
||||
Ok(spawned.into_spawned_process(output_rx))
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use portable_pty::MasterPty;
|
||||
use portable_pty::PtySize;
|
||||
use portable_pty::SlavePty;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -16,6 +18,29 @@ pub(crate) trait ChildTerminator: Send + Sync {
|
||||
fn kill(&mut self) -> io::Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct TerminalSize {
|
||||
pub rows: u16,
|
||||
pub cols: u16,
|
||||
}
|
||||
|
||||
impl Default for TerminalSize {
|
||||
fn default() -> Self {
|
||||
Self { rows: 24, cols: 80 }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TerminalSize> for PtySize {
|
||||
fn from(value: TerminalSize) -> Self {
|
||||
Self {
|
||||
rows: value.rows,
|
||||
cols: value.cols,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PtyHandles {
|
||||
pub _slave: Option<Box<dyn SlavePty + Send>>,
|
||||
pub _master: Box<dyn MasterPty + Send>,
|
||||
@@ -29,11 +54,14 @@ impl fmt::Debug for PtyHandles {
|
||||
|
||||
/// Handle for driving an interactive process (PTY or pipe).
|
||||
pub struct ProcessHandle {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
|
||||
output_tx: Option<broadcast::Sender<Vec<u8>>>,
|
||||
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
|
||||
#[allow(dead_code)]
|
||||
reader_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
#[allow(dead_code)]
|
||||
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
|
||||
#[allow(dead_code)]
|
||||
writer_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
wait_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
exit_status: Arc<AtomicBool>,
|
||||
@@ -53,8 +81,7 @@ impl ProcessHandle {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
initial_output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
output_tx: Option<broadcast::Sender<Vec<u8>>>,
|
||||
killer: Box<dyn ChildTerminator>,
|
||||
reader_handle: JoinHandle<()>,
|
||||
reader_abort_handles: Vec<AbortHandle>,
|
||||
@@ -63,32 +90,44 @@ impl ProcessHandle {
|
||||
exit_status: Arc<AtomicBool>,
|
||||
exit_code: Arc<StdMutex<Option<i32>>>,
|
||||
pty_handles: Option<PtyHandles>,
|
||||
) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
(
|
||||
Self {
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer: StdMutex::new(Some(killer)),
|
||||
reader_handle: StdMutex::new(Some(reader_handle)),
|
||||
reader_abort_handles: StdMutex::new(reader_abort_handles),
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
_pty_handles: StdMutex::new(pty_handles),
|
||||
},
|
||||
initial_output_rx,
|
||||
)
|
||||
) -> Self {
|
||||
Self {
|
||||
writer_tx: StdMutex::new(Some(writer_tx)),
|
||||
output_tx,
|
||||
killer: StdMutex::new(Some(killer)),
|
||||
reader_handle: StdMutex::new(Some(reader_handle)),
|
||||
reader_abort_handles: StdMutex::new(reader_abort_handles),
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
_pty_handles: StdMutex::new(pty_handles),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a channel sender for writing raw bytes to the child stdin.
|
||||
pub fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
|
||||
self.writer_tx.clone()
|
||||
if let Ok(writer_tx) = self.writer_tx.lock() {
|
||||
if let Some(writer_tx) = writer_tx.as_ref() {
|
||||
return writer_tx.clone();
|
||||
}
|
||||
}
|
||||
|
||||
let (writer_tx, writer_rx) = mpsc::channel(1);
|
||||
drop(writer_rx);
|
||||
writer_tx
|
||||
}
|
||||
|
||||
/// Returns a broadcast receiver that yields stdout/stderr chunks.
|
||||
/// Returns a broadcast receiver that yields stdout/stderr chunks when
|
||||
/// combined output routing is configured.
|
||||
pub fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
|
||||
self.output_tx.subscribe()
|
||||
if let Some(output_tx) = self.output_tx.as_ref() {
|
||||
return output_tx.subscribe();
|
||||
}
|
||||
|
||||
let (output_tx, output_rx) = broadcast::channel(1);
|
||||
drop(output_tx);
|
||||
output_rx
|
||||
}
|
||||
|
||||
/// True if the child process has exited.
|
||||
@@ -101,13 +140,38 @@ impl ProcessHandle {
|
||||
self.exit_code.lock().ok().and_then(|guard| *guard)
|
||||
}
|
||||
|
||||
/// Attempts to kill the child and abort helper tasks.
|
||||
pub fn terminate(&self) {
|
||||
/// Resize the PTY in character cells.
|
||||
pub fn resize(&self, size: TerminalSize) -> anyhow::Result<()> {
|
||||
let handles = self
|
||||
._pty_handles
|
||||
.lock()
|
||||
.map_err(|_| anyhow!("failed to lock PTY handles"))?;
|
||||
let handles = handles
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("process is not attached to a PTY"))?;
|
||||
handles._master.resize(size.into())
|
||||
}
|
||||
|
||||
/// Close the child's stdin channel.
|
||||
pub fn close_stdin(&self) {
|
||||
if let Ok(mut writer_tx) = self.writer_tx.lock() {
|
||||
writer_tx.take();
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to kill the child while leaving the reader/writer tasks alive
|
||||
/// so callers can still drain output until EOF.
|
||||
pub fn request_terminate(&self) {
|
||||
if let Ok(mut killer_opt) = self.killer.lock() {
|
||||
if let Some(mut killer) = killer_opt.take() {
|
||||
let _ = killer.kill();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to kill the child and abort helper tasks.
|
||||
pub fn terminate(&self) {
|
||||
self.request_terminate();
|
||||
|
||||
if let Ok(mut h) = self.reader_handle.lock() {
|
||||
if let Some(handle) = h.take() {
|
||||
@@ -138,7 +202,102 @@ impl Drop for ProcessHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value from spawn helpers (PTY or pipe).
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum OutputStream {
|
||||
Stdout,
|
||||
Stderr,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OutputSink {
|
||||
BroadcastCombined(broadcast::Sender<Vec<u8>>),
|
||||
GuaranteedSeparate {
|
||||
stdout: mpsc::Sender<Vec<u8>>,
|
||||
stderr: mpsc::Sender<Vec<u8>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl OutputSink {
|
||||
pub fn broadcast_combined() -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
let (tx, rx) = broadcast::channel(256);
|
||||
(OutputSink::BroadcastCombined(tx), rx)
|
||||
}
|
||||
|
||||
pub fn guaranteed_separate() -> (Self, mpsc::Receiver<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel(128);
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel(128);
|
||||
(
|
||||
OutputSink::GuaranteedSeparate {
|
||||
stdout: stdout_tx,
|
||||
stderr: stderr_tx,
|
||||
},
|
||||
stdout_rx,
|
||||
stderr_rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn send(&self, chunk: Vec<u8>, stream: OutputStream) {
|
||||
match self {
|
||||
OutputSink::BroadcastCombined(tx) => {
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
OutputSink::GuaranteedSeparate { stdout, stderr } => match stream {
|
||||
OutputStream::Stdout => {
|
||||
let _ = stdout.send(chunk).await;
|
||||
}
|
||||
OutputStream::Stderr => {
|
||||
let _ = stderr.send(chunk).await;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn send_blocking(&self, chunk: Vec<u8>, stream: OutputStream) {
|
||||
match self {
|
||||
OutputSink::BroadcastCombined(tx) => {
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
OutputSink::GuaranteedSeparate { stdout, stderr } => match stream {
|
||||
OutputStream::Stdout => {
|
||||
let _ = stdout.blocking_send(chunk);
|
||||
}
|
||||
OutputStream::Stderr => {
|
||||
let _ = stderr.blocking_send(chunk);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn combined_sender(&self) -> Option<broadcast::Sender<Vec<u8>>> {
|
||||
match self {
|
||||
OutputSink::BroadcastCombined(tx) => Some(tx.clone()),
|
||||
OutputSink::GuaranteedSeparate { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value from explicit streaming spawn helpers (PTY or pipe).
|
||||
#[derive(Debug)]
|
||||
pub struct SpawnedStreamingProcess {
|
||||
pub session: ProcessHandle,
|
||||
pub exit_rx: oneshot::Receiver<i32>,
|
||||
}
|
||||
|
||||
impl SpawnedStreamingProcess {
|
||||
pub(crate) fn into_spawned_process(
|
||||
self,
|
||||
output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
) -> SpawnedProcess {
|
||||
let Self { session, exit_rx } = self;
|
||||
SpawnedProcess {
|
||||
session,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value from backwards-compatible spawn helpers (PTY or pipe).
|
||||
#[derive(Debug)]
|
||||
pub struct SpawnedProcess {
|
||||
pub session: ProcessHandle,
|
||||
|
||||
@@ -10,16 +10,18 @@ use anyhow::Result;
|
||||
#[cfg(not(windows))]
|
||||
use portable_pty::native_pty_system;
|
||||
use portable_pty::CommandBuilder;
|
||||
use portable_pty::PtySize;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::process::ChildTerminator;
|
||||
use crate::process::OutputSink;
|
||||
use crate::process::OutputStream;
|
||||
use crate::process::ProcessHandle;
|
||||
use crate::process::PtyHandles;
|
||||
use crate::process::SpawnedProcess;
|
||||
use crate::process::SpawnedStreamingProcess;
|
||||
use crate::process::TerminalSize;
|
||||
|
||||
/// Returns true when ConPTY support is available (Windows only).
|
||||
#[cfg(windows)]
|
||||
@@ -72,25 +74,23 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a process attached to a PTY, returning handles for stdin, output, and exit.
|
||||
pub async fn spawn_process(
|
||||
/// Spawn a process attached to a PTY, returning handles for stdin and exit
|
||||
/// while routing output through the caller-provided sink.
|
||||
pub async fn spawn_streaming_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedProcess> {
|
||||
size: TerminalSize,
|
||||
output_sink: OutputSink,
|
||||
) -> Result<SpawnedStreamingProcess> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for PTY spawn");
|
||||
}
|
||||
|
||||
let pty_system = platform_native_pty_system();
|
||||
let pair = pty_system.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})?;
|
||||
let pair = pty_system.openpty(size.into())?;
|
||||
|
||||
let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string()));
|
||||
command_builder.cwd(cwd);
|
||||
@@ -111,18 +111,15 @@ pub async fn spawn_process(
|
||||
let killer = child.clone_killer();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
let output_tx = output_sink.combined_sender();
|
||||
let mut reader = pair.master.try_clone_reader()?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8_192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
output_sink.send_blocking(buf[..n].to_vec(), OutputStream::Stdout);
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
@@ -174,10 +171,9 @@ pub async fn spawn_process(
|
||||
_master: pair.master,
|
||||
};
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
let handle = ProcessHandle::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(PtyChildTerminator {
|
||||
killer,
|
||||
#[cfg(unix)]
|
||||
@@ -192,9 +188,22 @@ pub async fn spawn_process(
|
||||
Some(handles),
|
||||
);
|
||||
|
||||
Ok(SpawnedProcess {
|
||||
Ok(SpawnedStreamingProcess {
|
||||
session: handle,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a process attached to a PTY, returning handles for stdin, output, and exit.
|
||||
pub async fn spawn_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
size: TerminalSize,
|
||||
) -> Result<SpawnedProcess> {
|
||||
let (output_sink, output_rx) = OutputSink::broadcast_combined();
|
||||
let spawned = spawn_streaming_process(program, args, cwd, env, arg0, size, output_sink).await?;
|
||||
Ok(spawned.into_spawned_process(output_rx))
|
||||
}
|
||||
|
||||
@@ -3,8 +3,12 @@ use std::path::Path;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use crate::pipe::spawn_streaming_process as spawn_pipe_streaming_process;
|
||||
use crate::pipe::PipeStdinMode;
|
||||
use crate::spawn_pipe_process;
|
||||
use crate::spawn_pty_process;
|
||||
use crate::OutputSink;
|
||||
use crate::TerminalSize;
|
||||
|
||||
fn find_python() -> Option<String> {
|
||||
for candidate in ["python3", "python"] {
|
||||
@@ -219,7 +223,15 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pty_process(&python, &[], Path::new("."), &env_map, &None).await?;
|
||||
let spawned = spawn_pty_process(
|
||||
&python,
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
let writer = spawned.session.writer_sender();
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
@@ -327,7 +339,15 @@ async fn pipe_and_pty_share_interface() -> anyhow::Result<()> {
|
||||
|
||||
let pipe =
|
||||
spawn_pipe_process(&pipe_program, &pipe_args, Path::new("."), &env_map, &None).await?;
|
||||
let pty = spawn_pty_process(&pty_program, &pty_args, Path::new("."), &env_map, &None).await?;
|
||||
let pty = spawn_pty_process(
|
||||
&pty_program,
|
||||
&pty_args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeout_ms = if cfg!(windows) { 10_000 } else { 3_000 };
|
||||
let (pipe_out, pipe_code) =
|
||||
@@ -370,6 +390,39 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let (program, args) = shell_command("printf 'split-out\\n'; printf 'split-err\\n' >&2");
|
||||
let (output_sink, mut stdout_rx, mut stderr_rx) = OutputSink::guaranteed_separate();
|
||||
let spawned = spawn_pipe_streaming_process(
|
||||
&program,
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
PipeStdinMode::Null,
|
||||
output_sink,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let stdout = tokio::time::timeout(tokio::time::Duration::from_secs(2), stdout_rx.recv())
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting for split stdout"))?
|
||||
.ok_or_else(|| anyhow::anyhow!("split stdout receiver closed unexpectedly"))?;
|
||||
let stderr = tokio::time::timeout(tokio::time::Duration::from_secs(2), stderr_rx.recv())
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting for split stderr"))?
|
||||
.ok_or_else(|| anyhow::anyhow!("split stderr receiver closed unexpectedly"))?;
|
||||
let code = spawned.exit_rx.await.unwrap_or(-1);
|
||||
|
||||
assert_eq!(String::from_utf8_lossy(&stdout), "split-out\n");
|
||||
assert_eq!(String::from_utf8_lossy(&stderr), "split-err\n");
|
||||
assert_eq!(code, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
|
||||
if !setsid_available() {
|
||||
@@ -416,7 +469,15 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
|
||||
let marker = "__codex_bg_pid:";
|
||||
let script = format!("sleep 1000 & bg=$!; echo {marker}$bg; wait");
|
||||
let (program, args) = shell_command(&script);
|
||||
let mut spawned = spawn_pty_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let mut spawned = spawn_pty_process(
|
||||
&program,
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let bg_pid = match wait_for_marker_pid(&mut spawned.output_rx, marker, 2_000).await {
|
||||
Ok(pid) => pid,
|
||||
|
||||
Reference in New Issue
Block a user