This commit is contained in:
jif-oai
2025-12-17 15:15:43 +00:00
parent 2e7e4f6ea6
commit 0c34d6633f
6 changed files with 212 additions and 17 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1879,6 +1879,7 @@ dependencies = [
"anyhow",
"filedescriptor",
"lazy_static",
"libc",
"log",
"portable-pty",
"shared_library",

View File

@@ -43,12 +43,7 @@ impl ToolsConfig {
let shell_type = if !features.enabled(Feature::ShellTool) {
ConfigShellToolType::Disabled
} else if features.enabled(Feature::UnifiedExec) {
// If ConPTY not supported (for old Windows versions), fallback on ShellCommand.
if codex_utils_pty::conpty_supported() {
ConfigShellToolType::UnifiedExec
} else {
ConfigShellToolType::ShellCommand
}
ConfigShellToolType::UnifiedExec
} else {
model_family.shell_type
};

View File

@@ -460,7 +460,7 @@ impl UnifiedExecSessionManager {
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawned = codex_utils_pty::spawn_pty_process(
let spawned = codex_utils_pty::spawn_exec_session(
program,
args,
env.cwd.as_path(),

View File

@@ -12,6 +12,9 @@ anyhow = { workspace = true }
portable-pty = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
[target.'cfg(unix)'.dependencies]
libc = { workspace = true }
[target.'cfg(windows)'.dependencies]
filedescriptor = "0.8.3"
lazy_static = { workspace = true }

View File

@@ -0,0 +1,180 @@
use std::collections::HashMap;
use std::io;
use std::path::Path;
use std::process::Command as StdCommand;
use std::process::Stdio;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use anyhow::Result;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use super::ExecCommandSession;
use super::SpawnedPty;
pub async fn spawn_piped_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedPty> {
if program.is_empty() {
anyhow::bail!("missing program for exec spawn");
}
let program = arg0.as_deref().unwrap_or(program);
let mut command = StdCommand::new(program);
command.args(args);
command.current_dir(cwd);
command.env_clear();
command.envs(env);
command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = command.spawn()?;
let pid = child.id();
let stdin = child.stdin.take().ok_or_else(|| {
anyhow::anyhow!("stdin pipe was unexpectedly not available for exec spawn")
})?;
let stdout = child.stdout.take().ok_or_else(|| {
anyhow::anyhow!("stdout pipe was unexpectedly not available for exec spawn")
})?;
let stderr = child.stderr.take().ok_or_else(|| {
anyhow::anyhow!("stderr pipe was unexpectedly not available for exec spawn")
})?;
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let initial_output_rx = output_tx.subscribe();
// Pipes separate stdout and stderr; merge to match PTY semantics.
let stdout_handle = spawn_pipe_reader(stdout, output_tx.clone());
let stderr_handle = spawn_pipe_reader(stderr, output_tx.clone());
let writer_handle = tokio::task::spawn_blocking(move || {
let mut stdin = stdin;
use std::io::Write;
while let Some(bytes) = writer_rx.blocking_recv() {
let _ = stdin.write_all(&bytes);
let _ = stdin.flush();
}
});
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let exit_code = Arc::new(StdMutex::new(None));
let wait_exit_code = Arc::clone(&exit_code);
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.code().unwrap_or(-1),
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = wait_exit_code.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
let (session, output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
initial_output_rx,
Box::new(PipedChildKiller::new(pid)),
vec![stdout_handle, stderr_handle],
writer_handle,
wait_handle,
exit_status,
exit_code,
None,
);
Ok(SpawnedPty {
session,
output_rx,
exit_rx,
})
}
fn spawn_pipe_reader<R: std::io::Read + Send + 'static>(
mut reader: R,
output_tx: broadcast::Sender<Vec<u8>>,
) -> JoinHandle<()> {
tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8_192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
})
}
#[derive(Debug)]
struct PipedChildKiller {
pid: u32,
}
impl PipedChildKiller {
fn new(pid: u32) -> Self {
Self { pid }
}
}
impl portable_pty::ChildKiller for PipedChildKiller {
fn kill(&mut self) -> io::Result<()> {
terminate_pid(self.pid)
}
fn clone_killer(&self) -> Box<dyn portable_pty::ChildKiller + Send + Sync> {
Box::new(Self { pid: self.pid })
}
}
#[cfg(unix)]
fn terminate_pid(pid: u32) -> io::Result<()> {
let result = unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
if result == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
#[cfg(windows)]
fn terminate_pid(pid: u32) -> io::Result<()> {
use winapi::shared::minwindef::FALSE;
use winapi::um::handleapi::CloseHandle;
use winapi::um::processthreadsapi::OpenProcess;
use winapi::um::processthreadsapi::TerminateProcess;
use winapi::um::winnt::PROCESS_TERMINATE;
unsafe {
let handle = OpenProcess(PROCESS_TERMINATE, FALSE, pid);
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let ok = TerminateProcess(handle, 1) != 0;
let err = io::Error::last_os_error();
CloseHandle(handle);
if ok {
Ok(())
} else {
Err(err)
}
}
}

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
mod fallback;
#[cfg(windows)]
mod win;
@@ -33,14 +34,14 @@ pub struct ExecCommandSession {
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
killer: StdMutex<Option<Box<dyn portable_pty::ChildKiller + Send + Sync>>>,
reader_handle: StdMutex<Option<JoinHandle<()>>>,
reader_handles: StdMutex<Vec<JoinHandle<()>>>,
writer_handle: StdMutex<Option<JoinHandle<()>>>,
wait_handle: StdMutex<Option<JoinHandle<()>>>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
// PtyPair must be preserved because the process will receive Control+C if the
// slave is closed
_pair: StdMutex<PtyPairWrapper>,
_pair: StdMutex<Option<PtyPairWrapper>>,
}
impl fmt::Debug for PtyPairWrapper {
@@ -56,19 +57,19 @@ impl ExecCommandSession {
output_tx: broadcast::Sender<Vec<u8>>,
initial_output_rx: broadcast::Receiver<Vec<u8>>,
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
reader_handle: JoinHandle<()>,
reader_handles: Vec<JoinHandle<()>>,
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
pair: PtyPairWrapper,
pair: Option<PtyPairWrapper>,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
(
Self {
writer_tx,
output_tx,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_handles: StdMutex::new(reader_handles),
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
@@ -102,8 +103,8 @@ impl ExecCommandSession {
}
}
if let Ok(mut h) = self.reader_handle.lock() {
if let Some(handle) = h.take() {
if let Ok(mut handles) = self.reader_handles.lock() {
for handle in handles.drain(..) {
handle.abort();
}
}
@@ -152,6 +153,20 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
native_pty_system()
}
pub async fn spawn_exec_session(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedPty> {
if cfg!(windows) && !conpty_supported() {
return fallback::spawn_piped_process(program, args, cwd, env, arg0).await;
}
spawn_pty_process(program, args, cwd, env, arg0).await
}
pub async fn spawn_pty_process(
program: &str,
args: &[String],
@@ -171,7 +186,8 @@ pub async fn spawn_pty_process(
pixel_height: 0,
})?;
let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string()));
let program = arg0.as_deref().unwrap_or(program);
let mut command_builder = CommandBuilder::new(program);
command_builder.cwd(cwd);
command_builder.env_clear();
for arg in args {
@@ -255,12 +271,12 @@ pub async fn spawn_pty_process(
output_tx,
initial_output_rx,
killer,
reader_handle,
vec![reader_handle],
writer_handle,
wait_handle,
exit_status,
exit_code,
pair,
Some(pair),
);
Ok(SpawnedPty {