Compare commits

...

8 Commits

Author SHA1 Message Date
jif-oai
bd061a6292 clippy 2025-12-17 19:10:37 +00:00
jif-oai
459abecf52 shear 2025-12-17 19:09:38 +00:00
jif-oai
10d048fbd5 nits 2025-12-17 19:01:55 +00:00
jif-oai
dfdf696062 NIT 2025-12-17 18:16:19 +00:00
jif-oai
ad39b7ea72 Use child handle instead 2025-12-17 18:00:20 +00:00
jif-oai
8baa8c912b Always more tests 2025-12-17 16:11:25 +00:00
jif-oai
6793b686ec add a ton of tests 2025-12-17 16:03:01 +00:00
jif-oai
0c34d6633f v1 2025-12-17 15:15:43 +00:00
8 changed files with 1129 additions and 17 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1881,6 +1881,7 @@ dependencies = [
"lazy_static",
"log",
"portable-pty",
"pretty_assertions",
"shared_library",
"tokio",
"winapi",

View File

@@ -43,12 +43,7 @@ impl ToolsConfig {
let shell_type = if !features.enabled(Feature::ShellTool) {
ConfigShellToolType::Disabled
} else if features.enabled(Feature::UnifiedExec) {
// If ConPTY not supported (for old Windows versions), fallback on ShellCommand.
if codex_utils_pty::conpty_supported() {
ConfigShellToolType::UnifiedExec
} else {
ConfigShellToolType::ShellCommand
}
ConfigShellToolType::UnifiedExec
} else {
model_family.shell_type
};

View File

@@ -460,7 +460,7 @@ impl UnifiedExecSessionManager {
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawned = codex_utils_pty::spawn_pty_process(
let spawned = codex_utils_pty::spawn_exec_session(
program,
args,
env.cwd.as_path(),

View File

@@ -12,6 +12,9 @@ anyhow = { workspace = true }
portable-pty = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
[target.'cfg(windows)'.dependencies]
filedescriptor = "0.8.3"
lazy_static = { workspace = true }

View File

@@ -0,0 +1,20 @@
# codex-utils-pty parity notes
The tests in `utils/pty/src/lib.rs` compare PTY-backed sessions with the piped
fallback to ensure the shared behaviors stay aligned. Some behavior differences
are inherent to PTY semantics, so the tests normalize or explicitly allow them.
Known differences
- TTY line discipline: PTY-backed children run with a terminal line discipline
(canonical input, echo, signal generation). Piped fallback uses plain pipes,
so input is raw and no echo or line editing occurs.
- Line endings: PTY output may translate LF to CRLF (for example when `ONLCR` is
enabled). Piped output preserves what the program writes.
- Stdout/stderr interleaving: PTY output is a single stream with ordering
preserved by the terminal. Piped fallback merges stdout/stderr from separate
readers, so relative ordering between the streams is not guaranteed.
- Terminal features: PTY sessions support terminal-only behaviors (window size,
control sequences). Piped fallback does not emulate these features.
- Signals: PTY sessions can receive terminal-generated signals (such as Ctrl+C).
Piped fallback does not provide that terminal-level signaling.

View File

@@ -0,0 +1,171 @@
use std::collections::HashMap;
use std::io;
use std::path::Path;
use std::process::Command as StdCommand;
use std::process::Stdio;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use anyhow::Result;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use super::ExecCommandSession;
use super::SpawnedPty;
pub async fn spawn_piped_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedPty> {
if program.is_empty() {
anyhow::bail!("missing program for exec spawn");
}
let program = arg0.as_deref().unwrap_or(program);
let mut command = StdCommand::new(program);
command.args(args);
command.current_dir(cwd);
command.env_clear();
command.envs(env);
command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = command.spawn()?;
let stdin = child.stdin.take().ok_or_else(|| {
anyhow::anyhow!("stdin pipe was unexpectedly not available for exec spawn")
})?;
let stdout = child.stdout.take().ok_or_else(|| {
anyhow::anyhow!("stdout pipe was unexpectedly not available for exec spawn")
})?;
let stderr = child.stderr.take().ok_or_else(|| {
anyhow::anyhow!("stderr pipe was unexpectedly not available for exec spawn")
})?;
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let initial_output_rx = output_tx.subscribe();
// Pipes separate stdout and stderr; merge to match PTY semantics.
let stdout_handle = spawn_pipe_reader(stdout, output_tx.clone());
let stderr_handle = spawn_pipe_reader(stderr, output_tx.clone());
let writer_handle = tokio::task::spawn_blocking(move || {
let mut stdin = stdin;
use std::io::Write;
while let Some(bytes) = writer_rx.blocking_recv() {
let _ = stdin.write_all(&bytes);
let _ = stdin.flush();
}
});
let child = Arc::new(StdMutex::new(child));
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let exit_code = Arc::new(StdMutex::new(None));
let wait_exit_code = Arc::clone(&exit_code);
let wait_child = Arc::clone(&child);
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let code = loop {
let status = match wait_child.lock() {
Ok(mut guard) => guard.try_wait(),
Err(_) => break -1,
};
match status {
Ok(Some(status)) => {
break status
.code()
.unwrap_or_else(|| if status.success() { 0 } else { 1 });
}
Ok(None) => std::thread::sleep(Duration::from_millis(10)),
Err(_) => break -1,
}
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = wait_exit_code.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
let (session, output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
initial_output_rx,
Box::new(PipedChildKiller::new(child)),
vec![stdout_handle, stderr_handle],
writer_handle,
wait_handle,
exit_status,
exit_code,
None,
);
Ok(SpawnedPty {
session,
output_rx,
exit_rx,
})
}
fn spawn_pipe_reader<R: std::io::Read + Send + 'static>(
mut reader: R,
output_tx: broadcast::Sender<Vec<u8>>,
) -> JoinHandle<()> {
tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8_192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
})
}
#[derive(Debug)]
struct PipedChildKiller {
child: Arc<StdMutex<std::process::Child>>,
}
impl PipedChildKiller {
fn new(child: Arc<StdMutex<std::process::Child>>) -> Self {
Self { child }
}
}
impl portable_pty::ChildKiller for PipedChildKiller {
fn kill(&mut self) -> io::Result<()> {
if let Ok(mut guard) = self.child.try_lock() {
return guard.kill();
}
let child = Arc::clone(&self.child);
std::thread::spawn(move || {
if let Ok(mut guard) = child.lock() {
let _ = guard.kill();
}
});
Ok(())
}
fn clone_killer(&self) -> Box<dyn portable_pty::ChildKiller + Send + Sync> {
Box::new(Self {
child: Arc::clone(&self.child),
})
}
}

View File

@@ -7,9 +7,13 @@ use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
mod fallback;
#[cfg(windows)]
mod win;
#[cfg(test)]
mod tests;
use anyhow::Result;
#[cfg(not(windows))]
use portable_pty::native_pty_system;
@@ -33,14 +37,14 @@ pub struct ExecCommandSession {
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
killer: StdMutex<Option<Box<dyn portable_pty::ChildKiller + Send + Sync>>>,
reader_handle: StdMutex<Option<JoinHandle<()>>>,
reader_handles: StdMutex<Vec<JoinHandle<()>>>,
writer_handle: StdMutex<Option<JoinHandle<()>>>,
wait_handle: StdMutex<Option<JoinHandle<()>>>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
// PtyPair must be preserved because the process will receive Control+C if the
// slave is closed
_pair: StdMutex<PtyPairWrapper>,
_pair: StdMutex<Option<PtyPairWrapper>>,
}
impl fmt::Debug for PtyPairWrapper {
@@ -56,19 +60,19 @@ impl ExecCommandSession {
output_tx: broadcast::Sender<Vec<u8>>,
initial_output_rx: broadcast::Receiver<Vec<u8>>,
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
reader_handle: JoinHandle<()>,
reader_handles: Vec<JoinHandle<()>>,
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
pair: PtyPairWrapper,
pair: Option<PtyPairWrapper>,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
(
Self {
writer_tx,
output_tx,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_handles: StdMutex::new(reader_handles),
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
@@ -102,8 +106,8 @@ impl ExecCommandSession {
}
}
if let Ok(mut h) = self.reader_handle.lock() {
if let Some(handle) = h.take() {
if let Ok(mut handles) = self.reader_handles.lock() {
for handle in handles.drain(..) {
handle.abort();
}
}
@@ -152,6 +156,20 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
native_pty_system()
}
pub async fn spawn_exec_session(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedPty> {
if cfg!(windows) && !conpty_supported() {
return fallback::spawn_piped_process(program, args, cwd, env, arg0).await;
}
spawn_pty_process(program, args, cwd, env, arg0).await
}
pub async fn spawn_pty_process(
program: &str,
args: &[String],
@@ -171,7 +189,8 @@ pub async fn spawn_pty_process(
pixel_height: 0,
})?;
let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string()));
let program = arg0.as_deref().unwrap_or(program);
let mut command_builder = CommandBuilder::new(program);
command_builder.cwd(cwd);
command_builder.env_clear();
for arg in args {
@@ -255,12 +274,12 @@ pub async fn spawn_pty_process(
output_tx,
initial_output_rx,
killer,
reader_handle,
vec![reader_handle],
writer_handle,
wait_handle,
exit_status,
exit_code,
pair,
Some(pair),
);
Ok(SpawnedPty {

View File

@@ -0,0 +1,903 @@
use super::conpty_supported;
use super::fallback;
use super::spawn_pty_process;
use super::ExecCommandSession;
use super::SpawnedPty;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
const OUTPUT_TIMEOUT: Duration = Duration::from_secs(5);
const TERMINATE_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Debug, Clone)]
struct CommandSpec {
program: String,
args: Vec<String>,
}
#[derive(Debug, PartialEq, Eq)]
enum ExitOutcome {
Exited(i32),
Dropped,
}
#[derive(Debug)]
struct RunResult {
output: Vec<u8>,
exit: ExitOutcome,
}
#[derive(Debug)]
struct InteractiveSession {
session: ExecCommandSession,
writer: mpsc::Sender<Vec<u8>>,
output_rx: broadcast::Receiver<Vec<u8>>,
exit_rx: oneshot::Receiver<i32>,
buffer: Vec<u8>,
}
fn shell_command(script: &str) -> CommandSpec {
#[cfg(unix)]
{
CommandSpec {
program: "/bin/sh".to_string(),
args: vec!["-c".to_string(), script.to_string()],
}
}
#[cfg(windows)]
{
CommandSpec {
program: windows_cmd_path(),
args: vec!["/C".to_string(), script.to_string()],
}
}
}
fn shell_repl_command() -> CommandSpec {
#[cfg(unix)]
{
CommandSpec {
program: "/bin/sh".to_string(),
args: Vec::new(),
}
}
#[cfg(windows)]
{
CommandSpec {
program: windows_cmd_path(),
args: vec!["/Q".to_string(), "/D".to_string()],
}
}
}
#[cfg(windows)]
fn windows_cmd_path() -> String {
if let Ok(comspec) = std::env::var("ComSpec") {
return comspec;
}
if let Ok(system_root) = std::env::var("SystemRoot") {
return format!(r"{system_root}\System32\cmd.exe");
}
r"C:\Windows\System32\cmd.exe".to_string()
}
fn base_env() -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert("FOO".to_string(), "bar".to_string());
#[cfg(windows)]
{
if let Ok(system_root) = std::env::var("SystemRoot") {
env.insert("SystemRoot".to_string(), system_root.clone());
env.insert("PATH".to_string(), format!(r"{system_root}\System32"));
env.insert("PROMPT".to_string(), "".to_string());
}
}
env
}
fn temp_dir() -> PathBuf {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let dir = std::env::temp_dir().join(format!("codex-pty-{nanos}"));
std::fs::create_dir_all(&dir).unwrap();
dir
}
async fn run_pty(
command: &CommandSpec,
cwd: &Path,
env: &HashMap<String, String>,
input: Option<Vec<u8>>,
) -> anyhow::Result<RunResult> {
let spawned = spawn_pty_process(&command.program, &command.args, cwd, env, &None).await?;
run_spawned(spawned, input, OUTPUT_TIMEOUT).await
}
async fn run_piped(
command: &CommandSpec,
cwd: &Path,
env: &HashMap<String, String>,
input: Option<Vec<u8>>,
) -> anyhow::Result<RunResult> {
let spawned =
fallback::spawn_piped_process(&command.program, &command.args, cwd, env, &None).await?;
run_spawned(spawned, input, OUTPUT_TIMEOUT).await
}
async fn run_spawned(
spawned: SpawnedPty,
input: Option<Vec<u8>>,
timeout: Duration,
) -> anyhow::Result<RunResult> {
let writer = spawned.session.writer_sender();
if let Some(bytes) = input {
writer.send(bytes).await?;
}
drop(writer);
tokio::time::timeout(timeout, collect_output(spawned.output_rx, spawned.exit_rx)).await?
}
async fn spawn_interactive_pty(
command: &CommandSpec,
cwd: &Path,
env: &HashMap<String, String>,
) -> anyhow::Result<InteractiveSession> {
let spawned = spawn_pty_process(&command.program, &command.args, cwd, env, &None).await?;
Ok(InteractiveSession {
writer: spawned.session.writer_sender(),
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
buffer: Vec::new(),
})
}
async fn spawn_interactive_piped(
command: &CommandSpec,
cwd: &Path,
env: &HashMap<String, String>,
) -> anyhow::Result<InteractiveSession> {
let spawned =
fallback::spawn_piped_process(&command.program, &command.args, cwd, env, &None).await?;
Ok(InteractiveSession {
writer: spawned.session.writer_sender(),
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
buffer: Vec::new(),
})
}
async fn collect_output(
mut output_rx: broadcast::Receiver<Vec<u8>>,
mut exit_rx: oneshot::Receiver<i32>,
) -> anyhow::Result<RunResult> {
let mut output = Vec::new();
let mut lagged = None;
let exit = loop {
tokio::select! {
received = output_rx.recv() => {
match received {
Ok(bytes) => output.extend_from_slice(&bytes),
Err(broadcast::error::RecvError::Closed) => {}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
lagged = Some(skipped);
}
}
}
res = &mut exit_rx => {
break match res {
Ok(code) => ExitOutcome::Exited(code),
Err(_) => ExitOutcome::Dropped,
};
}
}
};
if let Some(skipped) = lagged {
anyhow::bail!("output lagged by {skipped} messages");
}
let drain_deadline = Instant::now() + Duration::from_millis(50);
while Instant::now() < drain_deadline {
match output_rx.try_recv() {
Ok(bytes) => output.extend_from_slice(&bytes),
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Closed) => break,
Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
anyhow::bail!("output lagged by {skipped} messages");
}
}
}
Ok(RunResult { output, exit })
}
async fn wait_for_output_contains(
output_rx: &mut broadcast::Receiver<Vec<u8>>,
buffer: &mut Vec<u8>,
marker: &str,
timeout: Duration,
) -> anyhow::Result<()> {
tokio::time::timeout(timeout, async {
loop {
if normalize_output(buffer).contains(marker) {
return Ok(());
}
match output_rx.recv().await {
Ok(bytes) => buffer.extend_from_slice(&bytes),
Err(broadcast::error::RecvError::Closed) => {
anyhow::bail!("output channel closed before receiving {marker}");
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
anyhow::bail!("output lagged by {skipped} messages");
}
}
}
})
.await?
}
async fn finish_interactive(session: InteractiveSession) -> anyhow::Result<RunResult> {
let mut result = collect_output(session.output_rx, session.exit_rx).await?;
let mut output = session.buffer;
output.extend_from_slice(&result.output);
result.output = output;
Ok(result)
}
fn normalize_output(output: &[u8]) -> String {
String::from_utf8_lossy(output)
.replace("\r\n", "\n")
.replace('\r', "\n")
}
fn normalize_lines(output: &[u8]) -> Vec<String> {
normalize_output(output)
.lines()
.map(str::to_string)
.collect()
}
fn normalize_cwd_lines(output: &[u8]) -> Vec<String> {
normalize_lines(output)
.into_iter()
.map(|line| {
let Some(stripped) = line.strip_prefix("CWD=/private/") else {
return line;
};
format!("CWD=/{stripped}")
})
.collect()
}
// This is not dead code but this is not used on Windows, and we still want compiles check
// everywhere.
#[allow(dead_code)]
fn strip_echoed_input(lines: Vec<String>, input: &str) -> Vec<String> {
lines.into_iter().filter(|line| line != input).collect()
}
fn assert_lines_match(actual: &[u8], expected: &[&str]) {
let lines = normalize_lines(actual);
let expected = expected
.iter()
.copied()
.map(str::to_string)
.collect::<Vec<_>>();
assert_eq!(lines, expected);
}
fn assert_line_set_match(actual: &[u8], expected: &[&str]) {
let mut lines = normalize_lines(actual);
lines.retain(|line| !line.is_empty());
lines.sort();
let mut expected = expected
.iter()
.copied()
.map(str::to_string)
.collect::<Vec<_>>();
expected.sort();
assert_eq!(lines, expected);
}
fn assert_parity(pty: &RunResult, piped: &RunResult) {
assert_eq!(pty.exit, piped.exit);
assert_eq!(
normalize_output(&pty.output),
normalize_output(&piped.output)
);
}
fn line_ending() -> &'static str {
#[cfg(windows)]
{
"\r\n"
}
#[cfg(unix)]
{
"\n"
}
}
async fn send_line(
writer: &mut mpsc::Sender<Vec<u8>>,
line: &str,
) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
writer
.send(format!("{line}{line_ending}", line_ending = line_ending()).into_bytes())
.await
}
fn extract_marker_lines(output: &[u8]) -> Vec<String> {
normalize_output(output)
.lines()
.filter_map(|line| {
let trimmed = line.trim_start();
trimmed
.starts_with("CMD_MARKER_")
.then(|| trimmed.to_string())
})
.collect()
}
fn output_has_marker_line(output: &[u8], marker: &str) -> bool {
normalize_output(output).lines().any(|line| {
let trimmed = line.trim_start();
trimmed.starts_with(marker)
})
}
#[tokio::test]
// Verifies basic stdout and exit code parity for a simple command.
async fn standard_output_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"printf "hello\nworld\n"; exit 7"#,
#[cfg(windows)]
r#"echo hello & echo world & exit /b 7"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = run_pty(&command, &cwd, &env, None).await?;
let piped = run_piped(&command, &cwd, &env, None).await?;
assert_eq!(pty.exit, ExitOutcome::Exited(7));
assert_eq!(piped.exit, ExitOutcome::Exited(7));
assert_lines_match(&pty.output, &["hello", "world"]);
assert_parity(&pty, &piped);
Ok(())
}
#[tokio::test]
// Verifies commands with no output stay empty and match parity.
async fn no_output_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
"true",
#[cfg(windows)]
"exit /b 0",
);
let env = base_env();
let cwd = temp_dir();
let pty = run_pty(&command, &cwd, &env, None).await?;
let piped = run_piped(&command, &cwd, &env, None).await?;
assert_eq!(normalize_output(&pty.output), "");
assert_eq!(normalize_output(&piped.output), "");
assert_parity(&pty, &piped);
Ok(())
}
#[tokio::test]
// Verifies exit state and stored exit code match PTY and are consistent with exit_rx.
async fn exit_state_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"printf "done\n"; exit 3"#,
#[cfg(windows)]
r#"echo done & exit /b 3"#,
);
let env = base_env();
let cwd = temp_dir();
let pty_spawned = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
let pty_result = tokio::time::timeout(
OUTPUT_TIMEOUT,
collect_output(pty_spawned.output_rx, pty_spawned.exit_rx),
)
.await??;
let pty_code = match pty_result.exit {
ExitOutcome::Exited(code) => code,
ExitOutcome::Dropped => anyhow::bail!("pty exit dropped"),
};
assert!(pty_spawned.session.has_exited());
assert_eq!(pty_spawned.session.exit_code(), Some(pty_code));
let piped_spawned =
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
let piped_result = tokio::time::timeout(
OUTPUT_TIMEOUT,
collect_output(piped_spawned.output_rx, piped_spawned.exit_rx),
)
.await??;
let piped_code = match piped_result.exit {
ExitOutcome::Exited(code) => code,
ExitOutcome::Dropped => anyhow::bail!("piped exit dropped"),
};
assert!(piped_spawned.session.has_exited());
assert_eq!(piped_spawned.session.exit_code(), Some(piped_code));
assert_eq!(pty_code, piped_code);
Ok(())
}
#[tokio::test]
// Verifies env propagation and working directory parity.
async fn env_and_cwd_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"printf "FOO=%s\n" "$FOO"; printf "CWD=%s\n" "$(pwd)""#,
#[cfg(windows)]
r#"echo FOO=%FOO% & echo CWD=%CD%"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = run_pty(&command, &cwd, &env, None).await?;
let piped = run_piped(&command, &cwd, &env, None).await?;
let cwd_line = format!("CWD={}", cwd.display());
let expected = vec!["FOO=bar".to_string(), cwd_line];
assert_eq!(normalize_cwd_lines(&pty.output), expected);
assert_eq!(normalize_cwd_lines(&piped.output), expected);
Ok(())
}
#[tokio::test]
// Verifies large output throughput parity.
async fn large_output_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"i=0; while [ $i -lt 200 ]; do printf "line-%s\n" "$i"; i=$((i+1)); done"#,
#[cfg(windows)]
r#"for /L %i in (0,1,199) do @echo line-%i"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = run_pty(&command, &cwd, &env, None).await?;
let piped = run_piped(&command, &cwd, &env, None).await?;
let mut expected = Vec::new();
for i in 0..200 {
expected.push(format!("line-{i}"));
}
assert_eq!(normalize_lines(&pty.output), expected);
assert_parity(&pty, &piped);
Ok(())
}
#[tokio::test]
// Verifies merged stdout/stderr output parity, ignoring ordering.
async fn stderr_merge_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"printf "stdout\n"; printf "stderr\n" 1>&2"#,
#[cfg(windows)]
r#"echo stdout & echo stderr 1>&2"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = run_pty(&command, &cwd, &env, None).await?;
let piped = run_piped(&command, &cwd, &env, None).await?;
assert_line_set_match(&pty.output, &["stdout", "stderr"]);
assert_line_set_match(&piped.output, &["stdout", "stderr"]);
assert_eq!(pty.exit, piped.exit);
Ok(())
}
#[tokio::test]
// Verifies terminate behavior parity for long-running children.
async fn terminate_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
"sleep 60",
#[cfg(windows)]
r#"ping -n 60 127.0.0.1 >nul"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
let piped =
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
pty.session.terminate();
piped.session.terminate();
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await;
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await;
assert!(pty_exit.is_ok());
assert!(piped_exit.is_ok());
let pty_exit = pty_exit.unwrap();
let piped_exit = piped_exit.unwrap();
match pty_exit {
Ok(code) => {
let piped_code = piped_exit?;
assert_eq!(code, piped_code);
}
Err(_) => {
assert!(piped_exit.is_err());
}
}
Ok(())
}
#[tokio::test]
// Verifies terminate can be called twice and piped matches PTY.
async fn terminate_idempotency_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
"sleep 60",
#[cfg(windows)]
r#"ping -n 60 127.0.0.1 >nul"#,
);
let env = base_env();
let cwd = temp_dir();
let pty = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
let piped =
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
pty.session.terminate();
pty.session.terminate();
piped.session.terminate();
piped.session.terminate();
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await?;
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await?;
match pty_exit {
Ok(code) => {
let piped_code = piped_exit?;
assert_eq!(code, piped_code);
}
Err(pty_err) => {
let piped_err = piped_exit.unwrap_err();
assert_eq!(pty_err.to_string(), piped_err.to_string());
}
}
Ok(())
}
#[tokio::test]
// Verifies empty program errors are consistent across implementations.
async fn empty_program_errors() {
if cfg!(windows) && !conpty_supported() {
return;
}
let env = base_env();
let cwd = temp_dir();
let args = Vec::new();
let pty_err = spawn_pty_process("", &args, &cwd, &env, &None)
.await
.unwrap_err();
let piped_err = fallback::spawn_piped_process("", &args, &cwd, &env, &None)
.await
.unwrap_err();
let pty_msg = pty_err.to_string();
let piped_msg = piped_err.to_string();
assert!(pty_msg.contains("missing program"));
assert!(piped_msg.contains("missing program"));
}
#[tokio::test]
// Verifies arg0 overrides the program path for both implementations.
async fn arg0_overrides_program() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_command(
#[cfg(unix)]
r#"printf "ok\n""#,
#[cfg(windows)]
r#"echo ok"#,
);
let env = base_env();
let cwd = temp_dir();
let bogus = "this-does-not-exist".to_string();
let arg0 = Some(command.program.clone());
let pty = spawn_pty_process(&bogus, &command.args, &cwd, &env, &arg0).await?;
let piped = fallback::spawn_piped_process(&bogus, &command.args, &cwd, &env, &arg0).await?;
let pty = run_spawned(pty, None, OUTPUT_TIMEOUT).await?;
let piped = run_spawned(piped, None, OUTPUT_TIMEOUT).await?;
assert_lines_match(&pty.output, &["ok"]);
assert_parity(&pty, &piped);
Ok(())
}
#[tokio::test]
// Verifies multi-command interactive sessions return outputs in order.
async fn multi_command_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_repl_command();
let env = base_env();
let cwd = temp_dir();
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
let marker_one = "CMD_MARKER_ONE";
send_line(&mut pty.writer, &format!("echo {marker_one}")).await?;
send_line(&mut piped.writer, &format!("echo {marker_one}")).await?;
wait_for_output_contains(
&mut pty.output_rx,
&mut pty.buffer,
marker_one,
OUTPUT_TIMEOUT,
)
.await?;
wait_for_output_contains(
&mut piped.output_rx,
&mut piped.buffer,
marker_one,
OUTPUT_TIMEOUT,
)
.await?;
let marker_two = "CMD_MARKER_TWO";
send_line(&mut pty.writer, &format!("echo {marker_two}")).await?;
send_line(&mut piped.writer, &format!("echo {marker_two}")).await?;
wait_for_output_contains(
&mut pty.output_rx,
&mut pty.buffer,
marker_two,
OUTPUT_TIMEOUT,
)
.await?;
wait_for_output_contains(
&mut piped.output_rx,
&mut piped.buffer,
marker_two,
OUTPUT_TIMEOUT,
)
.await?;
send_line(&mut pty.writer, "exit").await?;
send_line(&mut piped.writer, "exit").await?;
let pty = finish_interactive(pty).await?;
let piped = finish_interactive(piped).await?;
assert_eq!(pty.exit, ExitOutcome::Exited(0));
assert_eq!(piped.exit, ExitOutcome::Exited(0));
let pty_markers = extract_marker_lines(&pty.output);
let piped_markers = extract_marker_lines(&piped.output);
assert_eq!(pty_markers, piped_markers);
assert_eq!(
pty_markers,
vec![marker_one.to_string(), marker_two.to_string()]
);
Ok(())
}
#[tokio::test]
// Verifies late output subscribers receive subsequent output.
async fn output_subscriber_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_repl_command();
let env = base_env();
let cwd = temp_dir();
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
let mut pty_late_rx = pty.session.output_receiver();
let mut piped_late_rx = piped.session.output_receiver();
let mut pty_late_buffer = Vec::new();
let mut piped_late_buffer = Vec::new();
let marker = "CMD_MARKER_SUB";
send_line(&mut pty.writer, &format!("echo {marker}")).await?;
send_line(&mut piped.writer, &format!("echo {marker}")).await?;
tokio::time::timeout(OUTPUT_TIMEOUT, async {
loop {
if output_has_marker_line(&pty_late_buffer, marker) {
break;
}
match pty_late_rx.recv().await {
Ok(bytes) => pty_late_buffer.extend_from_slice(&bytes),
Err(broadcast::error::RecvError::Closed) => {
anyhow::bail!("output channel closed before receiving {marker}");
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
anyhow::bail!("output lagged by {skipped} messages");
}
}
}
Ok::<_, anyhow::Error>(())
})
.await??;
tokio::time::timeout(OUTPUT_TIMEOUT, async {
loop {
if output_has_marker_line(&piped_late_buffer, marker) {
break;
}
match piped_late_rx.recv().await {
Ok(bytes) => piped_late_buffer.extend_from_slice(&bytes),
Err(broadcast::error::RecvError::Closed) => {
anyhow::bail!("output channel closed before receiving {marker}");
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
anyhow::bail!("output lagged by {skipped} messages");
}
}
}
Ok::<_, anyhow::Error>(())
})
.await??;
send_line(&mut pty.writer, "exit").await?;
send_line(&mut piped.writer, "exit").await?;
let pty = finish_interactive(pty).await?;
let piped = finish_interactive(piped).await?;
assert_eq!(pty.exit, ExitOutcome::Exited(0));
assert_eq!(piped.exit, ExitOutcome::Exited(0));
assert_eq!(
extract_marker_lines(&pty_late_buffer),
vec![marker.to_string()]
);
assert_eq!(
extract_marker_lines(&piped_late_buffer),
vec![marker.to_string()]
);
Ok(())
}
#[tokio::test]
// Verifies post-kill write failures and exit errors match.
async fn multi_command_after_kill_parity() -> anyhow::Result<()> {
if cfg!(windows) && !conpty_supported() {
return Ok(());
}
let command = shell_repl_command();
let env = base_env();
let cwd = temp_dir();
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
let marker = "CMD_MARKER_KILL";
send_line(&mut pty.writer, &format!("echo {marker}")).await?;
send_line(&mut piped.writer, &format!("echo {marker}")).await?;
wait_for_output_contains(&mut pty.output_rx, &mut pty.buffer, marker, OUTPUT_TIMEOUT).await?;
wait_for_output_contains(
&mut piped.output_rx,
&mut piped.buffer,
marker,
OUTPUT_TIMEOUT,
)
.await?;
pty.session.terminate();
piped.session.terminate();
let pty_send = send_line(&mut pty.writer, "echo CMD_MARKER_AFTER_KILL")
.await
.map_err(|err| err.to_string());
let piped_send = send_line(&mut piped.writer, "echo CMD_MARKER_AFTER_KILL")
.await
.map_err(|err| err.to_string());
match (pty_send, piped_send) {
(Ok(()), Ok(())) => {}
(Err(pty_err), Err(piped_err)) => {
assert_eq!(pty_err, piped_err);
}
(Ok(()), Err(piped_err)) => {
panic!("piped write failed while PTY succeeded: {piped_err}");
}
(Err(pty_err), Ok(())) => {
panic!("piped write succeeded while PTY failed: {pty_err}");
}
}
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await?;
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await?;
match pty_exit {
Ok(code) => {
let piped_code = piped_exit?;
assert_eq!(code, piped_code);
}
Err(pty_err) => {
let piped_err = piped_exit.unwrap_err();
assert_eq!(pty_err.to_string(), piped_err.to_string());
}
}
Ok(())
}
#[cfg(unix)]
#[tokio::test]
// Unix-only: relies on /bin/sh + stty to disable echo; cmd/ConPTY behavior differs.
// Verifies reading from stdin works and matches output parity.
async fn stdin_read_parity() -> anyhow::Result<()> {
let Some(stty) = stty_path() else {
return Ok(());
};
let command = shell_command(&format!(
r#"if [ -t 0 ]; then "{stty}" -echo; fi; IFS= read -r line; printf "got:%s\n" "$line""#,
));
let env = base_env();
let cwd = temp_dir();
let input = Some(b"hello\n".to_vec());
let pty = run_pty(&command, &cwd, &env, input.clone()).await?;
let piped = run_piped(&command, &cwd, &env, input).await?;
let expected = vec!["got:hello".to_string()];
let pty_lines = strip_echoed_input(normalize_lines(&pty.output), "hello");
let piped_lines = strip_echoed_input(normalize_lines(&piped.output), "hello");
assert_eq!(pty_lines, expected);
assert_eq!(piped_lines, expected);
Ok(())
}
#[cfg(unix)]
fn stty_path() -> Option<String> {
let candidates = ["/bin/stty", "/usr/bin/stty"];
candidates
.into_iter()
.find(|path| Path::new(*path).exists())
.map(str::to_string)
}