draft broken

This commit is contained in:
Ryan Ragona
2025-04-26 09:41:06 -07:00
parent f2b7b14284
commit 1d0d725494
3 changed files with 229 additions and 83 deletions

View File

@@ -45,7 +45,7 @@ serde_yaml = "0.9"
names = "0.14"
# unix-only process helpers
nix = { version = "0.27", default-features = false, features = ["process", "signal", "term"] }
nix = { version = "0.27", default-features = false, features = ["process", "signal", "term", "fs"] }
# Re-use the codex-exec library for its CLI definition
codex_exec = { package = "codex-exec", path = "../exec" }

View File

@@ -56,6 +56,7 @@ impl Cli {
Commands::Delete(x) => x.run().await,
Commands::Logs(x) => x.run().await,
Commands::List(x) => x.run().await,
Commands::Mux(x) => x.run().await,
}
}
}
@@ -72,6 +73,10 @@ enum Commands {
Logs(LogsCmd),
/// List all known sessions.
List(ListCmd),
/// Internal helper process: PTY multiplexer daemon (hidden).
#[command(hide = true, name = "__mux")]
Mux(MuxCmd),
}
// -----------------------------------------------------------------------------
@@ -143,7 +148,7 @@ impl CreateCmd {
}
AgentKind::Tui(cmd) => {
let args = build_tui_args(&cmd.tui_cli);
let child = spawn::spawn_tui(&paths, &args)?;
let child = spawn::spawn_tui(&paths, &args).await?;
let preview = cmd.tui_cli.prompt.as_ref().map(|p| truncate_preview(p));
(child.id().unwrap_or_default(), preview, store::SessionKind::Tui)
}
@@ -164,6 +169,30 @@ impl CreateCmd {
}
}
// -----------------------------------------------------------------------------
// internal mux helper sub-command (hidden)
#[derive(Args)]
pub struct MuxCmd {
/// Raw PTY master file descriptor passed from the parent process.
#[arg(long)]
fd: i32,
/// Path to the Unix-domain socket that clients attach to.
#[arg(long)]
sock: std::path::PathBuf,
/// Path to the binary stdout log file.
#[arg(long)]
log: std::path::PathBuf,
}
impl MuxCmd {
pub async fn run(self) -> Result<()> {
crate::spawn::mux_main(self.fd, self.sock, self.log).await
}
}
fn truncate_preview(p: &str) -> String {
let slice: String = p.chars().take(40).collect();
if p.len() > 40 {

View File

@@ -1,18 +1,21 @@
//! Spawn detached Codex agent processes.
//! Spawn detached Codex agent processes (exec, repl, tui).
//!
//! The session manager supports multiple agent flavors. `codex-exec` requires no interactive
//! stdin so we can safely redirect it to `/dev/null`. `codex-repl` however needs to read user
//! input after it is launched. The background process therefore receives a **named pipe** as
//! its standard input which later `codex-session attach` commands can open for writing.
//! The *exec* and *repl* helpers reuse the original FIFO/pipe strategy while
//! the new **tui** flavour allocates a pseudo-terminal so the crossterm /
//! ratatui application sees a *real* tty. A small socket fan-out forwards raw
//! bytes between the PTY and every `codex-session attach` client.
use crate::store::Paths;
use anyhow::Context;
use anyhow::Result;
use anyhow::{Context, Result};
use std::fs::OpenOptions;
use tokio::process::Child;
use tokio::process::Command;
use tokio::process::{Child, Command};
#[cfg(unix)]
use std::os::unix::process::CommandExt; // for pre_exec
// -----------------------------------------------------------------------------
// exec non-interactive batch agent (stdin = /dev/null)
/// Spawn a `codex-exec` agent.
pub fn spawn_exec(paths: &Paths, exec_args: &[String]) -> Result<Child> {
#[cfg(unix)]
{
@@ -77,9 +80,9 @@ pub fn spawn_exec(paths: &Paths, exec_args: &[String]) -> Result<Child> {
}
}
/// Spawn a `codex-repl` agent. The process is detached like `spawn_exec` but its standard input
/// is connected to a named pipe inside the session directory so additional CLI instances can
/// attach later and feed user input.
// -----------------------------------------------------------------------------
// repl interactive but **line-oriented** (FIFO for stdin)
pub fn spawn_repl(paths: &Paths, repl_args: &[String]) -> Result<Child> {
#[cfg(unix)]
{
@@ -93,16 +96,14 @@ pub fn spawn_repl(paths: &Paths, repl_args: &[String]) -> Result<Child> {
let res = unsafe { libc::mkfifo(c_path.as_ptr(), 0o600) };
if res != 0 {
let err = std::io::Error::last_os_error();
// Ignore EEXIST if some race created it first.
if err.kind() != io::ErrorKind::AlreadyExists {
return Err(err).context("mkfifo failed");
}
}
}
// Open the FIFO read-write so `open()` does **not** block even though no external writer
// is connected yet. Keeping the write end open inside the child prevents an EOF on the
// read end while no `attach` session is active.
// Open the FIFO read-write so `open()` does **not** block even though
// no external writer is connected yet.
let stdin = OpenOptions::new()
.read(true)
.write(true)
@@ -139,61 +140,54 @@ pub fn spawn_repl(paths: &Paths, repl_args: &[String]) -> Result<Child> {
#[cfg(windows)]
{
anyhow::bail!("codex-repl background sessions are not yet supported on Windows");
anyhow::bail!("codex-repl sessions are not yet supported on Windows");
}
}
/// Spawn a `codex-tui` agent **inside a pseudo-terminal (pty)** so that a later
/// `codex-session attach` command can hook up an interactive terminal. The
/// current implementation is intentionally minimal: it only takes care of
/// running the agent detached in the background and redirecting the master
/// side of the pty to `stdout.log`. A future patch will extend this with a
/// proper multi-client socket fan-out as outlined in the design document the
/// extra indirection is *not* required for compilation tests.
pub fn spawn_tui(paths: &Paths, tui_args: &[String]) -> Result<Child> {
// -----------------------------------------------------------------------------
// tui full terminal UI (PTY + socket fan-out)
use bytes::Bytes;
#[cfg(unix)]
use {
nix::unistd::dup,
std::os::unix::io::{FromRawFd, IntoRawFd, RawFd},
};
/// Spawn `codex-tui` inside a pseudo-terminal and start the background
/// multiplexer so future `attach` commands can talk to it.
pub async fn spawn_tui(paths: &Paths, tui_args: &[String]) -> Result<Child> {
#[cfg(unix)]
{
use std::io;
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};
// Allocate a new pty.
let pty = nix::pty::openpty(None, None).context("failed to open pty")?;
// 1. PTY allocation ---------------------------------------------------
let pty = nix::pty::openpty(None, None).context("openpty failed")?;
// Safe because we immediately hand the raw fds to Stdio which takes
// ownership.
// Extract *raw* fds from the OwnedFd handles returned by nix.
let slave_fd: RawFd = pty.slave.into_raw_fd();
let master_fd: RawFd = pty.master.into_raw_fd();
// Helper to wrap a raw fd into a Stdio object (takes ownership).
let make_stdio_from_fd = |fd: RawFd| unsafe { std::process::Stdio::from_raw_fd(fd) };
// Ensure master_fd is inheritable (clear FD_CLOEXEC)
{
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
let _ = fcntl(master_fd, FcntlArg::F_SETFD(FdFlag::empty()));
}
// SAFETY: libc::dup returns a new fd or -1 on error (checked).
let dup_fd = |fd: RawFd| -> Result<RawFd> {
let new_fd = unsafe { libc::dup(fd) };
if new_fd == -1 {
Err(anyhow::anyhow!(std::io::Error::last_os_error()))
} else {
Ok(new_fd)
}
};
// 2. Spawn codex-tui --------------------------------------------------
let make_stdio = |fd: RawFd| unsafe { std::process::Stdio::from_raw_fd(fd) };
let stdin = make_stdio(dup(slave_fd)?);
let stdout = make_stdio(dup(slave_fd)?);
let stderr = make_stdio(slave_fd);
let stdin = make_stdio_from_fd(dup_fd(slave_fd)?);
let stdout = make_stdio_from_fd(dup_fd(slave_fd)?);
let stderr = make_stdio_from_fd(slave_fd);
// Spawn the codex-tui process *detached* from our controlling tty so
// the background session survives once the `create` CLI exits.
let mut cmd = Command::new("codex-tui");
cmd.args(tui_args)
let mut tui_cmd = Command::new("codex-tui");
tui_cmd.args(tui_args)
.stdin(stdin)
.stdout(stdout)
.stderr(stderr);
unsafe {
cmd.pre_exec(|| {
// Create new session (like setsid()) so the child is not tied
// to the CLI process.
tui_cmd.pre_exec(|| {
if libc::setsid() == -1 {
return Err(io::Error::last_os_error());
}
@@ -202,39 +196,162 @@ pub fn spawn_tui(paths: &Paths, tui_args: &[String]) -> Result<Child> {
});
}
// Spawn child first so that we know its PID for metadata.
let child = cmd.spawn().context("failed to spawn codex-tui")?;
let child = tui_cmd.spawn().context("failed to spawn codex-tui")?;
// ------------ background copy: master → stdout.log ---------------
// Turn the master fd into a std::fs::File which **owns** the fd.
let master_file = unsafe { std::fs::File::from_raw_fd(master_fd) };
let mut log_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&paths.stdout)?;
// 3. Spawn mux helper process ---------------------------------------
// Spawn blocking thread instead of async; simpler and good enough for
// the build-time smoke tests.
std::thread::spawn(move || {
use std::io::{Read, Write};
let mut r = master_file;
let mut buf = [0u8; 4096];
loop {
match r.read(&mut buf) {
Ok(0) => break, // eof
Ok(n) => {
let _ = log_file.write_all(&buf[..n]);
}
Err(_) => break,
}
}
});
let sock_path = paths.dir.join("sock");
if sock_path.exists() {
let _ = std::fs::remove_file(&sock_path);
}
let current_exe = std::env::current_exe()?;
let mut mux_cmd = std::process::Command::new(current_exe);
mux_cmd.arg("__mux")
.arg("--fd").arg(format!("{master_fd}"))
.arg("--sock").arg(&sock_path)
.arg("--log").arg(&paths.stdout)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
// Detach mux process (own session)
unsafe {
mux_cmd.pre_exec(|| {
libc::setsid();
Ok(())
});
}
let _ = mux_cmd.spawn().context("failed to spawn mux helper")?;
Ok(child)
}
#[cfg(windows)]
#[cfg(not(unix))]
{
anyhow::bail!("codex-tui sessions are not yet supported on Windows");
anyhow::bail!("tui sessions are only supported on Unix right now");
}
}
#[cfg(unix)]
async fn spawn_client(
sock: tokio::net::UnixStream,
pty_write_fd: RawFd,
tx: &tokio::sync::broadcast::Sender<Bytes>,
) {
use tokio::io::AsyncWriteExt;
let (mut s_read, mut s_write) = sock.into_split();
// Clone PTY master *write* side for this client
let pty_write = unsafe {
tokio::fs::File::from_std(std::fs::File::from_raw_fd(pty_write_fd))
};
let mut pty_write = pty_write;
// subscribe
let mut rx = tx.subscribe();
// socket → pty
let to_pty = tokio::spawn(async move {
tokio::io::copy(&mut s_read, &mut pty_write).await.ok();
});
// pty broadcast → socket
let from_pty = tokio::spawn(async move {
while let Ok(bytes) = rx.recv().await {
if s_write.write_all(&bytes).await.is_err() {
break;
}
}
});
let _ = tokio::join!(to_pty, from_pty);
}
/// Actual multiplexer event loop that runs inside the forked daemon process.
#[cfg(unix)]
#[cfg(unix)]
pub async fn mux_main(master_fd: RawFd, sock_path: std::path::PathBuf, stdout_log: std::path::PathBuf) -> anyhow::Result<()> {
use tokio::{net::UnixListener, sync::broadcast};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Bind socket (should succeed; stale already removed).
let listener = match UnixListener::bind(&sock_path) {
Ok(l) => l,
Err(e) => {
eprintln!("socket bind failed: {e}");
return Ok(());
}
};
// Async read handle for PTY master.
let master_read = unsafe {
tokio::fs::File::from_std(std::fs::File::from_raw_fd(dup(master_fd).expect("dup master")))
};
// binary log file
let mut log_file = match tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&stdout_log)
.await
{
Ok(f) => f,
Err(e) => {
eprintln!("log open failed: {e}");
return Ok(());
}
};
let (tx, _) = broadcast::channel::<Bytes>(64);
// Reader task
let tx_read = tx.clone();
tokio::spawn(async move {
let mut buf = [0u8; 4096];
let mut r = master_read;
loop {
match r.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = log_file.write_all(&buf[..n]).await;
let _ = tx_read.send(Bytes::copy_from_slice(&buf[..n]));
}
Err(e) => {
eprintln!("pty read error: {e}");
break;
}
}
}
});
// Accept-loop
loop {
match listener.accept().await {
Ok((sock, _)) => {
match dup(master_fd) {
Ok(fd) => {
let tx_c = tx.clone();
tokio::spawn(async move {
spawn_client(sock, fd, &tx_c).await;
});
}
Err(e) => eprintln!("dup failed: {e}"),
}
}
Err(e) => {
eprintln!("accept error: {e}");
break;
}
}
}
Ok(())
}
#[cfg(not(unix))]
pub async fn mux_main(_fd: i32, _sock: std::path::PathBuf, _log: std::path::PathBuf) -> anyhow::Result<()> {
anyhow::bail!("tui sessions are only supported on unix");
}