mirror of
https://github.com/openai/codex.git
synced 2026-05-01 01:47:18 +00:00
fix: preserve zsh-fork escalation fds across unified-exec spawn paths (#13644)
## Why `zsh-fork` sessions launched through unified-exec need the escalation socket to survive the wrapper -> server -> child handoff so later intercepted `exec()` calls can still reach the escalation server. The inherited-fd spawn path also needs to avoid closing Rust's internal exec-error pipe, and the shell-escalation handoff needs to tolerate the receive-side case where a transferred fd is installed into the same stdio slot it will be mapped onto. ## What Changed - Added `SpawnLifecycle::inherited_fds()` in `codex-rs/core/src/unified_exec/process.rs` and threaded inherited fds through `codex-rs/core/src/unified_exec/process_manager.rs` so unified-exec can preserve required descriptors across both PTY and no-stdin pipe spawn paths. - Updated `codex-rs/core/src/tools/runtimes/shell/zsh_fork_backend.rs` to expose the escalation socket fd through the spawn lifecycle. - Added inherited-fd-aware spawn helpers in `codex-rs/utils/pty/src/pty.rs` and `codex-rs/utils/pty/src/pipe.rs`, including Unix pre-exec fd pruning that preserves requested inherited fds while leaving `FD_CLOEXEC` descriptors alone. The pruning helper is now named `close_inherited_fds_except()` to better describe that behavior. - Updated `codex-rs/shell-escalation/src/unix/escalate_client.rs` to duplicate local stdio before transfer and send destination stdio numbers in `SuperExecMessage`, so the wrapper keeps using its own `stdin`/`stdout`/`stderr` until the escalated child takes over. - Updated `codex-rs/shell-escalation/src/unix/escalate_server.rs` so the server accepts the overlap case where a received fd reuses the same stdio descriptor number that the child setup will target with `dup2`. - Added comments around the PTY stdio wiring and the overlap regression helper to make the fd handoff and controlling-terminal setup easier to follow. ## Verification - `cargo test -p codex-utils-pty` - covers preserved-fd PTY spawn behavior, PTY resize, Python REPL continuity, exec-failure reporting, and the no-stdin pipe path - `cargo test -p codex-shell-escalation` - covers duplicated-fd transfer on the client side and verifies the overlap case by passing a pipe-backed stdin payload through the server-side `dup2` path --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13644). * #14624 * __->__ #13644
This commit is contained in:
@@ -4,6 +4,10 @@ use std::path::Path;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use crate::combine_output_receivers;
|
||||
#[cfg(unix)]
|
||||
use crate::pipe::spawn_process_no_stdin_with_inherited_fds;
|
||||
#[cfg(unix)]
|
||||
use crate::pty::spawn_process_with_inherited_fds;
|
||||
use crate::spawn_pipe_process;
|
||||
use crate::spawn_pipe_process_no_stdin;
|
||||
use crate::spawn_pty_process;
|
||||
@@ -135,6 +139,42 @@ async fn collect_output_until_exit(
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn wait_for_output_contains(
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
needle: &str,
|
||||
timeout_ms: u64,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut collected = Vec::new();
|
||||
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
let now = tokio::time::Instant::now();
|
||||
let remaining = deadline.saturating_duration_since(now);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(needle) {
|
||||
return Ok(collected);
|
||||
}
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for {needle:?}: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"timed out waiting for {needle:?} in PTY output: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
|
||||
async fn wait_for_python_repl_ready(
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
timeout_ms: u64,
|
||||
@@ -170,6 +210,58 @@ async fn wait_for_python_repl_ready(
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn wait_for_python_repl_ready_via_probe(
|
||||
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
timeout_ms: u64,
|
||||
newline: &str,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut collected = Vec::new();
|
||||
let marker = "__codex_pty_ready__";
|
||||
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
|
||||
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
writer
|
||||
.send(format!("print('{marker}'){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let probe_deadline = tokio::time::Instant::now() + probe_window;
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
if now >= deadline || now >= probe_deadline {
|
||||
break;
|
||||
}
|
||||
let remaining = std::cmp::min(
|
||||
deadline.saturating_duration_since(now),
|
||||
probe_deadline.saturating_duration_since(now),
|
||||
);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"timed out waiting for Python REPL readiness in PTY: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn process_exists(pid: i32) -> anyhow::Result<bool> {
|
||||
let result = unsafe { libc::kill(pid, 0) };
|
||||
@@ -209,16 +301,26 @@ async fn wait_for_marker_pid(
|
||||
collected.extend_from_slice(&chunk);
|
||||
|
||||
let text = String::from_utf8_lossy(&collected);
|
||||
if let Some(marker_idx) = text.find(marker) {
|
||||
let suffix = &text[marker_idx + marker.len()..];
|
||||
let digits: String = suffix
|
||||
let mut offset = 0;
|
||||
while let Some(pos) = text[offset..].find(marker) {
|
||||
let marker_start = offset + pos;
|
||||
let suffix = &text[marker_start + marker.len()..];
|
||||
let digits_len = suffix
|
||||
.chars()
|
||||
.skip_while(|ch| !ch.is_ascii_digit())
|
||||
.take_while(char::is_ascii_digit)
|
||||
.collect();
|
||||
if !digits.is_empty() {
|
||||
return Ok(digits.parse()?);
|
||||
.map(char::len_utf8)
|
||||
.sum::<usize>();
|
||||
if digits_len == 0 {
|
||||
offset = marker_start + marker.len();
|
||||
continue;
|
||||
}
|
||||
|
||||
let pid_str = &suffix[..digits_len];
|
||||
let trailing = &suffix[digits_len..];
|
||||
if trailing.is_empty() {
|
||||
break;
|
||||
}
|
||||
return Ok(pid_str.parse()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -569,3 +671,276 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_spawn_can_preserve_inherited_fds() -> anyhow::Result<()> {
|
||||
use std::io::Read;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let mut read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let mut env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
env_map.insert(
|
||||
"PRESERVED_FD".to_string(),
|
||||
write_end.as_raw_fd().to_string(),
|
||||
);
|
||||
|
||||
let script = "printf __preserved__ >\"/dev/fd/$PRESERVED_FD\"";
|
||||
let spawned = spawn_process_with_inherited_fds(
|
||||
"/bin/sh",
|
||||
&["-c".to_string(), script.to_string()],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(write_end);
|
||||
|
||||
let (_session, output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let (_, code) = collect_output_until_exit(output_rx, exit_rx, 2_000).await;
|
||||
assert_eq!(code, 0, "expected preserved-fd PTY child to exit cleanly");
|
||||
|
||||
let mut pipe_output = String::new();
|
||||
read_end.read_to_string(&mut pipe_output)?;
|
||||
assert_eq!(pipe_output, "__preserved__");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_preserving_inherited_fds_keeps_python_repl_running() -> anyhow::Result<()> {
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!(
|
||||
"python not found; skipping pty_preserving_inherited_fds_keeps_python_repl_running"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let preserved_fd = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let mut env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
env_map.insert(
|
||||
"PRESERVED_FD".to_string(),
|
||||
preserved_fd.as_raw_fd().to_string(),
|
||||
);
|
||||
|
||||
let spawned = spawn_process_with_inherited_fds(
|
||||
&python,
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
&[preserved_fd.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
drop(read_end);
|
||||
drop(preserved_fd);
|
||||
|
||||
let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let writer = session.writer_sender();
|
||||
let newline = "\n";
|
||||
let mut output =
|
||||
wait_for_python_repl_ready_via_probe(&writer, &mut output_rx, 5_000, newline).await?;
|
||||
let marker = "__codex_preserved_py_pid:";
|
||||
writer
|
||||
.send(format!("import os; print('{marker}' + str(os.getpid())){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let python_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await {
|
||||
Ok(pid) => pid,
|
||||
Err(err) => {
|
||||
session.terminate();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
assert!(
|
||||
process_exists(python_pid)?,
|
||||
"expected python pid {python_pid} to stay alive after prompt output"
|
||||
);
|
||||
|
||||
writer.send(format!("exit(){newline}").into_bytes()).await?;
|
||||
let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
|
||||
output.extend_from_slice(&remaining_output);
|
||||
|
||||
assert_eq!(code, 0, "expected python to exit cleanly");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_spawn_with_inherited_fds_reports_exec_failures() -> anyhow::Result<()> {
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawn_result = spawn_process_with_inherited_fds(
|
||||
"/definitely/missing/command",
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize::default(),
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await;
|
||||
|
||||
drop(read_end);
|
||||
drop(write_end);
|
||||
|
||||
let err = match spawn_result {
|
||||
Ok(spawned) => {
|
||||
spawned.session.terminate();
|
||||
anyhow::bail!("missing executable unexpectedly spawned");
|
||||
}
|
||||
Err(err) => err,
|
||||
};
|
||||
let err_text = err.to_string();
|
||||
assert!(
|
||||
err_text.contains("No such file")
|
||||
|| err_text.contains("not found")
|
||||
|| err_text.contains("os error 2"),
|
||||
"expected spawn error for missing executable, got: {err_text}",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_spawn_with_inherited_fds_supports_resize() -> anyhow::Result<()> {
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let script =
|
||||
"stty -echo; printf 'start:%s\\n' \"$(stty size)\"; IFS= read _line; printf 'after:%s\\n' \"$(stty size)\"";
|
||||
let spawned = spawn_process_with_inherited_fds(
|
||||
"/bin/sh",
|
||||
&["-c".to_string(), script.to_string()],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
TerminalSize {
|
||||
rows: 31,
|
||||
cols: 101,
|
||||
},
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let writer = session.writer_sender();
|
||||
let mut output = wait_for_output_contains(&mut output_rx, "start:31 101\r\n", 5_000).await?;
|
||||
|
||||
session.resize(TerminalSize {
|
||||
rows: 45,
|
||||
cols: 132,
|
||||
})?;
|
||||
writer.send(b"go\n".to_vec()).await?;
|
||||
session.close_stdin();
|
||||
|
||||
let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
|
||||
output.extend_from_slice(&remaining_output);
|
||||
let text = String::from_utf8_lossy(&output);
|
||||
let normalized = text.replace("\r\n", "\n");
|
||||
|
||||
assert!(
|
||||
normalized.contains("after:45 132\n"),
|
||||
"expected resized PTY dimensions in output: {text:?}"
|
||||
);
|
||||
assert_eq!(code, 0, "expected shell to exit cleanly after resize");
|
||||
|
||||
drop(read_end);
|
||||
drop(write_end);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_spawn_no_stdin_can_preserve_inherited_fds() -> anyhow::Result<()> {
|
||||
use std::io::Read;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let mut read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let mut env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
env_map.insert(
|
||||
"PRESERVED_FD".to_string(),
|
||||
write_end.as_raw_fd().to_string(),
|
||||
);
|
||||
|
||||
let script = "printf __pipe_preserved__ >\"/dev/fd/$PRESERVED_FD\"";
|
||||
let spawned = spawn_process_no_stdin_with_inherited_fds(
|
||||
"/bin/sh",
|
||||
&["-c".to_string(), script.to_string()],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(write_end);
|
||||
|
||||
let (_session, output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let (_, code) = collect_output_until_exit(output_rx, exit_rx, 2_000).await;
|
||||
assert_eq!(code, 0, "expected preserved-fd pipe child to exit cleanly");
|
||||
|
||||
let mut pipe_output = String::new();
|
||||
read_end.read_to_string(&mut pipe_output)?;
|
||||
assert_eq!(pipe_output, "__pipe_preserved__");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user