mirror of
https://github.com/openai/codex.git
synced 2026-05-19 10:43:38 +00:00
## Summary This is the runtime/foundation half of the Windows sandbox unified-exec work. - add Windows sandbox `unified_exec` session support in `windows-sandbox-rs` for both: - the legacy restricted-token backend - the elevated runner backend - extend the PTY/process runtime so driver-backed sessions can support: - stdin streaming - stdout/stderr separation - exit propagation - PTY resize hooks - add Windows sandbox runtime coverage in `codex-windows-sandbox` / `codex-utils-pty` This PR does **not** enable Windows sandbox `UnifiedExec` for product callers yet because hooking this up to app-server comes in the next PR. Windows sandbox advertising is intentionally kept aligned with `main`, so sandboxed Windows callers still fall back to `ShellCommand`. This PR isolates the runtime/session layer so it can be reviewed independently from product-surface enablement. --------- Co-authored-by: jif-oai <jif@openai.com> Co-authored-by: Codex <noreply@openai.com>
411 lines
13 KiB
Rust
411 lines
13 KiB
Rust
use core::fmt;
|
|
use std::io;
|
|
#[cfg(unix)]
|
|
use std::os::fd::RawFd;
|
|
use std::sync::Arc;
|
|
use std::sync::Mutex as StdMutex;
|
|
use std::sync::atomic::AtomicBool;
|
|
|
|
use anyhow::anyhow;
|
|
use portable_pty::MasterPty;
|
|
use portable_pty::PtySize;
|
|
use portable_pty::SlavePty;
|
|
use tokio::sync::broadcast;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::sync::watch;
|
|
use tokio::task::AbortHandle;
|
|
use tokio::task::JoinHandle;
|
|
|
|
pub(crate) trait ChildTerminator: Send + Sync {
|
|
fn kill(&mut self) -> io::Result<()>;
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
|
pub struct TerminalSize {
|
|
pub rows: u16,
|
|
pub cols: u16,
|
|
}
|
|
|
|
impl Default for TerminalSize {
|
|
fn default() -> Self {
|
|
Self { rows: 24, cols: 80 }
|
|
}
|
|
}
|
|
|
|
impl From<TerminalSize> for PtySize {
|
|
fn from(value: TerminalSize) -> Self {
|
|
Self {
|
|
rows: value.rows,
|
|
cols: value.cols,
|
|
pixel_width: 0,
|
|
pixel_height: 0,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
pub(crate) trait PtyHandleKeepAlive: Send {}
|
|
|
|
#[cfg(unix)]
|
|
impl<T: Send + ?Sized> PtyHandleKeepAlive for T {}
|
|
|
|
pub(crate) enum PtyMasterHandle {
|
|
Resizable(Box<dyn MasterPty + Send>),
|
|
#[cfg(unix)]
|
|
Opaque {
|
|
raw_fd: RawFd,
|
|
_handle: Box<dyn PtyHandleKeepAlive>,
|
|
},
|
|
}
|
|
|
|
pub struct PtyHandles {
|
|
pub _slave: Option<Box<dyn SlavePty + Send>>,
|
|
pub(crate) _master: PtyMasterHandle,
|
|
}
|
|
|
|
impl fmt::Debug for PtyHandles {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("PtyHandles").finish()
|
|
}
|
|
}
|
|
|
|
/// Callback used by driver-backed sessions to resize a PTY-like backend when
|
|
/// there is no local `PtyHandles` instance to resize directly.
|
|
type ResizeFn = Box<dyn FnMut(TerminalSize) -> anyhow::Result<()> + Send>;
|
|
|
|
/// Handle for driving an interactive process (PTY or pipe).
|
|
pub struct ProcessHandle {
|
|
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
|
|
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
|
|
reader_handle: StdMutex<Option<JoinHandle<()>>>,
|
|
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
|
|
writer_handle: StdMutex<Option<JoinHandle<()>>>,
|
|
wait_handle: StdMutex<Option<JoinHandle<()>>>,
|
|
exit_status: Arc<AtomicBool>,
|
|
exit_code: Arc<StdMutex<Option<i32>>>,
|
|
// PtyHandles must be preserved because the process will receive Control+C if the
|
|
// slave is closed
|
|
_pty_handles: StdMutex<Option<PtyHandles>>,
|
|
// Optional resize hook for driver-backed sessions that proxy PTY control to
|
|
// another backend instead of owning local PTY handles.
|
|
resizer: StdMutex<Option<ResizeFn>>,
|
|
}
|
|
|
|
impl fmt::Debug for ProcessHandle {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("ProcessHandle").finish()
|
|
}
|
|
}
|
|
|
|
impl ProcessHandle {
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(crate) fn new(
|
|
writer_tx: mpsc::Sender<Vec<u8>>,
|
|
killer: Box<dyn ChildTerminator>,
|
|
reader_handle: JoinHandle<()>,
|
|
reader_abort_handles: Vec<AbortHandle>,
|
|
writer_handle: JoinHandle<()>,
|
|
wait_handle: JoinHandle<()>,
|
|
exit_status: Arc<AtomicBool>,
|
|
exit_code: Arc<StdMutex<Option<i32>>>,
|
|
pty_handles: Option<PtyHandles>,
|
|
resizer: Option<ResizeFn>,
|
|
) -> Self {
|
|
Self {
|
|
writer_tx: StdMutex::new(Some(writer_tx)),
|
|
killer: StdMutex::new(Some(killer)),
|
|
reader_handle: StdMutex::new(Some(reader_handle)),
|
|
reader_abort_handles: StdMutex::new(reader_abort_handles),
|
|
writer_handle: StdMutex::new(Some(writer_handle)),
|
|
wait_handle: StdMutex::new(Some(wait_handle)),
|
|
exit_status,
|
|
exit_code,
|
|
_pty_handles: StdMutex::new(pty_handles),
|
|
resizer: StdMutex::new(resizer),
|
|
}
|
|
}
|
|
|
|
/// Returns a channel sender for writing raw bytes to the child stdin.
|
|
pub fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
|
|
if let Ok(writer_tx) = self.writer_tx.lock()
|
|
&& let Some(writer_tx) = writer_tx.as_ref()
|
|
{
|
|
return writer_tx.clone();
|
|
}
|
|
|
|
let (writer_tx, writer_rx) = mpsc::channel(1);
|
|
drop(writer_rx);
|
|
writer_tx
|
|
}
|
|
|
|
/// True if the child process has exited.
|
|
pub fn has_exited(&self) -> bool {
|
|
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
|
|
}
|
|
|
|
/// Returns the exit code if known.
|
|
pub fn exit_code(&self) -> Option<i32> {
|
|
self.exit_code.lock().ok().and_then(|guard| *guard)
|
|
}
|
|
|
|
/// Resize the PTY in character cells.
|
|
pub fn resize(&self, size: TerminalSize) -> anyhow::Result<()> {
|
|
{
|
|
let handles = self
|
|
._pty_handles
|
|
.lock()
|
|
.map_err(|_| anyhow!("failed to lock PTY handles"))?;
|
|
if let Some(handles) = handles.as_ref() {
|
|
return match &handles._master {
|
|
PtyMasterHandle::Resizable(master) => master.resize(size.into()),
|
|
#[cfg(unix)]
|
|
PtyMasterHandle::Opaque { raw_fd, .. } => resize_raw_pty(*raw_fd, size),
|
|
};
|
|
}
|
|
}
|
|
|
|
let mut resizer = self
|
|
.resizer
|
|
.lock()
|
|
.map_err(|_| anyhow!("failed to lock PTY resizer"))?;
|
|
if let Some(resizer) = resizer.as_mut() {
|
|
resizer(size)
|
|
} else {
|
|
Err(anyhow!("process is not attached to a PTY"))
|
|
}
|
|
}
|
|
|
|
/// Close the child's stdin channel.
|
|
pub fn close_stdin(&self) {
|
|
if let Ok(mut writer_tx) = self.writer_tx.lock() {
|
|
writer_tx.take();
|
|
}
|
|
}
|
|
|
|
/// Attempts to kill the child while leaving the reader/writer tasks alive
|
|
/// so callers can still drain output until EOF.
|
|
pub fn request_terminate(&self) {
|
|
if let Ok(mut killer_opt) = self.killer.lock()
|
|
&& let Some(mut killer) = killer_opt.take()
|
|
{
|
|
let _ = killer.kill();
|
|
}
|
|
}
|
|
|
|
/// Attempts to kill the child and abort helper tasks.
|
|
pub fn terminate(&self) {
|
|
self.request_terminate();
|
|
|
|
if let Ok(mut h) = self.reader_handle.lock()
|
|
&& let Some(handle) = h.take()
|
|
{
|
|
handle.abort();
|
|
}
|
|
if let Ok(mut handles) = self.reader_abort_handles.lock() {
|
|
for handle in handles.drain(..) {
|
|
handle.abort();
|
|
}
|
|
}
|
|
if let Ok(mut h) = self.writer_handle.lock()
|
|
&& let Some(handle) = h.take()
|
|
{
|
|
handle.abort();
|
|
}
|
|
if let Ok(mut h) = self.wait_handle.lock()
|
|
&& let Some(handle) = h.take()
|
|
{
|
|
handle.abort();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for ProcessHandle {
|
|
fn drop(&mut self) {
|
|
self.terminate();
|
|
}
|
|
}
|
|
|
|
/// Adapts a closure into a `ChildTerminator` implementation.
|
|
struct ClosureTerminator {
|
|
inner: Option<Box<dyn FnMut() + Send + Sync>>,
|
|
}
|
|
|
|
impl ChildTerminator for ClosureTerminator {
|
|
fn kill(&mut self) -> io::Result<()> {
|
|
if let Some(inner) = self.inner.as_mut() {
|
|
(inner)();
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
fn resize_raw_pty(raw_fd: RawFd, size: TerminalSize) -> anyhow::Result<()> {
|
|
let mut winsize = libc::winsize {
|
|
ws_row: size.rows,
|
|
ws_col: size.cols,
|
|
ws_xpixel: 0,
|
|
ws_ypixel: 0,
|
|
};
|
|
let result = unsafe { libc::ioctl(raw_fd, libc::TIOCSWINSZ, &mut winsize) };
|
|
if result == -1 {
|
|
return Err(std::io::Error::last_os_error().into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Combine split stdout/stderr receivers into a single broadcast receiver.
|
|
pub fn combine_output_receivers(
|
|
mut stdout_rx: mpsc::Receiver<Vec<u8>>,
|
|
mut stderr_rx: mpsc::Receiver<Vec<u8>>,
|
|
) -> broadcast::Receiver<Vec<u8>> {
|
|
let (combined_tx, combined_rx) = broadcast::channel(256);
|
|
tokio::spawn(async move {
|
|
let mut stdout_open = true;
|
|
let mut stderr_open = true;
|
|
|
|
loop {
|
|
tokio::select! {
|
|
stdout = stdout_rx.recv(), if stdout_open => match stdout {
|
|
Some(chunk) => {
|
|
let _ = combined_tx.send(chunk);
|
|
}
|
|
None => {
|
|
stdout_open = false;
|
|
}
|
|
},
|
|
stderr = stderr_rx.recv(), if stderr_open => match stderr {
|
|
Some(chunk) => {
|
|
let _ = combined_tx.send(chunk);
|
|
}
|
|
None => {
|
|
stderr_open = false;
|
|
}
|
|
},
|
|
else => break,
|
|
}
|
|
}
|
|
});
|
|
combined_rx
|
|
}
|
|
|
|
/// Return value from PTY or pipe spawn helpers.
|
|
#[derive(Debug)]
|
|
pub struct SpawnedProcess {
|
|
pub session: ProcessHandle,
|
|
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
|
|
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
|
|
pub exit_rx: oneshot::Receiver<i32>,
|
|
}
|
|
|
|
/// Driver-backed process handles for non-standard spawn backends.
|
|
pub struct ProcessDriver {
|
|
pub writer_tx: mpsc::Sender<Vec<u8>>,
|
|
pub stdout_rx: broadcast::Receiver<Vec<u8>>,
|
|
pub stderr_rx: Option<broadcast::Receiver<Vec<u8>>>,
|
|
pub exit_rx: oneshot::Receiver<i32>,
|
|
pub terminator: Option<Box<dyn FnMut() + Send + Sync>>,
|
|
pub writer_handle: Option<JoinHandle<()>>,
|
|
pub resizer: Option<ResizeFn>,
|
|
}
|
|
|
|
/// Build a `SpawnedProcess` from a driver that supplies stdin/output/exit channels.
|
|
pub fn spawn_from_driver(driver: ProcessDriver) -> SpawnedProcess {
|
|
let ProcessDriver {
|
|
writer_tx,
|
|
stdout_rx: stdout_driver_rx,
|
|
stderr_rx: mut stderr_driver_rx,
|
|
exit_rx,
|
|
terminator,
|
|
writer_handle,
|
|
resizer,
|
|
} = driver;
|
|
|
|
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(256);
|
|
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(256);
|
|
let (exit_seen_tx, exit_seen_rx) = watch::channel(false);
|
|
let spawn_stream_reader =
|
|
|mut output_rx: broadcast::Receiver<Vec<u8>>,
|
|
output_tx: mpsc::Sender<Vec<u8>>,
|
|
mut exit_seen_rx: watch::Receiver<bool>| {
|
|
tokio::spawn(async move {
|
|
let mut process_exited = false;
|
|
loop {
|
|
let recv_result = if process_exited {
|
|
match tokio::time::timeout(
|
|
std::time::Duration::from_millis(200),
|
|
output_rx.recv(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(result) => result,
|
|
Err(_) => break,
|
|
}
|
|
} else {
|
|
tokio::select! {
|
|
_ = exit_seen_rx.changed() => {
|
|
process_exited = *exit_seen_rx.borrow();
|
|
continue;
|
|
}
|
|
result = output_rx.recv() => result,
|
|
}
|
|
};
|
|
match recv_result {
|
|
Ok(chunk) => {
|
|
if output_tx.send(chunk).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
|
}
|
|
}
|
|
})
|
|
};
|
|
let reader_handle = spawn_stream_reader(stdout_driver_rx, stdout_tx, exit_seen_rx.clone());
|
|
let stderr_reader_handle = stderr_driver_rx
|
|
.take()
|
|
.map(|rx| spawn_stream_reader(rx, stderr_tx, exit_seen_rx));
|
|
|
|
let writer_handle = writer_handle.unwrap_or_else(|| tokio::spawn(async {}));
|
|
|
|
let (exit_tx, exit_rx_out) = oneshot::channel::<i32>();
|
|
let exit_status = Arc::new(AtomicBool::new(false));
|
|
let wait_exit_status = Arc::clone(&exit_status);
|
|
let exit_code = Arc::new(StdMutex::new(None));
|
|
let wait_exit_code = Arc::clone(&exit_code);
|
|
let wait_handle = tokio::spawn(async move {
|
|
let code = exit_rx.await.unwrap_or(-1);
|
|
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
|
if let Ok(mut guard) = wait_exit_code.lock() {
|
|
*guard = Some(code);
|
|
}
|
|
let _ = exit_seen_tx.send(true);
|
|
let _ = exit_tx.send(code);
|
|
});
|
|
|
|
let handle = ProcessHandle::new(
|
|
writer_tx,
|
|
Box::new(ClosureTerminator { inner: terminator }),
|
|
reader_handle,
|
|
stderr_reader_handle
|
|
.map(|handle| handle.abort_handle())
|
|
.into_iter()
|
|
.collect(),
|
|
writer_handle,
|
|
wait_handle,
|
|
exit_status,
|
|
exit_code,
|
|
/*pty_handles*/ None,
|
|
resizer,
|
|
);
|
|
|
|
SpawnedProcess {
|
|
session: handle,
|
|
stdout_rx,
|
|
stderr_rx,
|
|
exit_rx: exit_rx_out,
|
|
}
|
|
}
|