Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
e211b11639 pty: refresh stdin round-trip split for windows 2026-03-09 00:56:29 -07:00
Ahmed Ibrahim
4904ea4d29 codex: close pipe test stdin deterministically 2026-03-08 17:09:40 -07:00

View File

@@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String {
}
fn split_stdout_stderr_command() -> String {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
if cfg!(windows) {
// Keep this in cmd.exe syntax so the test does not depend on a runner-local
// PowerShell/Python setup just to produce deterministic split output.
"(echo split-out)&(>&2 echo split-err)".to_string()
} else {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
}
}
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
@@ -130,52 +136,36 @@ async fn collect_output_until_exit(
}
async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
newline: &str,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
let mut collected = Vec::new();
let marker = "__codex_pty_ready__";
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
while tokio::time::Instant::now() < deadline {
writer
.send(format!("print('{marker}'){newline}").into_bytes())
.await?;
let probe_deadline = tokio::time::Instant::now() + probe_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline || now >= probe_deadline {
break;
}
let remaining = std::cmp::min(
deadline.saturating_duration_since(now),
probe_deadline.saturating_duration_since(now),
);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(marker) {
return Ok(collected);
}
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
}
anyhow::bail!(
"timed out waiting for Python REPL readiness in PTY: {:?}",
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
String::from_utf8_lossy(&collected)
);
}
@@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
return Ok(());
};
let ready_marker = "__codex_pty_ready__";
let args = vec![
"-i".to_string(),
"-q".to_string(),
"-c".to_string(),
format!("print('{ready_marker}')"),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pty_process(
&python,
&[],
&args,
Path::new("."),
&env_map,
&None,
@@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
writer
.send(format!("print('hello from pty'){newline}").into_bytes())
.await?;
@@ -306,6 +303,8 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
let (session, output_rx, exit_rx) = combine_spawned_output(spawned);
let writer = session.writer_sender();
writer.send(b"roundtrip\n".to_vec()).await?;
drop(writer);
session.close_stdin();
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
let text = String::from_utf8_lossy(&output);
@@ -427,21 +426,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let (program, args) = if cfg!(windows) {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
return Ok(());
};
(
python,
vec![
"-c".to_string(),
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
],
)
} else {
shell_command(&split_stdout_stderr_command())
};
let (program, args) = shell_command(&split_stdout_stderr_command());
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
@@ -464,8 +449,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
.await
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;
assert_eq!(stdout, b"split-out\n".to_vec());
assert_eq!(stderr, b"split-err\n".to_vec());
let expected_stdout = if cfg!(windows) {
b"split-out\r\n".to_vec()
} else {
b"split-out\n".to_vec()
};
let expected_stderr = if cfg!(windows) {
b"split-err\r\n".to_vec()
} else {
b"split-err\n".to_vec()
};
assert_eq!(stdout, expected_stdout);
assert_eq!(stderr, expected_stderr);
assert_eq!(code, 0);
Ok(())