mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
8 Commits
main
...
jif/fallba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd061a6292 | ||
|
|
459abecf52 | ||
|
|
10d048fbd5 | ||
|
|
dfdf696062 | ||
|
|
ad39b7ea72 | ||
|
|
8baa8c912b | ||
|
|
6793b686ec | ||
|
|
0c34d6633f |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1881,6 +1881,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"log",
|
||||
"portable-pty",
|
||||
"pretty_assertions",
|
||||
"shared_library",
|
||||
"tokio",
|
||||
"winapi",
|
||||
|
||||
@@ -43,12 +43,7 @@ impl ToolsConfig {
|
||||
let shell_type = if !features.enabled(Feature::ShellTool) {
|
||||
ConfigShellToolType::Disabled
|
||||
} else if features.enabled(Feature::UnifiedExec) {
|
||||
// If ConPTY not supported (for old Windows versions), fallback on ShellCommand.
|
||||
if codex_utils_pty::conpty_supported() {
|
||||
ConfigShellToolType::UnifiedExec
|
||||
} else {
|
||||
ConfigShellToolType::ShellCommand
|
||||
}
|
||||
ConfigShellToolType::UnifiedExec
|
||||
} else {
|
||||
model_family.shell_type
|
||||
};
|
||||
|
||||
@@ -460,7 +460,7 @@ impl UnifiedExecSessionManager {
|
||||
.split_first()
|
||||
.ok_or(UnifiedExecError::MissingCommandLine)?;
|
||||
|
||||
let spawned = codex_utils_pty::spawn_pty_process(
|
||||
let spawned = codex_utils_pty::spawn_exec_session(
|
||||
program,
|
||||
args,
|
||||
env.cwd.as_path(),
|
||||
|
||||
@@ -12,6 +12,9 @@ anyhow = { workspace = true }
|
||||
portable-pty = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
filedescriptor = "0.8.3"
|
||||
lazy_static = { workspace = true }
|
||||
|
||||
20
codex-rs/utils/pty/README.md
Normal file
20
codex-rs/utils/pty/README.md
Normal file
@@ -0,0 +1,20 @@
|
||||
# codex-utils-pty parity notes
|
||||
|
||||
The tests in `utils/pty/src/lib.rs` compare PTY-backed sessions with the piped
|
||||
fallback to ensure the shared behaviors stay aligned. Some behavior differences
|
||||
are inherent to PTY semantics, so the tests normalize or explicitly allow them.
|
||||
|
||||
Known differences
|
||||
|
||||
- TTY line discipline: PTY-backed children run with a terminal line discipline
|
||||
(canonical input, echo, signal generation). Piped fallback uses plain pipes,
|
||||
so input is raw and no echo or line editing occurs.
|
||||
- Line endings: PTY output may translate LF to CRLF (for example when `ONLCR` is
|
||||
enabled). Piped output preserves what the program writes.
|
||||
- Stdout/stderr interleaving: PTY output is a single stream with ordering
|
||||
preserved by the terminal. Piped fallback merges stdout/stderr from separate
|
||||
readers, so relative ordering between the streams is not guaranteed.
|
||||
- Terminal features: PTY sessions support terminal-only behaviors (window size,
|
||||
control sequences). Piped fallback does not emulate these features.
|
||||
- Signals: PTY sessions can receive terminal-generated signals (such as Ctrl+C).
|
||||
Piped fallback does not provide that terminal-level signaling.
|
||||
171
codex-rs/utils/pty/src/fallback.rs
Normal file
171
codex-rs/utils/pty/src/fallback.rs
Normal file
@@ -0,0 +1,171 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::process::Command as StdCommand;
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use super::ExecCommandSession;
|
||||
use super::SpawnedPty;
|
||||
|
||||
pub async fn spawn_piped_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedPty> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for exec spawn");
|
||||
}
|
||||
|
||||
let program = arg0.as_deref().unwrap_or(program);
|
||||
let mut command = StdCommand::new(program);
|
||||
command.args(args);
|
||||
command.current_dir(cwd);
|
||||
command.env_clear();
|
||||
command.envs(env);
|
||||
command
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
let mut child = command.spawn()?;
|
||||
|
||||
let stdin = child.stdin.take().ok_or_else(|| {
|
||||
anyhow::anyhow!("stdin pipe was unexpectedly not available for exec spawn")
|
||||
})?;
|
||||
let stdout = child.stdout.take().ok_or_else(|| {
|
||||
anyhow::anyhow!("stdout pipe was unexpectedly not available for exec spawn")
|
||||
})?;
|
||||
let stderr = child.stderr.take().ok_or_else(|| {
|
||||
anyhow::anyhow!("stderr pipe was unexpectedly not available for exec spawn")
|
||||
})?;
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
// Pipes separate stdout and stderr; merge to match PTY semantics.
|
||||
let stdout_handle = spawn_pipe_reader(stdout, output_tx.clone());
|
||||
let stderr_handle = spawn_pipe_reader(stderr, output_tx.clone());
|
||||
|
||||
let writer_handle = tokio::task::spawn_blocking(move || {
|
||||
let mut stdin = stdin;
|
||||
use std::io::Write;
|
||||
while let Some(bytes) = writer_rx.blocking_recv() {
|
||||
let _ = stdin.write_all(&bytes);
|
||||
let _ = stdin.flush();
|
||||
}
|
||||
});
|
||||
|
||||
let child = Arc::new(StdMutex::new(child));
|
||||
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
|
||||
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = Arc::clone(&exit_status);
|
||||
let exit_code = Arc::new(StdMutex::new(None));
|
||||
let wait_exit_code = Arc::clone(&exit_code);
|
||||
let wait_child = Arc::clone(&child);
|
||||
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let code = loop {
|
||||
let status = match wait_child.lock() {
|
||||
Ok(mut guard) => guard.try_wait(),
|
||||
Err(_) => break -1,
|
||||
};
|
||||
match status {
|
||||
Ok(Some(status)) => {
|
||||
break status
|
||||
.code()
|
||||
.unwrap_or_else(|| if status.success() { 0 } else { 1 });
|
||||
}
|
||||
Ok(None) => std::thread::sleep(Duration::from_millis(10)),
|
||||
Err(_) => break -1,
|
||||
}
|
||||
};
|
||||
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
if let Ok(mut guard) = wait_exit_code.lock() {
|
||||
*guard = Some(code);
|
||||
}
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
let (session, output_rx) = ExecCommandSession::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(PipedChildKiller::new(child)),
|
||||
vec![stdout_handle, stderr_handle],
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
exit_code,
|
||||
None,
|
||||
);
|
||||
|
||||
Ok(SpawnedPty {
|
||||
session,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_pipe_reader<R: std::io::Read + Send + 'static>(
|
||||
mut reader: R,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8_192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx.send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PipedChildKiller {
|
||||
child: Arc<StdMutex<std::process::Child>>,
|
||||
}
|
||||
|
||||
impl PipedChildKiller {
|
||||
fn new(child: Arc<StdMutex<std::process::Child>>) -> Self {
|
||||
Self { child }
|
||||
}
|
||||
}
|
||||
|
||||
impl portable_pty::ChildKiller for PipedChildKiller {
|
||||
fn kill(&mut self) -> io::Result<()> {
|
||||
if let Ok(mut guard) = self.child.try_lock() {
|
||||
return guard.kill();
|
||||
}
|
||||
|
||||
let child = Arc::clone(&self.child);
|
||||
std::thread::spawn(move || {
|
||||
if let Ok(mut guard) = child.lock() {
|
||||
let _ = guard.kill();
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn portable_pty::ChildKiller + Send + Sync> {
|
||||
Box::new(Self {
|
||||
child: Arc::clone(&self.child),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -7,9 +7,13 @@ use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::Duration;
|
||||
|
||||
mod fallback;
|
||||
#[cfg(windows)]
|
||||
mod win;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use anyhow::Result;
|
||||
#[cfg(not(windows))]
|
||||
use portable_pty::native_pty_system;
|
||||
@@ -33,14 +37,14 @@ pub struct ExecCommandSession {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
killer: StdMutex<Option<Box<dyn portable_pty::ChildKiller + Send + Sync>>>,
|
||||
reader_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
reader_handles: StdMutex<Vec<JoinHandle<()>>>,
|
||||
writer_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
wait_handle: StdMutex<Option<JoinHandle<()>>>,
|
||||
exit_status: Arc<AtomicBool>,
|
||||
exit_code: Arc<StdMutex<Option<i32>>>,
|
||||
// PtyPair must be preserved because the process will receive Control+C if the
|
||||
// slave is closed
|
||||
_pair: StdMutex<PtyPairWrapper>,
|
||||
_pair: StdMutex<Option<PtyPairWrapper>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for PtyPairWrapper {
|
||||
@@ -56,19 +60,19 @@ impl ExecCommandSession {
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
initial_output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
|
||||
reader_handle: JoinHandle<()>,
|
||||
reader_handles: Vec<JoinHandle<()>>,
|
||||
writer_handle: JoinHandle<()>,
|
||||
wait_handle: JoinHandle<()>,
|
||||
exit_status: Arc<AtomicBool>,
|
||||
exit_code: Arc<StdMutex<Option<i32>>>,
|
||||
pair: PtyPairWrapper,
|
||||
pair: Option<PtyPairWrapper>,
|
||||
) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
(
|
||||
Self {
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer: StdMutex::new(Some(killer)),
|
||||
reader_handle: StdMutex::new(Some(reader_handle)),
|
||||
reader_handles: StdMutex::new(reader_handles),
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
@@ -102,8 +106,8 @@ impl ExecCommandSession {
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(mut h) = self.reader_handle.lock() {
|
||||
if let Some(handle) = h.take() {
|
||||
if let Ok(mut handles) = self.reader_handles.lock() {
|
||||
for handle in handles.drain(..) {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
@@ -152,6 +156,20 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
|
||||
native_pty_system()
|
||||
}
|
||||
|
||||
pub async fn spawn_exec_session(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedPty> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return fallback::spawn_piped_process(program, args, cwd, env, arg0).await;
|
||||
}
|
||||
|
||||
spawn_pty_process(program, args, cwd, env, arg0).await
|
||||
}
|
||||
|
||||
pub async fn spawn_pty_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
@@ -171,7 +189,8 @@ pub async fn spawn_pty_process(
|
||||
pixel_height: 0,
|
||||
})?;
|
||||
|
||||
let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string()));
|
||||
let program = arg0.as_deref().unwrap_or(program);
|
||||
let mut command_builder = CommandBuilder::new(program);
|
||||
command_builder.cwd(cwd);
|
||||
command_builder.env_clear();
|
||||
for arg in args {
|
||||
@@ -255,12 +274,12 @@ pub async fn spawn_pty_process(
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
killer,
|
||||
reader_handle,
|
||||
vec![reader_handle],
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
exit_code,
|
||||
pair,
|
||||
Some(pair),
|
||||
);
|
||||
|
||||
Ok(SpawnedPty {
|
||||
|
||||
903
codex-rs/utils/pty/src/tests.rs
Normal file
903
codex-rs/utils/pty/src/tests.rs
Normal file
@@ -0,0 +1,903 @@
|
||||
use super::conpty_supported;
|
||||
use super::fallback;
|
||||
use super::spawn_pty_process;
|
||||
use super::ExecCommandSession;
|
||||
use super::SpawnedPty;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Instant;
|
||||
|
||||
const OUTPUT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const TERMINATE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct CommandSpec {
|
||||
program: String,
|
||||
args: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum ExitOutcome {
|
||||
Exited(i32),
|
||||
Dropped,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RunResult {
|
||||
output: Vec<u8>,
|
||||
exit: ExitOutcome,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InteractiveSession {
|
||||
session: ExecCommandSession,
|
||||
writer: mpsc::Sender<Vec<u8>>,
|
||||
output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
exit_rx: oneshot::Receiver<i32>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
fn shell_command(script: &str) -> CommandSpec {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
CommandSpec {
|
||||
program: "/bin/sh".to_string(),
|
||||
args: vec!["-c".to_string(), script.to_string()],
|
||||
}
|
||||
}
|
||||
#[cfg(windows)]
|
||||
{
|
||||
CommandSpec {
|
||||
program: windows_cmd_path(),
|
||||
args: vec!["/C".to_string(), script.to_string()],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn shell_repl_command() -> CommandSpec {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
CommandSpec {
|
||||
program: "/bin/sh".to_string(),
|
||||
args: Vec::new(),
|
||||
}
|
||||
}
|
||||
#[cfg(windows)]
|
||||
{
|
||||
CommandSpec {
|
||||
program: windows_cmd_path(),
|
||||
args: vec!["/Q".to_string(), "/D".to_string()],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn windows_cmd_path() -> String {
|
||||
if let Ok(comspec) = std::env::var("ComSpec") {
|
||||
return comspec;
|
||||
}
|
||||
if let Ok(system_root) = std::env::var("SystemRoot") {
|
||||
return format!(r"{system_root}\System32\cmd.exe");
|
||||
}
|
||||
r"C:\Windows\System32\cmd.exe".to_string()
|
||||
}
|
||||
|
||||
fn base_env() -> HashMap<String, String> {
|
||||
let mut env = HashMap::new();
|
||||
env.insert("FOO".to_string(), "bar".to_string());
|
||||
#[cfg(windows)]
|
||||
{
|
||||
if let Ok(system_root) = std::env::var("SystemRoot") {
|
||||
env.insert("SystemRoot".to_string(), system_root.clone());
|
||||
env.insert("PATH".to_string(), format!(r"{system_root}\System32"));
|
||||
env.insert("PROMPT".to_string(), "".to_string());
|
||||
}
|
||||
}
|
||||
env
|
||||
}
|
||||
|
||||
fn temp_dir() -> PathBuf {
|
||||
let nanos = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos();
|
||||
let dir = std::env::temp_dir().join(format!("codex-pty-{nanos}"));
|
||||
std::fs::create_dir_all(&dir).unwrap();
|
||||
dir
|
||||
}
|
||||
|
||||
async fn run_pty(
|
||||
command: &CommandSpec,
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
input: Option<Vec<u8>>,
|
||||
) -> anyhow::Result<RunResult> {
|
||||
let spawned = spawn_pty_process(&command.program, &command.args, cwd, env, &None).await?;
|
||||
run_spawned(spawned, input, OUTPUT_TIMEOUT).await
|
||||
}
|
||||
|
||||
async fn run_piped(
|
||||
command: &CommandSpec,
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
input: Option<Vec<u8>>,
|
||||
) -> anyhow::Result<RunResult> {
|
||||
let spawned =
|
||||
fallback::spawn_piped_process(&command.program, &command.args, cwd, env, &None).await?;
|
||||
run_spawned(spawned, input, OUTPUT_TIMEOUT).await
|
||||
}
|
||||
|
||||
async fn run_spawned(
|
||||
spawned: SpawnedPty,
|
||||
input: Option<Vec<u8>>,
|
||||
timeout: Duration,
|
||||
) -> anyhow::Result<RunResult> {
|
||||
let writer = spawned.session.writer_sender();
|
||||
if let Some(bytes) = input {
|
||||
writer.send(bytes).await?;
|
||||
}
|
||||
drop(writer);
|
||||
|
||||
tokio::time::timeout(timeout, collect_output(spawned.output_rx, spawned.exit_rx)).await?
|
||||
}
|
||||
|
||||
async fn spawn_interactive_pty(
|
||||
command: &CommandSpec,
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
) -> anyhow::Result<InteractiveSession> {
|
||||
let spawned = spawn_pty_process(&command.program, &command.args, cwd, env, &None).await?;
|
||||
Ok(InteractiveSession {
|
||||
writer: spawned.session.writer_sender(),
|
||||
session: spawned.session,
|
||||
output_rx: spawned.output_rx,
|
||||
exit_rx: spawned.exit_rx,
|
||||
buffer: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn spawn_interactive_piped(
|
||||
command: &CommandSpec,
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
) -> anyhow::Result<InteractiveSession> {
|
||||
let spawned =
|
||||
fallback::spawn_piped_process(&command.program, &command.args, cwd, env, &None).await?;
|
||||
Ok(InteractiveSession {
|
||||
writer: spawned.session.writer_sender(),
|
||||
session: spawned.session,
|
||||
output_rx: spawned.output_rx,
|
||||
exit_rx: spawned.exit_rx,
|
||||
buffer: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn collect_output(
|
||||
mut output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
mut exit_rx: oneshot::Receiver<i32>,
|
||||
) -> anyhow::Result<RunResult> {
|
||||
let mut output = Vec::new();
|
||||
let mut lagged = None;
|
||||
let exit = loop {
|
||||
tokio::select! {
|
||||
received = output_rx.recv() => {
|
||||
match received {
|
||||
Ok(bytes) => output.extend_from_slice(&bytes),
|
||||
Err(broadcast::error::RecvError::Closed) => {}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
lagged = Some(skipped);
|
||||
}
|
||||
}
|
||||
}
|
||||
res = &mut exit_rx => {
|
||||
break match res {
|
||||
Ok(code) => ExitOutcome::Exited(code),
|
||||
Err(_) => ExitOutcome::Dropped,
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(skipped) = lagged {
|
||||
anyhow::bail!("output lagged by {skipped} messages");
|
||||
}
|
||||
|
||||
let drain_deadline = Instant::now() + Duration::from_millis(50);
|
||||
while Instant::now() < drain_deadline {
|
||||
match output_rx.try_recv() {
|
||||
Ok(bytes) => output.extend_from_slice(&bytes),
|
||||
Err(broadcast::error::TryRecvError::Empty) => break,
|
||||
Err(broadcast::error::TryRecvError::Closed) => break,
|
||||
Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
|
||||
anyhow::bail!("output lagged by {skipped} messages");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(RunResult { output, exit })
|
||||
}
|
||||
|
||||
async fn wait_for_output_contains(
|
||||
output_rx: &mut broadcast::Receiver<Vec<u8>>,
|
||||
buffer: &mut Vec<u8>,
|
||||
marker: &str,
|
||||
timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
loop {
|
||||
if normalize_output(buffer).contains(marker) {
|
||||
return Ok(());
|
||||
}
|
||||
match output_rx.recv().await {
|
||||
Ok(bytes) => buffer.extend_from_slice(&bytes),
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
anyhow::bail!("output channel closed before receiving {marker}");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
anyhow::bail!("output lagged by {skipped} messages");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
async fn finish_interactive(session: InteractiveSession) -> anyhow::Result<RunResult> {
|
||||
let mut result = collect_output(session.output_rx, session.exit_rx).await?;
|
||||
let mut output = session.buffer;
|
||||
output.extend_from_slice(&result.output);
|
||||
result.output = output;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn normalize_output(output: &[u8]) -> String {
|
||||
String::from_utf8_lossy(output)
|
||||
.replace("\r\n", "\n")
|
||||
.replace('\r', "\n")
|
||||
}
|
||||
|
||||
fn normalize_lines(output: &[u8]) -> Vec<String> {
|
||||
normalize_output(output)
|
||||
.lines()
|
||||
.map(str::to_string)
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn normalize_cwd_lines(output: &[u8]) -> Vec<String> {
|
||||
normalize_lines(output)
|
||||
.into_iter()
|
||||
.map(|line| {
|
||||
let Some(stripped) = line.strip_prefix("CWD=/private/") else {
|
||||
return line;
|
||||
};
|
||||
format!("CWD=/{stripped}")
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// This is not dead code but this is not used on Windows, and we still want compiles check
|
||||
// everywhere.
|
||||
#[allow(dead_code)]
|
||||
fn strip_echoed_input(lines: Vec<String>, input: &str) -> Vec<String> {
|
||||
lines.into_iter().filter(|line| line != input).collect()
|
||||
}
|
||||
|
||||
fn assert_lines_match(actual: &[u8], expected: &[&str]) {
|
||||
let lines = normalize_lines(actual);
|
||||
let expected = expected
|
||||
.iter()
|
||||
.copied()
|
||||
.map(str::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(lines, expected);
|
||||
}
|
||||
|
||||
fn assert_line_set_match(actual: &[u8], expected: &[&str]) {
|
||||
let mut lines = normalize_lines(actual);
|
||||
lines.retain(|line| !line.is_empty());
|
||||
lines.sort();
|
||||
let mut expected = expected
|
||||
.iter()
|
||||
.copied()
|
||||
.map(str::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
expected.sort();
|
||||
assert_eq!(lines, expected);
|
||||
}
|
||||
|
||||
fn assert_parity(pty: &RunResult, piped: &RunResult) {
|
||||
assert_eq!(pty.exit, piped.exit);
|
||||
assert_eq!(
|
||||
normalize_output(&pty.output),
|
||||
normalize_output(&piped.output)
|
||||
);
|
||||
}
|
||||
|
||||
fn line_ending() -> &'static str {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
"\r\n"
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
"\n"
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_line(
|
||||
writer: &mut mpsc::Sender<Vec<u8>>,
|
||||
line: &str,
|
||||
) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
|
||||
writer
|
||||
.send(format!("{line}{line_ending}", line_ending = line_ending()).into_bytes())
|
||||
.await
|
||||
}
|
||||
|
||||
fn extract_marker_lines(output: &[u8]) -> Vec<String> {
|
||||
normalize_output(output)
|
||||
.lines()
|
||||
.filter_map(|line| {
|
||||
let trimmed = line.trim_start();
|
||||
trimmed
|
||||
.starts_with("CMD_MARKER_")
|
||||
.then(|| trimmed.to_string())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn output_has_marker_line(output: &[u8], marker: &str) -> bool {
|
||||
normalize_output(output).lines().any(|line| {
|
||||
let trimmed = line.trim_start();
|
||||
trimmed.starts_with(marker)
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies basic stdout and exit code parity for a simple command.
|
||||
async fn standard_output_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"printf "hello\nworld\n"; exit 7"#,
|
||||
#[cfg(windows)]
|
||||
r#"echo hello & echo world & exit /b 7"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, None).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, None).await?;
|
||||
|
||||
assert_eq!(pty.exit, ExitOutcome::Exited(7));
|
||||
assert_eq!(piped.exit, ExitOutcome::Exited(7));
|
||||
assert_lines_match(&pty.output, &["hello", "world"]);
|
||||
assert_parity(&pty, &piped);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies commands with no output stay empty and match parity.
|
||||
async fn no_output_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
"true",
|
||||
#[cfg(windows)]
|
||||
"exit /b 0",
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, None).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, None).await?;
|
||||
|
||||
assert_eq!(normalize_output(&pty.output), "");
|
||||
assert_eq!(normalize_output(&piped.output), "");
|
||||
assert_parity(&pty, &piped);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies exit state and stored exit code match PTY and are consistent with exit_rx.
|
||||
async fn exit_state_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"printf "done\n"; exit 3"#,
|
||||
#[cfg(windows)]
|
||||
r#"echo done & exit /b 3"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty_spawned = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
let pty_result = tokio::time::timeout(
|
||||
OUTPUT_TIMEOUT,
|
||||
collect_output(pty_spawned.output_rx, pty_spawned.exit_rx),
|
||||
)
|
||||
.await??;
|
||||
let pty_code = match pty_result.exit {
|
||||
ExitOutcome::Exited(code) => code,
|
||||
ExitOutcome::Dropped => anyhow::bail!("pty exit dropped"),
|
||||
};
|
||||
assert!(pty_spawned.session.has_exited());
|
||||
assert_eq!(pty_spawned.session.exit_code(), Some(pty_code));
|
||||
|
||||
let piped_spawned =
|
||||
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
let piped_result = tokio::time::timeout(
|
||||
OUTPUT_TIMEOUT,
|
||||
collect_output(piped_spawned.output_rx, piped_spawned.exit_rx),
|
||||
)
|
||||
.await??;
|
||||
let piped_code = match piped_result.exit {
|
||||
ExitOutcome::Exited(code) => code,
|
||||
ExitOutcome::Dropped => anyhow::bail!("piped exit dropped"),
|
||||
};
|
||||
assert!(piped_spawned.session.has_exited());
|
||||
assert_eq!(piped_spawned.session.exit_code(), Some(piped_code));
|
||||
|
||||
assert_eq!(pty_code, piped_code);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies env propagation and working directory parity.
|
||||
async fn env_and_cwd_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"printf "FOO=%s\n" "$FOO"; printf "CWD=%s\n" "$(pwd)""#,
|
||||
#[cfg(windows)]
|
||||
r#"echo FOO=%FOO% & echo CWD=%CD%"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, None).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, None).await?;
|
||||
|
||||
let cwd_line = format!("CWD={}", cwd.display());
|
||||
let expected = vec!["FOO=bar".to_string(), cwd_line];
|
||||
assert_eq!(normalize_cwd_lines(&pty.output), expected);
|
||||
assert_eq!(normalize_cwd_lines(&piped.output), expected);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies large output throughput parity.
|
||||
async fn large_output_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"i=0; while [ $i -lt 200 ]; do printf "line-%s\n" "$i"; i=$((i+1)); done"#,
|
||||
#[cfg(windows)]
|
||||
r#"for /L %i in (0,1,199) do @echo line-%i"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, None).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, None).await?;
|
||||
|
||||
let mut expected = Vec::new();
|
||||
for i in 0..200 {
|
||||
expected.push(format!("line-{i}"));
|
||||
}
|
||||
assert_eq!(normalize_lines(&pty.output), expected);
|
||||
assert_parity(&pty, &piped);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies merged stdout/stderr output parity, ignoring ordering.
|
||||
async fn stderr_merge_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"printf "stdout\n"; printf "stderr\n" 1>&2"#,
|
||||
#[cfg(windows)]
|
||||
r#"echo stdout & echo stderr 1>&2"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, None).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, None).await?;
|
||||
|
||||
assert_line_set_match(&pty.output, &["stdout", "stderr"]);
|
||||
assert_line_set_match(&piped.output, &["stdout", "stderr"]);
|
||||
assert_eq!(pty.exit, piped.exit);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies terminate behavior parity for long-running children.
|
||||
async fn terminate_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
"sleep 60",
|
||||
#[cfg(windows)]
|
||||
r#"ping -n 60 127.0.0.1 >nul"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
let piped =
|
||||
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
pty.session.terminate();
|
||||
piped.session.terminate();
|
||||
|
||||
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await;
|
||||
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await;
|
||||
|
||||
assert!(pty_exit.is_ok());
|
||||
assert!(piped_exit.is_ok());
|
||||
|
||||
let pty_exit = pty_exit.unwrap();
|
||||
let piped_exit = piped_exit.unwrap();
|
||||
match pty_exit {
|
||||
Ok(code) => {
|
||||
let piped_code = piped_exit?;
|
||||
assert_eq!(code, piped_code);
|
||||
}
|
||||
Err(_) => {
|
||||
assert!(piped_exit.is_err());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies terminate can be called twice and piped matches PTY.
|
||||
async fn terminate_idempotency_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
"sleep 60",
|
||||
#[cfg(windows)]
|
||||
r#"ping -n 60 127.0.0.1 >nul"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let pty = spawn_pty_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
let piped =
|
||||
fallback::spawn_piped_process(&command.program, &command.args, &cwd, &env, &None).await?;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
pty.session.terminate();
|
||||
pty.session.terminate();
|
||||
piped.session.terminate();
|
||||
piped.session.terminate();
|
||||
|
||||
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await?;
|
||||
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await?;
|
||||
match pty_exit {
|
||||
Ok(code) => {
|
||||
let piped_code = piped_exit?;
|
||||
assert_eq!(code, piped_code);
|
||||
}
|
||||
Err(pty_err) => {
|
||||
let piped_err = piped_exit.unwrap_err();
|
||||
assert_eq!(pty_err.to_string(), piped_err.to_string());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies empty program errors are consistent across implementations.
|
||||
async fn empty_program_errors() {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return;
|
||||
}
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
let args = Vec::new();
|
||||
|
||||
let pty_err = spawn_pty_process("", &args, &cwd, &env, &None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let piped_err = fallback::spawn_piped_process("", &args, &cwd, &env, &None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
let pty_msg = pty_err.to_string();
|
||||
let piped_msg = piped_err.to_string();
|
||||
assert!(pty_msg.contains("missing program"));
|
||||
assert!(piped_msg.contains("missing program"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies arg0 overrides the program path for both implementations.
|
||||
async fn arg0_overrides_program() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_command(
|
||||
#[cfg(unix)]
|
||||
r#"printf "ok\n""#,
|
||||
#[cfg(windows)]
|
||||
r#"echo ok"#,
|
||||
);
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
let bogus = "this-does-not-exist".to_string();
|
||||
let arg0 = Some(command.program.clone());
|
||||
|
||||
let pty = spawn_pty_process(&bogus, &command.args, &cwd, &env, &arg0).await?;
|
||||
let piped = fallback::spawn_piped_process(&bogus, &command.args, &cwd, &env, &arg0).await?;
|
||||
|
||||
let pty = run_spawned(pty, None, OUTPUT_TIMEOUT).await?;
|
||||
let piped = run_spawned(piped, None, OUTPUT_TIMEOUT).await?;
|
||||
|
||||
assert_lines_match(&pty.output, &["ok"]);
|
||||
assert_parity(&pty, &piped);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies multi-command interactive sessions return outputs in order.
|
||||
async fn multi_command_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_repl_command();
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
|
||||
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
|
||||
|
||||
let marker_one = "CMD_MARKER_ONE";
|
||||
send_line(&mut pty.writer, &format!("echo {marker_one}")).await?;
|
||||
send_line(&mut piped.writer, &format!("echo {marker_one}")).await?;
|
||||
wait_for_output_contains(
|
||||
&mut pty.output_rx,
|
||||
&mut pty.buffer,
|
||||
marker_one,
|
||||
OUTPUT_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
wait_for_output_contains(
|
||||
&mut piped.output_rx,
|
||||
&mut piped.buffer,
|
||||
marker_one,
|
||||
OUTPUT_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let marker_two = "CMD_MARKER_TWO";
|
||||
send_line(&mut pty.writer, &format!("echo {marker_two}")).await?;
|
||||
send_line(&mut piped.writer, &format!("echo {marker_two}")).await?;
|
||||
wait_for_output_contains(
|
||||
&mut pty.output_rx,
|
||||
&mut pty.buffer,
|
||||
marker_two,
|
||||
OUTPUT_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
wait_for_output_contains(
|
||||
&mut piped.output_rx,
|
||||
&mut piped.buffer,
|
||||
marker_two,
|
||||
OUTPUT_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
send_line(&mut pty.writer, "exit").await?;
|
||||
send_line(&mut piped.writer, "exit").await?;
|
||||
|
||||
let pty = finish_interactive(pty).await?;
|
||||
let piped = finish_interactive(piped).await?;
|
||||
|
||||
assert_eq!(pty.exit, ExitOutcome::Exited(0));
|
||||
assert_eq!(piped.exit, ExitOutcome::Exited(0));
|
||||
let pty_markers = extract_marker_lines(&pty.output);
|
||||
let piped_markers = extract_marker_lines(&piped.output);
|
||||
assert_eq!(pty_markers, piped_markers);
|
||||
assert_eq!(
|
||||
pty_markers,
|
||||
vec![marker_one.to_string(), marker_two.to_string()]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies late output subscribers receive subsequent output.
|
||||
async fn output_subscriber_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_repl_command();
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
|
||||
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
|
||||
|
||||
let mut pty_late_rx = pty.session.output_receiver();
|
||||
let mut piped_late_rx = piped.session.output_receiver();
|
||||
let mut pty_late_buffer = Vec::new();
|
||||
let mut piped_late_buffer = Vec::new();
|
||||
|
||||
let marker = "CMD_MARKER_SUB";
|
||||
send_line(&mut pty.writer, &format!("echo {marker}")).await?;
|
||||
send_line(&mut piped.writer, &format!("echo {marker}")).await?;
|
||||
tokio::time::timeout(OUTPUT_TIMEOUT, async {
|
||||
loop {
|
||||
if output_has_marker_line(&pty_late_buffer, marker) {
|
||||
break;
|
||||
}
|
||||
match pty_late_rx.recv().await {
|
||||
Ok(bytes) => pty_late_buffer.extend_from_slice(&bytes),
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
anyhow::bail!("output channel closed before receiving {marker}");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
anyhow::bail!("output lagged by {skipped} messages");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.await??;
|
||||
tokio::time::timeout(OUTPUT_TIMEOUT, async {
|
||||
loop {
|
||||
if output_has_marker_line(&piped_late_buffer, marker) {
|
||||
break;
|
||||
}
|
||||
match piped_late_rx.recv().await {
|
||||
Ok(bytes) => piped_late_buffer.extend_from_slice(&bytes),
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
anyhow::bail!("output channel closed before receiving {marker}");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
anyhow::bail!("output lagged by {skipped} messages");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
send_line(&mut pty.writer, "exit").await?;
|
||||
send_line(&mut piped.writer, "exit").await?;
|
||||
|
||||
let pty = finish_interactive(pty).await?;
|
||||
let piped = finish_interactive(piped).await?;
|
||||
assert_eq!(pty.exit, ExitOutcome::Exited(0));
|
||||
assert_eq!(piped.exit, ExitOutcome::Exited(0));
|
||||
assert_eq!(
|
||||
extract_marker_lines(&pty_late_buffer),
|
||||
vec![marker.to_string()]
|
||||
);
|
||||
assert_eq!(
|
||||
extract_marker_lines(&piped_late_buffer),
|
||||
vec![marker.to_string()]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Verifies post-kill write failures and exit errors match.
|
||||
async fn multi_command_after_kill_parity() -> anyhow::Result<()> {
|
||||
if cfg!(windows) && !conpty_supported() {
|
||||
return Ok(());
|
||||
}
|
||||
let command = shell_repl_command();
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
|
||||
let mut pty = spawn_interactive_pty(&command, &cwd, &env).await?;
|
||||
let mut piped = spawn_interactive_piped(&command, &cwd, &env).await?;
|
||||
|
||||
let marker = "CMD_MARKER_KILL";
|
||||
send_line(&mut pty.writer, &format!("echo {marker}")).await?;
|
||||
send_line(&mut piped.writer, &format!("echo {marker}")).await?;
|
||||
wait_for_output_contains(&mut pty.output_rx, &mut pty.buffer, marker, OUTPUT_TIMEOUT).await?;
|
||||
wait_for_output_contains(
|
||||
&mut piped.output_rx,
|
||||
&mut piped.buffer,
|
||||
marker,
|
||||
OUTPUT_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
pty.session.terminate();
|
||||
piped.session.terminate();
|
||||
|
||||
let pty_send = send_line(&mut pty.writer, "echo CMD_MARKER_AFTER_KILL")
|
||||
.await
|
||||
.map_err(|err| err.to_string());
|
||||
let piped_send = send_line(&mut piped.writer, "echo CMD_MARKER_AFTER_KILL")
|
||||
.await
|
||||
.map_err(|err| err.to_string());
|
||||
match (pty_send, piped_send) {
|
||||
(Ok(()), Ok(())) => {}
|
||||
(Err(pty_err), Err(piped_err)) => {
|
||||
assert_eq!(pty_err, piped_err);
|
||||
}
|
||||
(Ok(()), Err(piped_err)) => {
|
||||
panic!("piped write failed while PTY succeeded: {piped_err}");
|
||||
}
|
||||
(Err(pty_err), Ok(())) => {
|
||||
panic!("piped write succeeded while PTY failed: {pty_err}");
|
||||
}
|
||||
}
|
||||
|
||||
let pty_exit = tokio::time::timeout(TERMINATE_TIMEOUT, pty.exit_rx).await?;
|
||||
let piped_exit = tokio::time::timeout(TERMINATE_TIMEOUT, piped.exit_rx).await?;
|
||||
match pty_exit {
|
||||
Ok(code) => {
|
||||
let piped_code = piped_exit?;
|
||||
assert_eq!(code, piped_code);
|
||||
}
|
||||
Err(pty_err) => {
|
||||
let piped_err = piped_exit.unwrap_err();
|
||||
assert_eq!(pty_err.to_string(), piped_err.to_string());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
// Unix-only: relies on /bin/sh + stty to disable echo; cmd/ConPTY behavior differs.
|
||||
// Verifies reading from stdin works and matches output parity.
|
||||
async fn stdin_read_parity() -> anyhow::Result<()> {
|
||||
let Some(stty) = stty_path() else {
|
||||
return Ok(());
|
||||
};
|
||||
let command = shell_command(&format!(
|
||||
r#"if [ -t 0 ]; then "{stty}" -echo; fi; IFS= read -r line; printf "got:%s\n" "$line""#,
|
||||
));
|
||||
let env = base_env();
|
||||
let cwd = temp_dir();
|
||||
let input = Some(b"hello\n".to_vec());
|
||||
|
||||
let pty = run_pty(&command, &cwd, &env, input.clone()).await?;
|
||||
let piped = run_piped(&command, &cwd, &env, input).await?;
|
||||
|
||||
let expected = vec!["got:hello".to_string()];
|
||||
let pty_lines = strip_echoed_input(normalize_lines(&pty.output), "hello");
|
||||
let piped_lines = strip_echoed_input(normalize_lines(&piped.output), "hello");
|
||||
assert_eq!(pty_lines, expected);
|
||||
assert_eq!(piped_lines, expected);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn stty_path() -> Option<String> {
|
||||
let candidates = ["/bin/stty", "/usr/bin/stty"];
|
||||
candidates
|
||||
.into_iter()
|
||||
.find(|path| Path::new(*path).exists())
|
||||
.map(str::to_string)
|
||||
}
|
||||
Reference in New Issue
Block a user