mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
pty: refresh stdin round-trip split for windows
This commit is contained in:
@@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String {
|
||||
}
|
||||
|
||||
fn split_stdout_stderr_command() -> String {
|
||||
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
|
||||
if cfg!(windows) {
|
||||
// Keep this in cmd.exe syntax so the test does not depend on a runner-local
|
||||
// PowerShell/Python setup just to produce deterministic split output.
|
||||
"(echo split-out)&(>&2 echo split-err)".to_string()
|
||||
} else {
|
||||
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
|
||||
@@ -130,52 +136,36 @@ async fn collect_output_until_exit(
|
||||
}
|
||||
|
||||
async fn wait_for_python_repl_ready(
|
||||
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
timeout_ms: u64,
|
||||
newline: &str,
|
||||
ready_marker: &str,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut collected = Vec::new();
|
||||
let marker = "__codex_pty_ready__";
|
||||
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
|
||||
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
writer
|
||||
.send(format!("print('{marker}'){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let probe_deadline = tokio::time::Instant::now() + probe_window;
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
if now >= deadline || now >= probe_deadline {
|
||||
break;
|
||||
}
|
||||
let remaining = std::cmp::min(
|
||||
deadline.saturating_duration_since(now),
|
||||
probe_deadline.saturating_duration_since(now),
|
||||
);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
let now = tokio::time::Instant::now();
|
||||
let remaining = deadline.saturating_duration_since(now);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(ready_marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"timed out waiting for Python REPL readiness in PTY: {:?}",
|
||||
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
@@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let ready_marker = "__codex_pty_ready__";
|
||||
let args = vec![
|
||||
"-i".to_string(),
|
||||
"-q".to_string(),
|
||||
"-c".to_string(),
|
||||
format!("print('{ready_marker}')"),
|
||||
];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pty_process(
|
||||
&python,
|
||||
&[],
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
@@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
|
||||
let mut output =
|
||||
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
|
||||
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
|
||||
writer
|
||||
.send(format!("print('hello from pty'){newline}").into_bytes())
|
||||
.await?;
|
||||
@@ -429,21 +426,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let (program, args) = if cfg!(windows) {
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
|
||||
return Ok(());
|
||||
};
|
||||
(
|
||||
python,
|
||||
vec![
|
||||
"-c".to_string(),
|
||||
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
|
||||
],
|
||||
)
|
||||
} else {
|
||||
shell_command(&split_stdout_stderr_command())
|
||||
};
|
||||
let (program, args) = shell_command(&split_stdout_stderr_command());
|
||||
let spawned =
|
||||
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let SpawnedProcess {
|
||||
@@ -466,8 +449,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;
|
||||
|
||||
assert_eq!(stdout, b"split-out\n".to_vec());
|
||||
assert_eq!(stderr, b"split-err\n".to_vec());
|
||||
let expected_stdout = if cfg!(windows) {
|
||||
b"split-out\r\n".to_vec()
|
||||
} else {
|
||||
b"split-out\n".to_vec()
|
||||
};
|
||||
let expected_stderr = if cfg!(windows) {
|
||||
b"split-err\r\n".to_vec()
|
||||
} else {
|
||||
b"split-err\n".to_vec()
|
||||
};
|
||||
|
||||
assert_eq!(stdout, expected_stdout);
|
||||
assert_eq!(stderr, expected_stderr);
|
||||
assert_eq!(code, 0);
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user