diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d7fce5bd7a..8ea52e9304 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1979,6 +1979,26 @@ dependencies = [ "url", ] +[[package]] +name = "codex-app-server-daemon" +version = "0.0.0" +dependencies = [ + "anyhow", + "codex-app-server-protocol", + "codex-app-server-transport", + "codex-core", + "codex-uds", + "futures", + "libc", + "pretty_assertions", + "reqwest", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-tungstenite", +] + [[package]] name = "codex-app-server-protocol" version = "0.0.0" @@ -2206,6 +2226,7 @@ dependencies = [ "clap", "clap_complete", "codex-app-server", + "codex-app-server-daemon", "codex-app-server-protocol", "codex-app-server-test-client", "codex-arg0", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index adbcdb14ea..9e203e9f8a 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -11,6 +11,7 @@ members = [ "async-utils", "app-server", "app-server-transport", + "app-server-daemon", "app-server-client", "app-server-protocol", "app-server-test-client", @@ -131,6 +132,7 @@ codex-api = { path = "codex-api" } codex-aws-auth = { path = "aws-auth" } codex-app-server = { path = "app-server" } codex-app-server-transport = { path = "app-server-transport" } +codex-app-server-daemon = { path = "app-server-daemon" } codex-app-server-client = { path = "app-server-client" } codex-app-server-protocol = { path = "app-server-protocol" } codex-app-server-test-client = { path = "app-server-test-client" } diff --git a/codex-rs/app-server-daemon/BUILD.bazel b/codex-rs/app-server-daemon/BUILD.bazel new file mode 100644 index 0000000000..1bca6d55db --- /dev/null +++ b/codex-rs/app-server-daemon/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "app-server-daemon", + crate_name = "codex_app_server_daemon", +) diff --git a/codex-rs/app-server-daemon/Cargo.toml b/codex-rs/app-server-daemon/Cargo.toml new file mode 100644 index 0000000000..5085d6bd7c --- /dev/null +++ b/codex-rs/app-server-daemon/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "codex-app-server-daemon" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lib] +name = "codex_app_server_daemon" +path = "src/lib.rs" +doctest = false + +[lints] +workspace = true + +[dependencies] +anyhow = { workspace = true } +codex-app-server-protocol = { workspace = true } +codex-app-server-transport = { workspace = true } +codex-core = { workspace = true } +codex-uds = { workspace = true } +futures = { workspace = true } +libc = { workspace = true } +reqwest = { workspace = true, features = ["rustls-tls"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true, features = [ + "fs", + "io-util", + "macros", + "process", + "rt-multi-thread", + "signal", + "time", +] } +tokio-tungstenite = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tempfile = { workspace = true } diff --git a/codex-rs/app-server-daemon/README.md b/codex-rs/app-server-daemon/README.md new file mode 100644 index 0000000000..c343d5c849 --- /dev/null +++ b/codex-rs/app-server-daemon/README.md @@ -0,0 +1,104 @@ +# codex-app-server-daemon + +> `codex-app-server-daemon` is experimental and its lifecycle contract may +> change while the remote-management flow is still being developed. + +`codex-app-server-daemon` backs the machine-readable `codex app-server` +lifecycle commands used by remote clients such as the desktop and mobile apps. +It is intended for Codex instances launched over SSH, including fresh developer +machines that should expose app-server with `remote_control` enabled. + +## Platform support + +The current daemon implementation is Unix-only. It uses pidfile-backed +daemonization plus Unix process and file-locking primitives, and does not yet +support Windows lifecycle management. + +## Commands + +```sh +codex app-server daemon start +codex app-server daemon restart +codex app-server daemon enable-remote-control +codex app-server daemon disable-remote-control +codex app-server daemon stop +codex app-server daemon version +codex app-server daemon bootstrap --remote-control +``` + +On success, every command writes exactly one JSON object to stdout. Consumers +should parse that JSON rather than relying on human-readable text. Lifecycle +responses report the resolved backend, socket path, local CLI version, and +running app-server version when applicable. + +## Bootstrap flow + +For a new remote machine: + +```sh +curl -fsSL https://chatgpt.com/codex/install.sh | sh +$HOME/.codex/packages/standalone/current/codex app-server daemon bootstrap --remote-control +``` + +`bootstrap` requires the standalone managed install. It records the daemon +settings under `CODEX_HOME/app-server-daemon/`, starts app-server as a +pidfile-backed detached process, and launches a detached updater loop. + +## Installation and update cases + +The daemon assumes Codex is installed through `install.sh` and always launches +the standalone managed binary under `CODEX_HOME`. + +| Situation | What starts | Does this daemon fetch new binaries? | Does a running app-server eventually move to a newer binary on its own? | +| --- | --- | --- | --- | +| `install.sh` has run, but only `start` is used | `start` uses `CODEX_HOME/packages/standalone/current/codex` | No | No. The managed path is used when starting or restarting, but no updater is installed. | +| `install.sh` has run, then `bootstrap` is used | The pidfile backend uses `CODEX_HOME/packages/standalone/current/codex` | Yes. Bootstrap launches a detached updater loop that runs `install.sh` hourly. | Yes, while that updater process is alive. After a successful fetch, it restarts a currently running app-server only when the managed binary reports a different version. | +| Some other tool updates the managed binary path | The next fresh start or restart uses the updated file at that path | No | Not automatically. The existing process keeps the old executable image until an explicit `restart`. | + +### Standalone installs + +For installs created by `install.sh`: + +- lifecycle commands always use the standalone managed binary path +- `bootstrap` is supported +- `bootstrap` starts a detached pid-backed updater loop that fetches via + `install.sh`, then restarts app-server if it is running on a different version +- the updater loop is not reboot-persistent; it must be started again by + rerunning `bootstrap` after a reboot + +### Out-of-band updates + +This daemon does not watch arbitrary executable files for replacement. If some +other tool updates a binary that the daemon would use on its next launch: + +- a currently running app-server remains on the old executable image +- `restart` will launch the updated binary +- for bootstrapped daemons, the detached updater loop only reacts to updates it + fetched itself; it does not watch arbitrary file replacement + +## Lifecycle semantics + +`start` is idempotent and returns after app-server is ready to answer the normal +JSON-RPC initialize handshake on the Unix control socket. + +`restart` stops any managed daemon and starts it again. + +`enable-remote-control` and `disable-remote-control` persist the launch setting +for future starts. If a managed app-server is already running, they restart it +so the new setting takes effect immediately. + +`stop` sends a graceful termination request first, then sends a second +termination signal after the grace window if the process is still alive. + +All mutating lifecycle commands are serialized per `CODEX_HOME`, so a concurrent +`start`, `restart`, `enable-remote-control`, `disable-remote-control`, `stop`, +or `bootstrap` does not race another in-flight lifecycle operation. + +## State + +The daemon stores its local state under `CODEX_HOME/app-server-daemon/`: + +- `settings.json` for persisted launch settings +- `app-server.pid` for the app-server process record +- `app-server-updater.pid` for the pid-backed standalone updater loop +- `daemon.lock` for daemon-wide lifecycle serialization diff --git a/codex-rs/app-server-daemon/src/backend/mod.rs b/codex-rs/app-server-daemon/src/backend/mod.rs new file mode 100644 index 0000000000..5e92dd5261 --- /dev/null +++ b/codex-rs/app-server-daemon/src/backend/mod.rs @@ -0,0 +1,33 @@ +mod pid; + +use std::path::PathBuf; + +use serde::Serialize; + +pub(crate) use pid::PidBackend; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum BackendKind { + Pid, +} + +#[derive(Debug, Clone)] +pub(crate) struct BackendPaths { + pub(crate) codex_bin: PathBuf, + pub(crate) pid_file: PathBuf, + pub(crate) update_pid_file: PathBuf, + pub(crate) remote_control_enabled: bool, +} + +pub(crate) fn pid_backend(paths: BackendPaths) -> PidBackend { + PidBackend::new( + paths.codex_bin, + paths.pid_file, + paths.remote_control_enabled, + ) +} + +pub(crate) fn pid_update_loop_backend(paths: BackendPaths) -> PidBackend { + PidBackend::new_update_loop(paths.codex_bin, paths.update_pid_file) +} diff --git a/codex-rs/app-server-daemon/src/backend/pid.rs b/codex-rs/app-server-daemon/src/backend/pid.rs new file mode 100644 index 0000000000..64228d9c9b --- /dev/null +++ b/codex-rs/app-server-daemon/src/backend/pid.rs @@ -0,0 +1,600 @@ +use std::path::Path; +use std::path::PathBuf; +#[cfg(unix)] +use std::process::Stdio; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use serde::Deserialize; +use serde::Serialize; +use tokio::fs; +#[cfg(unix)] +use tokio::process::Command; +use tokio::time::sleep; + +const STOP_POLL_INTERVAL: Duration = Duration::from_millis(50); +const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60); +const STOP_TIMEOUT: Duration = Duration::from_secs(70); +const START_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug)] +#[cfg_attr(not(unix), allow(dead_code))] +pub(crate) struct PidBackend { + codex_bin: PathBuf, + pid_file: PathBuf, + lock_file: PathBuf, + command_kind: PidCommandKind, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PidRecord { + pid: u32, + process_start_time: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum PidFileState { + Missing, + Starting, + Running(PidRecord), +} + +#[derive(Debug, Clone, Copy)] +#[cfg_attr(not(unix), allow(dead_code))] +enum PidCommandKind { + AppServer { remote_control_enabled: bool }, + UpdateLoop, +} + +impl PidBackend { + pub(crate) fn new(codex_bin: PathBuf, pid_file: PathBuf, remote_control_enabled: bool) -> Self { + let lock_file = pid_file.with_extension("pid.lock"); + Self { + codex_bin, + pid_file, + lock_file, + command_kind: PidCommandKind::AppServer { + remote_control_enabled, + }, + } + } + + pub(crate) fn new_update_loop(codex_bin: PathBuf, pid_file: PathBuf) -> Self { + let lock_file = pid_file.with_extension("pid.lock"); + Self { + codex_bin, + pid_file, + lock_file, + command_kind: PidCommandKind::UpdateLoop, + } + } + + pub(crate) async fn is_starting_or_running(&self) -> Result { + loop { + match self.read_pid_file_state().await? { + PidFileState::Missing => return Ok(false), + PidFileState::Starting => return Ok(true), + PidFileState::Running(record) => { + if self.record_is_active(&record).await? { + return Ok(true); + } + match self.refresh_after_stale_record(&record).await? { + PidFileState::Missing => return Ok(false), + PidFileState::Starting | PidFileState::Running(_) => continue, + } + } + } + } + } + + #[cfg(unix)] + pub(crate) async fn start(&self) -> Result> { + if let Some(parent) = self.pid_file.parent() { + fs::create_dir_all(parent) + .await + .with_context(|| format!("failed to create pid directory {}", parent.display()))?; + } + let reservation_lock = self.acquire_reservation_lock().await?; + let _pid_file = loop { + match fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&self.pid_file) + .await + { + Ok(pid_file) => break pid_file, + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { + match self.read_pid_file_state_with_lock_held().await? { + PidFileState::Missing => continue, + PidFileState::Running(record) => { + if self.record_is_active(&record).await? { + return Ok(None); + } + let _ = fs::remove_file(&self.pid_file).await; + continue; + } + PidFileState::Starting => { + unreachable!("lock holder cannot observe starting") + } + } + } + Err(err) => { + return Err(err).with_context(|| { + format!("failed to reserve pid file {}", self.pid_file.display()) + }); + } + } + }; + let mut command = Command::new(&self.codex_bin); + command + .args(self.command_args()) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + + #[cfg(unix)] + { + unsafe { + command.pre_exec(|| { + if libc::setsid() == -1 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }); + } + } + + let child = match command.spawn() { + Ok(child) => child, + Err(err) => { + let _ = fs::remove_file(&self.pid_file).await; + return Err(err).with_context(|| { + format!( + "failed to spawn detached app-server process using {}", + self.codex_bin.display() + ) + }); + } + }; + let pid = child + .id() + .context("spawned app-server process has no pid")?; + let record = match read_process_start_time(pid).await { + Ok(process_start_time) => PidRecord { + pid, + process_start_time, + }, + Err(err) => { + let _ = self.terminate_process(pid); + let _ = fs::remove_file(&self.pid_file).await; + return Err(err); + } + }; + let contents = serde_json::to_vec(&record).context("failed to serialize pid record")?; + let temp_pid_file = self.pid_file.with_extension("pid.tmp"); + if let Err(err) = fs::write(&temp_pid_file, &contents).await { + let _ = self.terminate_process(pid); + let _ = fs::remove_file(&self.pid_file).await; + return Err(err).with_context(|| { + format!("failed to write pid temp file {}", temp_pid_file.display()) + }); + } + if let Err(err) = fs::rename(&temp_pid_file, &self.pid_file).await { + let _ = self.terminate_process(pid); + let _ = fs::remove_file(&temp_pid_file).await; + let _ = fs::remove_file(&self.pid_file).await; + return Err(err).with_context(|| { + format!("failed to publish pid file {}", self.pid_file.display()) + }); + } + drop(reservation_lock); + Ok(Some(pid)) + } + + #[cfg(not(unix))] + pub(crate) async fn start(&self) -> Result> { + bail!("pid-managed app-server startup is unsupported on this platform") + } + + pub(crate) async fn stop(&self) -> Result<()> { + loop { + let Some(record) = self.wait_for_pid_start().await? else { + return Ok(()); + }; + if !self.record_is_active(&record).await? { + match self.refresh_after_stale_record(&record).await? { + PidFileState::Missing => return Ok(()), + PidFileState::Starting | PidFileState::Running(_) => continue, + } + } + + let pid = record.pid; + self.terminate_process(pid)?; + let started_at = tokio::time::Instant::now(); + let deadline = tokio::time::Instant::now() + STOP_TIMEOUT; + let mut forced = false; + while tokio::time::Instant::now() < deadline { + if !self.record_is_active(&record).await? { + match self.refresh_after_stale_record(&record).await? { + PidFileState::Missing => return Ok(()), + PidFileState::Starting | PidFileState::Running(_) => break, + } + } + if !forced && started_at.elapsed() >= STOP_GRACE_PERIOD { + self.force_terminate_process(pid)?; + forced = true; + } + sleep(STOP_POLL_INTERVAL).await; + } + + if self.record_is_active(&record).await? { + bail!("timed out waiting for pid-managed app server {pid} to stop"); + } + } + } + + async fn wait_for_pid_start(&self) -> Result> { + let deadline = tokio::time::Instant::now() + START_TIMEOUT; + loop { + match self.read_pid_file_state().await? { + PidFileState::Missing => return Ok(None), + PidFileState::Running(record) => return Ok(Some(record)), + PidFileState::Starting if tokio::time::Instant::now() < deadline => { + sleep(STOP_POLL_INTERVAL).await; + } + PidFileState::Starting => { + bail!( + "timed out waiting for pid reservation in {} to finish initializing", + self.pid_file.display() + ); + } + } + } + } + + async fn read_pid_file_state(&self) -> Result { + let contents = match fs::read_to_string(&self.pid_file).await { + Ok(contents) => contents, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return if reservation_lock_is_active(&self.lock_file).await? { + Ok(PidFileState::Starting) + } else { + Ok(PidFileState::Missing) + }; + } + Err(err) => { + return Err(err).with_context(|| { + format!("failed to read pid file {}", self.pid_file.display()) + }); + } + }; + if contents.trim().is_empty() { + match inspect_empty_pid_reservation(&self.pid_file, &self.lock_file).await? { + EmptyPidReservation::Active => { + return Ok(PidFileState::Starting); + } + EmptyPidReservation::Stale => { + return Ok(PidFileState::Missing); + } + EmptyPidReservation::Record(record) => return Ok(PidFileState::Running(record)), + } + } + let record = serde_json::from_str(&contents) + .with_context(|| format!("invalid pid file contents in {}", self.pid_file.display()))?; + Ok(PidFileState::Running(record)) + } + + async fn read_pid_file_state_with_lock_held(&self) -> Result { + let contents = match fs::read_to_string(&self.pid_file).await { + Ok(contents) => contents, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return Ok(PidFileState::Missing); + } + Err(err) => { + return Err(err).with_context(|| { + format!("failed to read pid file {}", self.pid_file.display()) + }); + } + }; + if contents.trim().is_empty() { + let _ = fs::remove_file(&self.pid_file).await; + return Ok(PidFileState::Missing); + } + let record = serde_json::from_str(&contents) + .with_context(|| format!("invalid pid file contents in {}", self.pid_file.display()))?; + Ok(PidFileState::Running(record)) + } + + async fn refresh_after_stale_record(&self, expected: &PidRecord) -> Result { + let reservation_lock = self.acquire_reservation_lock().await?; + let state = match self.read_pid_file_state_with_lock_held().await? { + PidFileState::Running(record) if record == *expected => { + let _ = fs::remove_file(&self.pid_file).await; + PidFileState::Missing + } + state => state, + }; + drop(reservation_lock); + Ok(state) + } + + async fn acquire_reservation_lock(&self) -> Result { + let reservation_lock = fs::OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&self.lock_file) + .await + .with_context(|| { + format!("failed to open pid lock file {}", self.lock_file.display()) + })?; + let lock_deadline = tokio::time::Instant::now() + START_TIMEOUT; + while !try_lock_file(&reservation_lock)? { + if tokio::time::Instant::now() >= lock_deadline { + bail!( + "timed out waiting for pid lock {}", + self.lock_file.display() + ); + } + sleep(STOP_POLL_INTERVAL).await; + } + Ok(reservation_lock) + } + + #[cfg(unix)] + fn command_args(&self) -> Vec<&'static str> { + match self.command_kind { + PidCommandKind::AppServer { + remote_control_enabled: true, + } => vec![ + "--enable", + "remote_control", + "app-server", + "--listen", + "unix://", + ], + PidCommandKind::AppServer { + remote_control_enabled: false, + } => vec!["app-server", "--listen", "unix://"], + PidCommandKind::UpdateLoop => vec!["app-server", "daemon", "pid-update-loop"], + } + } + + fn terminate_process(&self, pid: u32) -> Result<()> { + match self.command_kind { + PidCommandKind::AppServer { .. } => terminate_process(pid), + PidCommandKind::UpdateLoop => terminate_process(pid), + } + } + + fn force_terminate_process(&self, pid: u32) -> Result<()> { + match self.command_kind { + PidCommandKind::AppServer { .. } => force_terminate_process(pid), + PidCommandKind::UpdateLoop => force_terminate_process_group(pid), + } + } + + async fn record_is_active(&self, record: &PidRecord) -> Result { + process_matches_record(record).await + } +} + +#[cfg(unix)] +fn process_exists(pid: u32) -> bool { + let Ok(pid) = libc::pid_t::try_from(pid) else { + return false; + }; + let result = unsafe { libc::kill(pid, 0) }; + result == 0 || std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM) +} + +#[cfg(unix)] +fn terminate_process(pid: u32) -> Result<()> { + let raw_pid = libc::pid_t::try_from(pid) + .with_context(|| format!("pid-managed app server pid {pid} is out of range"))?; + let result = unsafe { libc::kill(raw_pid, libc::SIGTERM) }; + if result == 0 { + return Ok(()); + } + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ESRCH) { + return Ok(()); + } + Err(err).with_context(|| format!("failed to terminate pid-managed app server {pid}")) +} + +#[cfg(unix)] +fn force_terminate_process(pid: u32) -> Result<()> { + let raw_pid = libc::pid_t::try_from(pid) + .with_context(|| format!("pid-managed app server pid {pid} is out of range"))?; + let result = unsafe { libc::kill(raw_pid, libc::SIGKILL) }; + if result == 0 { + return Ok(()); + } + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ESRCH) { + return Ok(()); + } + Err(err).with_context(|| format!("failed to force terminate pid-managed app server {pid}")) +} + +#[cfg(unix)] +fn force_terminate_process_group(pid: u32) -> Result<()> { + let raw_pid = libc::pid_t::try_from(pid) + .with_context(|| format!("pid-managed updater pid {pid} is out of range"))?; + let result = unsafe { libc::kill(-raw_pid, libc::SIGKILL) }; + if result == 0 { + return Ok(()); + } + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ESRCH) { + return Ok(()); + } + Err(err).with_context(|| format!("failed to force terminate pid-managed updater group {pid}")) +} + +#[cfg(not(unix))] +fn terminate_process(_pid: u32) -> Result<()> { + bail!("pid-managed app-server shutdown is unsupported on this platform") +} + +#[cfg(not(unix))] +fn force_terminate_process(_pid: u32) -> Result<()> { + bail!("pid-managed app-server shutdown is unsupported on this platform") +} + +#[cfg(not(unix))] +fn force_terminate_process_group(_pid: u32) -> Result<()> { + bail!("pid-managed updater shutdown is unsupported on this platform") +} + +#[cfg(unix)] +async fn process_matches_record(record: &PidRecord) -> Result { + if !process_exists(record.pid) { + return Ok(false); + } + + match read_process_start_time(record.pid).await { + Ok(start_time) => Ok(start_time == record.process_start_time), + Err(_err) if !process_exists(record.pid) => Ok(false), + Err(err) => Err(err), + } +} + +#[cfg(not(unix))] +async fn process_matches_record(_record: &PidRecord) -> Result { + Ok(false) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(not(unix), allow(dead_code))] +enum EmptyPidReservation { + Active, + Stale, + Record(PidRecord), +} + +#[cfg(unix)] +fn try_lock_file(file: &fs::File) -> Result { + use std::os::fd::AsRawFd; + + let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + if result == 0 { + return Ok(true); + } + + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EWOULDBLOCK) { + return Ok(false); + } + Err(err).context("failed to lock pid reservation") +} + +#[cfg(not(unix))] +fn try_lock_file(_file: &fs::File) -> Result { + bail!("pid-managed app-server startup is unsupported on this platform") +} + +#[cfg(unix)] +async fn reservation_lock_is_active(path: &Path) -> Result { + let file = match fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(false) + .open(path) + .await + { + Ok(file) => file, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return Ok(false); + } + Err(err) => { + return Err(err) + .with_context(|| format!("failed to inspect pid lock file {}", path.display())); + } + }; + Ok(!try_lock_file(&file)?) +} + +#[cfg(not(unix))] +async fn reservation_lock_is_active(_path: &Path) -> Result { + Ok(false) +} + +#[cfg(unix)] +async fn inspect_empty_pid_reservation( + pid_path: &Path, + lock_path: &Path, +) -> Result { + let file = match fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(false) + .open(lock_path) + .await + { + Ok(file) => file, + Err(err) => { + return Err(err).with_context(|| { + format!("failed to inspect pid lock file {}", lock_path.display()) + }); + } + }; + if !try_lock_file(&file)? { + return Ok(EmptyPidReservation::Active); + } + + let contents = match fs::read_to_string(pid_path).await { + Ok(contents) => contents, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return Ok(EmptyPidReservation::Stale); + } + Err(err) => { + return Err(err) + .with_context(|| format!("failed to reread pid file {}", pid_path.display())); + } + }; + if contents.trim().is_empty() { + let _ = fs::remove_file(pid_path).await; + return Ok(EmptyPidReservation::Stale); + } + + let record = serde_json::from_str(&contents) + .with_context(|| format!("invalid pid file contents in {}", pid_path.display()))?; + Ok(EmptyPidReservation::Record(record)) +} + +#[cfg(not(unix))] +async fn inspect_empty_pid_reservation( + _pid_path: &Path, + _lock_path: &Path, +) -> Result { + Ok(EmptyPidReservation::Stale) +} + +#[cfg(unix)] +async fn read_process_start_time(pid: u32) -> Result { + let output = Command::new("ps") + .args(["-p", &pid.to_string(), "-o", "lstart="]) + .output() + .await + .context("failed to invoke ps for pid-managed app server")?; + if !output.status.success() { + bail!("failed to read start time for pid-managed app server {pid}"); + } + + let start_time = String::from_utf8(output.stdout) + .context("pid-managed app server start time was not utf-8")?; + let start_time = start_time.trim(); + if start_time.is_empty() { + bail!("pid-managed app server {pid} has no recorded start time"); + } + Ok(start_time.to_string()) +} + +#[cfg(all(test, unix))] +#[path = "pid_tests.rs"] +mod tests; diff --git a/codex-rs/app-server-daemon/src/backend/pid_tests.rs b/codex-rs/app-server-daemon/src/backend/pid_tests.rs new file mode 100644 index 0000000000..dc279794c2 --- /dev/null +++ b/codex-rs/app-server-daemon/src/backend/pid_tests.rs @@ -0,0 +1,158 @@ +use std::time::Duration; + +use pretty_assertions::assert_eq; +use tempfile::TempDir; + +use super::PidBackend; +use super::PidCommandKind; +use super::PidFileState; +use super::PidRecord; +use super::try_lock_file; + +#[tokio::test] +async fn locked_empty_pid_file_is_treated_as_active_reservation() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + tokio::fs::write(&pid_file, "") + .await + .expect("write pid file"); + let backend = PidBackend::new( + temp_dir.path().join("codex"), + pid_file.clone(), + /*remote_control_enabled*/ false, + ); + let reservation = tokio::fs::OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&backend.lock_file) + .await + .expect("open pid lock file"); + assert!(try_lock_file(&reservation).expect("lock reservation")); + + assert_eq!( + backend.read_pid_file_state().await.expect("read pid"), + PidFileState::Starting + ); + assert!(pid_file.exists()); +} + +#[tokio::test] +async fn unlocked_empty_pid_file_is_treated_as_stale_reservation() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + tokio::fs::write(&pid_file, "") + .await + .expect("write pid file"); + let backend = PidBackend::new( + temp_dir.path().join("codex"), + pid_file.clone(), + /*remote_control_enabled*/ false, + ); + + assert_eq!( + backend.read_pid_file_state().await.expect("read pid"), + PidFileState::Missing + ); + assert!(!pid_file.exists()); +} + +#[tokio::test] +async fn stop_waits_for_live_reservation_to_resolve() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + tokio::fs::write(&pid_file, "") + .await + .expect("write pid file"); + let backend = PidBackend::new( + temp_dir.path().join("codex"), + pid_file.clone(), + /*remote_control_enabled*/ false, + ); + let reservation = tokio::fs::OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&backend.lock_file) + .await + .expect("open pid lock file"); + assert!(try_lock_file(&reservation).expect("lock reservation")); + let cleanup = tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(50)).await; + drop(reservation); + tokio::fs::remove_file(pid_file) + .await + .expect("remove pid file"); + }); + + backend.stop().await.expect("stop"); + cleanup.await.expect("cleanup task"); +} + +#[tokio::test] +async fn start_retries_stale_empty_pid_file_under_its_own_lock() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + tokio::fs::write(&pid_file, "") + .await + .expect("write pid file"); + let backend = PidBackend::new( + temp_dir.path().join("missing-codex"), + pid_file, + /*remote_control_enabled*/ false, + ); + + let err = backend.start().await.expect_err("start"); + assert!( + err.to_string() + .starts_with("failed to spawn detached app-server process using ") + ); +} + +#[tokio::test] +async fn stale_record_cleanup_preserves_replacement_record() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + let backend = PidBackend::new( + temp_dir.path().join("codex"), + pid_file.clone(), + /*remote_control_enabled*/ false, + ); + let stale = PidRecord { + pid: 1, + process_start_time: "old".to_string(), + }; + let replacement = PidRecord { + pid: 2, + process_start_time: "new".to_string(), + }; + tokio::fs::write( + &pid_file, + serde_json::to_vec(&replacement).expect("serialize replacement"), + ) + .await + .expect("write replacement pid file"); + + assert_eq!( + backend + .refresh_after_stale_record(&stale) + .await + .expect("cleanup"), + PidFileState::Running(replacement) + ); +} + +#[test] +fn update_loop_uses_hidden_app_server_subcommand() { + let backend = PidBackend { + codex_bin: "codex".into(), + pid_file: "updater.pid".into(), + lock_file: "updater.pid.lock".into(), + command_kind: PidCommandKind::UpdateLoop, + }; + + assert_eq!( + backend.command_args(), + vec!["app-server", "daemon", "pid-update-loop"] + ); +} diff --git a/codex-rs/app-server-daemon/src/client.rs b/codex-rs/app-server-daemon/src/client.rs new file mode 100644 index 0000000000..44fccda394 --- /dev/null +++ b/codex-rs/app-server-daemon/src/client.rs @@ -0,0 +1,131 @@ +use std::path::Path; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::InitializeResponse; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::RequestId; +use codex_uds::UnixStream; +use futures::SinkExt; +use futures::StreamExt; +use tokio::time::timeout; +use tokio_tungstenite::client_async; +use tokio_tungstenite::tungstenite::Message; + +const PROBE_TIMEOUT: Duration = Duration::from_secs(2); +const CLIENT_NAME: &str = "codex_app_server_daemon"; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ProbeInfo { + pub(crate) app_server_version: String, +} + +pub(crate) async fn probe(socket_path: &Path) -> Result { + timeout(PROBE_TIMEOUT, probe_inner(socket_path)) + .await + .with_context(|| { + format!( + "timed out probing app-server control socket {}", + socket_path.display() + ) + })? +} + +async fn probe_inner(socket_path: &Path) -> Result { + let stream = UnixStream::connect(socket_path) + .await + .with_context(|| format!("failed to connect to {}", socket_path.display()))?; + let (mut websocket, _response) = client_async("ws://localhost/", stream) + .await + .with_context(|| format!("failed to upgrade {}", socket_path.display()))?; + + let initialize = JSONRPCMessage::Request(JSONRPCRequest { + id: RequestId::Integer(1), + method: "initialize".to_string(), + params: Some(serde_json::to_value(InitializeParams { + client_info: ClientInfo { + name: CLIENT_NAME.to_string(), + title: Some("Codex App Server Daemon".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + capabilities: None, + })?), + trace: None, + }); + websocket + .send(Message::Text(serde_json::to_string(&initialize)?.into())) + .await + .context("failed to send initialize request")?; + + let response = loop { + let frame = websocket + .next() + .await + .ok_or_else(|| anyhow!("app-server closed before initialize response"))??; + let Message::Text(payload) = frame else { + continue; + }; + let message = serde_json::from_str::(&payload)?; + if let JSONRPCMessage::Response(response) = message + && response.id == RequestId::Integer(1) + { + break response; + } + }; + let initialize_response = serde_json::from_value::(response.result)?; + + let initialized = JSONRPCMessage::Notification(JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + websocket + .send(Message::Text(serde_json::to_string(&initialized)?.into())) + .await + .context("failed to send initialized notification")?; + websocket.close(None).await.ok(); + + Ok(ProbeInfo { + app_server_version: parse_version_from_user_agent(&initialize_response.user_agent)?, + }) +} + +fn parse_version_from_user_agent(user_agent: &str) -> Result { + let (_originator, rest) = user_agent + .split_once('/') + .ok_or_else(|| anyhow!("app-server user-agent omitted version separator"))?; + let version = rest + .split_whitespace() + .next() + .filter(|version| !version.is_empty()) + .ok_or_else(|| anyhow!("app-server user-agent omitted version"))?; + Ok(version.to_string()) +} + +#[cfg(all(test, unix))] +mod tests { + use pretty_assertions::assert_eq; + + use super::parse_version_from_user_agent; + + #[test] + fn parses_version_from_codex_user_agent() { + assert_eq!( + parse_version_from_user_agent( + "codex_app_server_daemon/1.2.3 (Linux 6.8.0; x86_64) codex_cli_rs/1.2.3", + ) + .expect("version"), + "1.2.3" + ); + } + + #[test] + fn rejects_user_agent_without_version() { + assert!(parse_version_from_user_agent("codex_app_server_daemon").is_err()); + } +} diff --git a/codex-rs/app-server-daemon/src/lib.rs b/codex-rs/app-server-daemon/src/lib.rs new file mode 100644 index 0000000000..053bcdc354 --- /dev/null +++ b/codex-rs/app-server-daemon/src/lib.rs @@ -0,0 +1,630 @@ +mod backend; +mod client; +mod managed_install; +mod settings; +mod update_loop; + +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +pub use backend::BackendKind; +use backend::BackendPaths; +use codex_app_server_transport::app_server_control_socket_path; +use codex_core::config::find_codex_home; +use managed_install::managed_codex_bin; +#[cfg(unix)] +use managed_install::managed_codex_version; +use serde::Serialize; +use settings::DaemonSettings; +use tokio::time::sleep; + +const START_POLL_INTERVAL: Duration = Duration::from_millis(50); +const START_TIMEOUT: Duration = Duration::from_secs(10); +const OPERATION_LOCK_TIMEOUT: Duration = Duration::from_secs(75); +const PID_FILE_NAME: &str = "app-server.pid"; +const UPDATE_PID_FILE_NAME: &str = "app-server-updater.pid"; +const OPERATION_LOCK_FILE_NAME: &str = "daemon.lock"; +const SETTINGS_FILE_NAME: &str = "settings.json"; +const STATE_DIR_NAME: &str = "app-server-daemon"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LifecycleCommand { + Start, + Restart, + Stop, + Version, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum LifecycleStatus { + AlreadyRunning, + Started, + Restarted, + Stopped, + NotRunning, + Running, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LifecycleOutput { + pub status: LifecycleStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub backend: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pid: Option, + pub socket_path: PathBuf, + #[serde(skip_serializing_if = "Option::is_none")] + pub cli_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub app_server_version: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BootstrapOptions { + pub remote_control_enabled: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum BootstrapStatus { + Bootstrapped, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BootstrapOutput { + pub status: BootstrapStatus, + pub backend: BackendKind, + pub auto_update_enabled: bool, + pub remote_control_enabled: bool, + pub managed_codex_path: PathBuf, + pub socket_path: PathBuf, + pub cli_version: String, + pub app_server_version: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RemoteControlMode { + Enabled, + Disabled, +} + +impl RemoteControlMode { + fn is_enabled(self) -> bool { + matches!(self, Self::Enabled) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum RemoteControlStatus { + Enabled, + Disabled, + AlreadyEnabled, + AlreadyDisabled, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RemoteControlOutput { + pub status: RemoteControlStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub backend: Option, + pub remote_control_enabled: bool, + pub socket_path: PathBuf, + pub cli_version: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub app_server_version: Option, +} + +#[cfg(unix)] +pub(crate) enum RestartIfRunningOutcome { + Completed, + Busy, +} + +pub async fn run(command: LifecycleCommand) -> Result { + ensure_supported_platform()?; + Daemon::from_environment()?.run(command).await +} + +pub async fn bootstrap(options: BootstrapOptions) -> Result { + ensure_supported_platform()?; + Daemon::from_environment()?.bootstrap(options).await +} + +pub async fn set_remote_control(mode: RemoteControlMode) -> Result { + ensure_supported_platform()?; + Daemon::from_environment()?.set_remote_control(mode).await +} + +pub async fn run_pid_update_loop() -> Result<()> { + ensure_supported_platform()?; + update_loop::run().await +} + +#[cfg(unix)] +fn ensure_supported_platform() -> Result<()> { + Ok(()) +} + +#[cfg(not(unix))] +fn ensure_supported_platform() -> Result<()> { + Err(anyhow!( + "codex app-server daemon lifecycle is only supported on Unix platforms" + )) +} + +struct Daemon { + socket_path: PathBuf, + pid_file: PathBuf, + update_pid_file: PathBuf, + operation_lock_file: PathBuf, + settings_file: PathBuf, + managed_codex_bin: PathBuf, +} + +impl Daemon { + fn from_environment() -> Result { + let codex_home = find_codex_home().context("failed to resolve CODEX_HOME")?; + let socket_path = app_server_control_socket_path(codex_home.as_path())? + .as_path() + .to_path_buf(); + let state_dir = codex_home.as_path().join(STATE_DIR_NAME); + Ok(Self { + socket_path, + pid_file: state_dir.join(PID_FILE_NAME), + update_pid_file: state_dir.join(UPDATE_PID_FILE_NAME), + operation_lock_file: state_dir.join(OPERATION_LOCK_FILE_NAME), + settings_file: state_dir.join(SETTINGS_FILE_NAME), + managed_codex_bin: managed_codex_bin(codex_home.as_path()), + }) + } + + async fn run(&self, command: LifecycleCommand) -> Result { + match command { + LifecycleCommand::Start => { + let _operation_lock = self.acquire_operation_lock().await?; + self.start().await + } + LifecycleCommand::Restart => { + let _operation_lock = self.acquire_operation_lock().await?; + self.restart().await + } + LifecycleCommand::Stop => { + let _operation_lock = self.acquire_operation_lock().await?; + self.stop().await + } + LifecycleCommand::Version => self.version().await, + } + } + + async fn start(&self) -> Result { + let settings = self.load_settings().await?; + if let Ok(info) = client::probe(&self.socket_path).await { + return Ok(self.output( + LifecycleStatus::AlreadyRunning, + self.running_backend(&settings).await?, + /*pid*/ None, + Some(info.app_server_version), + )); + } + + if self.running_backend_instance(&settings).await?.is_some() { + let info = self.wait_until_ready().await?; + return Ok(self.output( + LifecycleStatus::AlreadyRunning, + Some(BackendKind::Pid), + /*pid*/ None, + Some(info.app_server_version), + )); + } + + self.ensure_managed_codex_bin()?; + let pid = self.start_managed_backend(&settings).await?; + let info = self.wait_until_ready().await?; + Ok(self.output( + LifecycleStatus::Started, + Some(BackendKind::Pid), + pid, + Some(info.app_server_version), + )) + } + + async fn restart(&self) -> Result { + let settings = self.load_settings().await?; + if client::probe(&self.socket_path).await.is_ok() + && self.running_backend(&settings).await?.is_none() + { + return Err(anyhow!( + "app server is running but is not managed by codex app-server daemon" + )); + } + + self.ensure_managed_codex_bin()?; + if let Some(backend) = self.running_backend_instance(&settings).await? { + backend.stop().await?; + } + + let pid = self.start_managed_backend(&settings).await?; + let info = self.wait_until_ready().await?; + Ok(self.output( + LifecycleStatus::Restarted, + Some(BackendKind::Pid), + pid, + Some(info.app_server_version), + )) + } + + #[cfg(unix)] + pub(crate) async fn try_restart_if_running(&self) -> Result { + let operation_lock = self.open_operation_lock_file().await?; + if !try_lock_file(&operation_lock)? { + return Ok(RestartIfRunningOutcome::Busy); + } + let settings = self.load_settings().await?; + if let Some(backend) = self.running_backend_instance(&settings).await? { + let Ok(info) = client::probe(&self.socket_path).await else { + return Ok(RestartIfRunningOutcome::Completed); + }; + let managed_version = managed_codex_version(&self.managed_codex_bin).await?; + if info.app_server_version == managed_version { + return Ok(RestartIfRunningOutcome::Completed); + } + backend.stop().await?; + let _ = self.start_managed_backend(&settings).await?; + self.wait_until_ready().await?; + return Ok(RestartIfRunningOutcome::Completed); + } + + if client::probe(&self.socket_path).await.is_ok() { + return Err(anyhow!( + "app server is running but is not managed by codex app-server daemon" + )); + } + + Ok(RestartIfRunningOutcome::Completed) + } + + async fn stop(&self) -> Result { + let settings = self.load_settings().await?; + if let Some(backend) = self.running_backend_instance(&settings).await? { + backend.stop().await?; + return Ok(self.output( + LifecycleStatus::Stopped, + Some(BackendKind::Pid), + /*pid*/ None, + /*app_server_version*/ None, + )); + } + + if client::probe(&self.socket_path).await.is_ok() { + return Err(anyhow!( + "app server is running but is not managed by codex app-server daemon" + )); + } + + Ok(self.output( + LifecycleStatus::NotRunning, + /*backend*/ None, + /*pid*/ None, + /*app_server_version*/ None, + )) + } + + async fn version(&self) -> Result { + let settings = self.load_settings().await?; + let info = client::probe(&self.socket_path).await?; + Ok(self.output( + LifecycleStatus::Running, + self.running_backend(&settings).await?, + /*pid*/ None, + Some(info.app_server_version), + )) + } + + async fn wait_until_ready(&self) -> Result { + let deadline = tokio::time::Instant::now() + START_TIMEOUT; + loop { + match client::probe(&self.socket_path).await { + Ok(info) => return Ok(info), + Err(err) if tokio::time::Instant::now() < deadline => { + let _ = err; + sleep(START_POLL_INTERVAL).await; + } + Err(err) => { + return Err(err).with_context(|| { + format!( + "app server did not become ready on {}", + self.socket_path.display() + ) + }); + } + } + } + } + + async fn bootstrap(&self, options: BootstrapOptions) -> Result { + let _operation_lock = self.acquire_operation_lock().await?; + self.bootstrap_locked(options).await + } + + async fn set_remote_control(&self, mode: RemoteControlMode) -> Result { + let _operation_lock = self.acquire_operation_lock().await?; + let previous_settings = self.load_settings().await?; + let mut settings = previous_settings.clone(); + let remote_control_enabled = mode.is_enabled(); + let backend = self.running_backend_instance(&previous_settings).await?; + + if backend.is_none() && client::probe(&self.socket_path).await.is_ok() { + return Err(anyhow!( + "app server is running but is not managed by codex app-server daemon" + )); + } + + if settings.remote_control_enabled == remote_control_enabled { + let info = if backend.is_some() { + Some(self.wait_until_ready().await?) + } else { + None + }; + return Ok(self.remote_control_output( + already_remote_control_status(mode), + backend.map(|_| BackendKind::Pid), + remote_control_enabled, + info.map(|info| info.app_server_version), + )); + } + + settings.remote_control_enabled = remote_control_enabled; + settings.save(&self.settings_file).await?; + + let app_server_version = if let Some(backend) = backend { + self.ensure_managed_codex_bin()?; + backend.stop().await?; + let _ = self.start_managed_backend(&settings).await?; + Some(self.wait_until_ready().await?.app_server_version) + } else { + None + }; + + Ok(self.remote_control_output( + remote_control_status(mode), + app_server_version.as_ref().map(|_| BackendKind::Pid), + remote_control_enabled, + app_server_version, + )) + } + + async fn bootstrap_locked(&self, options: BootstrapOptions) -> Result { + self.ensure_managed_codex_bin()?; + + let settings = DaemonSettings { + remote_control_enabled: options.remote_control_enabled, + }; + if client::probe(&self.socket_path).await.is_ok() + && self.running_backend(&settings).await?.is_none() + { + return Err(anyhow!( + "app server is running but is not managed by codex app-server daemon" + )); + } + settings.save(&self.settings_file).await?; + + if let Some(backend) = self.running_backend_instance(&settings).await? { + backend.stop().await?; + } + + let backend = backend::pid_backend(self.backend_paths(&settings)); + backend.start().await?; + let updater = backend::pid_update_loop_backend(self.backend_paths(&settings)); + if updater.is_starting_or_running().await? { + updater.stop().await?; + } + updater.start().await?; + + let info = self.wait_until_ready().await?; + Ok(BootstrapOutput { + status: BootstrapStatus::Bootstrapped, + backend: BackendKind::Pid, + auto_update_enabled: true, + remote_control_enabled: settings.remote_control_enabled, + managed_codex_path: self.managed_codex_bin.clone(), + socket_path: self.socket_path.clone(), + cli_version: env!("CARGO_PKG_VERSION").to_string(), + app_server_version: info.app_server_version, + }) + } + + async fn running_backend(&self, settings: &DaemonSettings) -> Result> { + Ok(self + .running_backend_instance(settings) + .await? + .map(|_| BackendKind::Pid)) + } + + async fn running_backend_instance( + &self, + settings: &DaemonSettings, + ) -> Result> { + let backend = backend::pid_backend(self.backend_paths(settings)); + if backend.is_starting_or_running().await? { + return Ok(Some(backend)); + } + Ok(None) + } + + async fn start_managed_backend(&self, settings: &DaemonSettings) -> Result> { + let backend = backend::pid_backend(self.backend_paths(settings)); + backend.start().await + } + + fn ensure_managed_codex_bin(&self) -> Result<()> { + if self.managed_codex_bin.is_file() { + return Ok(()); + } + + Err(anyhow!( + "managed standalone Codex install not found at {}; install Codex first", + self.managed_codex_bin.display() + )) + } + + fn backend_paths(&self, settings: &DaemonSettings) -> BackendPaths { + BackendPaths { + codex_bin: self.managed_codex_bin.clone(), + pid_file: self.pid_file.clone(), + update_pid_file: self.update_pid_file.clone(), + remote_control_enabled: settings.remote_control_enabled, + } + } + + async fn load_settings(&self) -> Result { + DaemonSettings::load(&self.settings_file).await + } + + async fn acquire_operation_lock(&self) -> Result { + let operation_lock = self.open_operation_lock_file().await?; + let deadline = tokio::time::Instant::now() + OPERATION_LOCK_TIMEOUT; + while !try_lock_file(&operation_lock)? { + if tokio::time::Instant::now() >= deadline { + return Err(anyhow!( + "timed out waiting for daemon operation lock {}", + self.operation_lock_file.display() + )); + } + sleep(START_POLL_INTERVAL).await; + } + Ok(operation_lock) + } + + async fn open_operation_lock_file(&self) -> Result { + if let Some(parent) = self.operation_lock_file.parent() { + tokio::fs::create_dir_all(parent).await.with_context(|| { + format!( + "failed to create daemon state directory {}", + parent.display() + ) + })?; + } + tokio::fs::OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&self.operation_lock_file) + .await + .with_context(|| { + format!( + "failed to open daemon operation lock {}", + self.operation_lock_file.display() + ) + }) + } + + fn output( + &self, + status: LifecycleStatus, + backend: Option, + pid: Option, + app_server_version: Option, + ) -> LifecycleOutput { + LifecycleOutput { + status, + backend, + pid, + socket_path: self.socket_path.clone(), + cli_version: Some(env!("CARGO_PKG_VERSION").to_string()), + app_server_version, + } + } + + fn remote_control_output( + &self, + status: RemoteControlStatus, + backend: Option, + remote_control_enabled: bool, + app_server_version: Option, + ) -> RemoteControlOutput { + RemoteControlOutput { + status, + backend, + remote_control_enabled, + socket_path: self.socket_path.clone(), + cli_version: env!("CARGO_PKG_VERSION").to_string(), + app_server_version, + } + } +} + +fn remote_control_status(mode: RemoteControlMode) -> RemoteControlStatus { + match mode { + RemoteControlMode::Enabled => RemoteControlStatus::Enabled, + RemoteControlMode::Disabled => RemoteControlStatus::Disabled, + } +} + +fn already_remote_control_status(mode: RemoteControlMode) -> RemoteControlStatus { + match mode { + RemoteControlMode::Enabled => RemoteControlStatus::AlreadyEnabled, + RemoteControlMode::Disabled => RemoteControlStatus::AlreadyDisabled, + } +} + +#[cfg(unix)] +fn try_lock_file(file: &tokio::fs::File) -> Result { + use std::os::fd::AsRawFd; + + let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + if result == 0 { + return Ok(true); + } + + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EWOULDBLOCK) { + return Ok(false); + } + Err(err).context("failed to lock daemon operation") +} + +#[cfg(not(unix))] +fn try_lock_file(_file: &tokio::fs::File) -> Result { + Ok(true) +} + +#[cfg(all(test, unix))] +mod tests { + use pretty_assertions::assert_eq; + + use super::BootstrapStatus; + use super::LifecycleStatus; + use super::RemoteControlStatus; + + #[test] + fn lifecycle_status_uses_camel_case_json() { + assert_eq!( + serde_json::to_string(&LifecycleStatus::AlreadyRunning).expect("serialize"), + "\"alreadyRunning\"" + ); + } + + #[test] + fn bootstrap_status_uses_camel_case_json() { + assert_eq!( + serde_json::to_string(&BootstrapStatus::Bootstrapped).expect("serialize"), + "\"bootstrapped\"" + ); + } + + #[test] + fn remote_control_status_uses_camel_case_json() { + assert_eq!( + serde_json::to_string(&RemoteControlStatus::AlreadyEnabled).expect("serialize"), + "\"alreadyEnabled\"" + ); + } +} diff --git a/codex-rs/app-server-daemon/src/managed_install.rs b/codex-rs/app-server-daemon/src/managed_install.rs new file mode 100644 index 0000000000..83debb24e5 --- /dev/null +++ b/codex-rs/app-server-daemon/src/managed_install.rs @@ -0,0 +1,66 @@ +use std::path::Path; +use std::path::PathBuf; + +#[cfg(unix)] +use anyhow::Context; +#[cfg(unix)] +use anyhow::Result; +#[cfg(unix)] +use anyhow::anyhow; +#[cfg(unix)] +use tokio::process::Command; + +pub(crate) fn managed_codex_bin(codex_home: &Path) -> PathBuf { + codex_home + .join("packages") + .join("standalone") + .join("current") + .join(managed_codex_file_name()) +} + +#[cfg(unix)] +pub(crate) async fn managed_codex_version(codex_bin: &Path) -> Result { + let output = Command::new(codex_bin) + .arg("--version") + .output() + .await + .with_context(|| { + format!( + "failed to invoke managed Codex binary {}", + codex_bin.display() + ) + })?; + if !output.status.success() { + return Err(anyhow!( + "managed Codex binary {} exited with status {}", + codex_bin.display(), + output.status + )); + } + + let stdout = String::from_utf8(output.stdout).with_context(|| { + format!( + "managed Codex version was not utf-8: {}", + codex_bin.display() + ) + })?; + parse_codex_version(&stdout) +} + +fn managed_codex_file_name() -> &'static str { + if cfg!(windows) { "codex.exe" } else { "codex" } +} + +#[cfg(unix)] +fn parse_codex_version(output: &str) -> Result { + let version = output + .split_whitespace() + .nth(1) + .filter(|version| !version.is_empty()) + .ok_or_else(|| anyhow!("managed Codex version output was malformed"))?; + Ok(version.to_string()) +} + +#[cfg(all(test, unix))] +#[path = "managed_install_tests.rs"] +mod tests; diff --git a/codex-rs/app-server-daemon/src/managed_install_tests.rs b/codex-rs/app-server-daemon/src/managed_install_tests.rs new file mode 100644 index 0000000000..b7d5cccc4f --- /dev/null +++ b/codex-rs/app-server-daemon/src/managed_install_tests.rs @@ -0,0 +1,16 @@ +use pretty_assertions::assert_eq; + +use super::parse_codex_version; + +#[test] +fn parses_codex_cli_version_output() { + assert_eq!( + parse_codex_version("codex 1.2.3\n").expect("version"), + "1.2.3" + ); +} + +#[test] +fn rejects_malformed_codex_cli_version_output() { + assert!(parse_codex_version("codex\n").is_err()); +} diff --git a/codex-rs/app-server-daemon/src/settings.rs b/codex-rs/app-server-daemon/src/settings.rs new file mode 100644 index 0000000000..18b9c9faf7 --- /dev/null +++ b/codex-rs/app-server-daemon/src/settings.rs @@ -0,0 +1,63 @@ +use std::path::Path; + +use anyhow::Context; +use anyhow::Result; +use serde::Deserialize; +use serde::Serialize; +use tokio::fs; + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct DaemonSettings { + pub(crate) remote_control_enabled: bool, +} + +impl DaemonSettings { + pub(crate) async fn load(path: &Path) -> Result { + let contents = match fs::read_to_string(path).await { + Ok(contents) => contents, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Self::default()), + Err(err) => { + return Err(err) + .with_context(|| format!("failed to read daemon settings {}", path.display())); + } + }; + + serde_json::from_str(&contents) + .with_context(|| format!("failed to parse daemon settings {}", path.display())) + } + + pub(crate) async fn save(&self, path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await.with_context(|| { + format!( + "failed to create daemon settings directory {}", + parent.display() + ) + })?; + } + + let contents = serde_json::to_vec_pretty(self).context("failed to serialize settings")?; + fs::write(path, contents) + .await + .with_context(|| format!("failed to write daemon settings {}", path.display())) + } +} + +#[cfg(all(test, unix))] +mod tests { + use pretty_assertions::assert_eq; + + use super::DaemonSettings; + + #[test] + fn daemon_settings_use_camel_case_json() { + assert_eq!( + serde_json::to_string(&DaemonSettings { + remote_control_enabled: true, + }) + .expect("serialize"), + r#"{"remoteControlEnabled":true}"# + ); + } +} diff --git a/codex-rs/app-server-daemon/src/update_loop.rs b/codex-rs/app-server-daemon/src/update_loop.rs new file mode 100644 index 0000000000..91193e0af5 --- /dev/null +++ b/codex-rs/app-server-daemon/src/update_loop.rs @@ -0,0 +1,132 @@ +#[cfg(unix)] +use std::process::Stdio; +#[cfg(unix)] +use std::time::Duration; + +#[cfg(unix)] +use anyhow::Context; +use anyhow::Result; +#[cfg(not(unix))] +use anyhow::bail; +#[cfg(unix)] +use futures::FutureExt; +#[cfg(unix)] +use tokio::io::AsyncWriteExt; +#[cfg(unix)] +use tokio::process::Command; +#[cfg(unix)] +use tokio::signal::unix::Signal; +#[cfg(unix)] +use tokio::signal::unix::SignalKind; +#[cfg(unix)] +use tokio::signal::unix::signal; +#[cfg(unix)] +use tokio::time::sleep; + +#[cfg(unix)] +use crate::Daemon; +#[cfg(unix)] +use crate::RestartIfRunningOutcome; + +#[cfg(unix)] +const INITIAL_UPDATE_DELAY: Duration = Duration::from_secs(5 * 60); +#[cfg(unix)] +const RESTART_RETRY_INTERVAL: Duration = Duration::from_millis(50); +#[cfg(unix)] +const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); + +#[cfg(unix)] +pub(crate) async fn run() -> Result<()> { + let mut terminate = + signal(SignalKind::terminate()).context("failed to install updater shutdown handler")?; + if sleep_or_terminate(INITIAL_UPDATE_DELAY, &mut terminate).await { + return Ok(()); + } + loop { + match update_once(&mut terminate).await { + Ok(UpdateLoopControl::Continue) | Err(_) => {} + Ok(UpdateLoopControl::Stop) => return Ok(()), + } + if sleep_or_terminate(UPDATE_INTERVAL, &mut terminate).await { + return Ok(()); + } + } +} + +#[cfg(not(unix))] +pub(crate) async fn run() -> Result<()> { + bail!("pid-managed updater loop is unsupported on this platform") +} + +#[cfg(unix)] +async fn sleep_or_terminate(duration: Duration, terminate: &mut Signal) -> bool { + tokio::select! { + _ = sleep(duration) => false, + _ = terminate.recv() => true, + } +} + +#[cfg(unix)] +enum UpdateLoopControl { + Continue, + Stop, +} + +#[cfg(unix)] +async fn update_once(terminate: &mut Signal) -> Result { + install_latest_standalone().await?; + + let daemon = Daemon::from_environment()?; + loop { + if terminate.recv().now_or_never().flatten().is_some() { + return Ok(UpdateLoopControl::Stop); + } + match daemon.try_restart_if_running().await? { + RestartIfRunningOutcome::Completed => return Ok(UpdateLoopControl::Continue), + RestartIfRunningOutcome::Busy => { + if sleep_or_terminate(RESTART_RETRY_INTERVAL, terminate).await { + return Ok(UpdateLoopControl::Stop); + } + } + } + } +} + +#[cfg(unix)] +async fn install_latest_standalone() -> Result<()> { + let script = reqwest::get("https://chatgpt.com/codex/install.sh") + .await + .context("failed to fetch standalone Codex updater")? + .error_for_status() + .context("standalone Codex updater request failed")? + .bytes() + .await + .context("failed to read standalone Codex updater")?; + + let mut child = Command::new("/bin/sh") + .arg("-s") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .context("failed to invoke standalone Codex updater")?; + let mut stdin = child + .stdin + .take() + .context("standalone Codex updater stdin was unavailable")?; + stdin + .write_all(&script) + .await + .context("failed to pass standalone Codex updater to shell")?; + drop(stdin); + let status = child + .wait() + .await + .context("failed to wait for standalone Codex updater")?; + + if status.success() { + Ok(()) + } else { + anyhow::bail!("standalone Codex updater exited with status {status}") + } +} diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index f2a289bf62..120d2e497a 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -22,6 +22,7 @@ anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } clap_complete = { workspace = true } codex-app-server = { workspace = true } +codex-app-server-daemon = { workspace = true } codex-app-server-protocol = { workspace = true } codex-app-server-test-client = { workspace = true } codex-arg0 = { workspace = true } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index dbe6b7605c..d70e401664 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -3,6 +3,9 @@ use clap::CommandFactory; use clap::Parser; use clap_complete::Shell; use clap_complete::generate; +use codex_app_server_daemon::BootstrapOptions as AppServerBootstrapOptions; +use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand; +use codex_app_server_daemon::RemoteControlMode as AppServerRemoteControlMode; use codex_arg0::Arg0DispatchPaths; use codex_arg0::arg0_dispatch_or_else; use codex_chatgpt::apply_command::ApplyCommand; @@ -469,6 +472,9 @@ struct ExecServerCommand { #[derive(Debug, clap::Subcommand)] #[allow(clippy::enum_variant_names)] enum AppServerSubcommand { + /// Manage the local app-server daemon. + Daemon(AppServerDaemonCommand), + /// Proxy stdio bytes to the running app-server control socket. Proxy(AppServerProxyCommand), @@ -483,6 +489,40 @@ enum AppServerSubcommand { GenerateInternalJsonSchema(GenerateInternalJsonSchemaCommand), } +#[derive(Debug, Args)] +struct AppServerDaemonCommand { + #[command(subcommand)] + subcommand: AppServerDaemonSubcommand, +} + +#[derive(Debug, clap::Subcommand)] +enum AppServerDaemonSubcommand { + /// Install durable local app-server management for SSH-driven use. + Bootstrap(AppServerBootstrapCommand), + + /// Start the local app server daemon if it is not already running. + Start, + + /// Restart the local app server daemon. + Restart, + + /// Enable remote_control for future starts and a currently running managed daemon. + EnableRemoteControl, + + /// Disable remote_control for future starts and a currently running managed daemon. + DisableRemoteControl, + + /// Stop the local app server daemon. + Stop, + + /// Print local CLI and running app-server versions as JSON. + Version, + + /// [internal] Run the detached pid-backed standalone updater loop. + #[clap(hide = true)] + PidUpdateLoop, +} + #[derive(Debug, Args)] struct AppServerProxyCommand { /// Path to the app-server Unix domain socket to connect to. @@ -490,6 +530,13 @@ struct AppServerProxyCommand { socket_path: Option, } +#[derive(Debug, Args)] +struct AppServerBootstrapCommand { + /// Launch the managed app-server with remote_control enabled. + #[arg(long = "remote-control")] + remote_control: bool, +} + #[derive(Debug, Args)] struct GenerateTsCommand { /// Output directory where .ts files will be written @@ -875,6 +922,41 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { ) .await?; } + Some(AppServerSubcommand::Daemon(daemon_cli)) => match daemon_cli.subcommand { + AppServerDaemonSubcommand::Start => { + print_app_server_daemon_output(AppServerLifecycleCommand::Start).await?; + } + AppServerDaemonSubcommand::Bootstrap(bootstrap_cli) => { + let output = + codex_app_server_daemon::bootstrap(AppServerBootstrapOptions { + remote_control_enabled: bootstrap_cli.remote_control, + }) + .await?; + println!("{}", serde_json::to_string(&output)?); + } + AppServerDaemonSubcommand::Restart => { + print_app_server_daemon_output(AppServerLifecycleCommand::Restart).await?; + } + AppServerDaemonSubcommand::EnableRemoteControl => { + print_app_server_remote_control_output(AppServerRemoteControlMode::Enabled) + .await?; + } + AppServerDaemonSubcommand::DisableRemoteControl => { + print_app_server_remote_control_output( + AppServerRemoteControlMode::Disabled, + ) + .await?; + } + AppServerDaemonSubcommand::Stop => { + print_app_server_daemon_output(AppServerLifecycleCommand::Stop).await?; + } + AppServerDaemonSubcommand::Version => { + print_app_server_daemon_output(AppServerLifecycleCommand::Version).await?; + } + AppServerDaemonSubcommand::PidUpdateLoop => { + codex_app_server_daemon::run_pid_update_loop().await?; + } + }, Some(AppServerSubcommand::Proxy(proxy_cli)) => { let socket_path = match proxy_cli.socket_path { Some(socket_path) => socket_path, @@ -1547,6 +1629,20 @@ fn reject_remote_mode_for_app_server_subcommand( ) -> anyhow::Result<()> { let subcommand_name = match subcommand { None => "app-server", + Some(AppServerSubcommand::Daemon(daemon)) => match daemon.subcommand { + AppServerDaemonSubcommand::Bootstrap(_) => "app-server daemon bootstrap", + AppServerDaemonSubcommand::Start => "app-server daemon start", + AppServerDaemonSubcommand::Restart => "app-server daemon restart", + AppServerDaemonSubcommand::EnableRemoteControl => { + "app-server daemon enable-remote-control" + } + AppServerDaemonSubcommand::DisableRemoteControl => { + "app-server daemon disable-remote-control" + } + AppServerDaemonSubcommand::Stop => "app-server daemon stop", + AppServerDaemonSubcommand::Version => "app-server daemon version", + AppServerDaemonSubcommand::PidUpdateLoop => "app-server daemon pid-update-loop", + }, Some(AppServerSubcommand::Proxy(_)) => "app-server proxy", Some(AppServerSubcommand::GenerateTs(_)) => "app-server generate-ts", Some(AppServerSubcommand::GenerateJsonSchema(_)) => "app-server generate-json-schema", @@ -1557,6 +1653,20 @@ fn reject_remote_mode_for_app_server_subcommand( reject_remote_mode_for_subcommand(remote, remote_auth_token_env, subcommand_name) } +async fn print_app_server_daemon_output(command: AppServerLifecycleCommand) -> anyhow::Result<()> { + let output = codex_app_server_daemon::run(command).await?; + println!("{}", serde_json::to_string(&output)?); + Ok(()) +} + +async fn print_app_server_remote_control_output( + mode: AppServerRemoteControlMode, +) -> anyhow::Result<()> { + let output = codex_app_server_daemon::set_remote_control(mode).await?; + println!("{}", serde_json::to_string(&output)?); + Ok(()) +} + fn read_remote_auth_token_from_env_var_with( env_var_name: &str, get_var: F, @@ -2524,6 +2634,70 @@ mod tests { )); } + #[test] + fn app_server_daemon_subcommands_parse() { + assert!(matches!( + app_server_from_args( + [ + "codex", + "app-server", + "daemon", + "bootstrap", + "--remote-control" + ] + .as_ref() + ) + .subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Bootstrap(AppServerBootstrapCommand { + remote_control: true + }) + })) + )); + assert!(matches!( + app_server_from_args(["codex", "app-server", "daemon", "start"].as_ref()).subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Start + })) + )); + assert!(matches!( + app_server_from_args(["codex", "app-server", "daemon", "restart"].as_ref()).subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Restart + })) + )); + assert!(matches!( + app_server_from_args( + ["codex", "app-server", "daemon", "enable-remote-control"].as_ref() + ) + .subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::EnableRemoteControl + })) + )); + assert!(matches!( + app_server_from_args( + ["codex", "app-server", "daemon", "disable-remote-control"].as_ref() + ) + .subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::DisableRemoteControl + })) + )); + assert!(matches!( + app_server_from_args(["codex", "app-server", "daemon", "stop"].as_ref()).subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Stop + })) + )); + assert!(matches!( + app_server_from_args(["codex", "app-server", "daemon", "version"].as_ref()).subcommand, + Some(AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Version + })) + )); + } + #[test] fn app_server_proxy_sock_path_parses() { let app_server = @@ -2552,6 +2726,20 @@ mod tests { assert!(err.to_string().contains("app-server proxy")); } + #[test] + fn reject_remote_auth_token_env_for_app_server_version() { + let subcommand = AppServerSubcommand::Daemon(AppServerDaemonCommand { + subcommand: AppServerDaemonSubcommand::Version, + }); + let err = reject_remote_mode_for_app_server_subcommand( + /*remote*/ None, + Some("CODEX_REMOTE_AUTH_TOKEN"), + Some(&subcommand), + ) + .expect_err("app-server daemon version should reject --remote-auth-token-env"); + assert!(err.to_string().contains("app-server daemon version")); + } + #[test] fn app_server_capability_token_flags_parse() { let app_server = app_server_from_args(