[daemon] Add app-server daemon lifecycle management (#20718)

## Why

Desktop and mobile Codex clients need a machine-readable way to
bootstrap and manage `codex app-server` on remote machines reached over
SSH. The same flow is also useful for bringing up app-server with
`remote_control` enabled on a fresh developer machine and keeping that
managed install current without requiring a human session.

## What changed

- add the new experimental `codex-app-server-daemon` crate and wire it
into `codex app-server daemon` lifecycle commands: `start`, `restart`,
`stop`, `version`, and `bootstrap`
- add explicit `enable-remote-control` and `disable-remote-control`
commands that persist the launch setting and restart a running managed
daemon so the change takes effect immediately
- emit JSON success responses for daemon commands so remote callers can
consume them directly
- support a Unix-only pidfile-backed detached backend for lifecycle
management
- assume the standalone `install.sh` layout for daemon-managed binaries
and always launch `CODEX_HOME/packages/standalone/current/codex`
- add bootstrap support for the standalone managed install plus a
detached hourly updater loop
- harden lifecycle management around concurrent operations, pidfile
ownership, stale state cleanup, updater ownership, managed-binary
preflight, Unix-only rejection, forced shutdown after the graceful
window, and updater process-group tracking/cleanup
- document the experimental Unix-only support boundary plus the
standalone bootstrap/update flow in
`codex-rs/app-server-daemon/README.md`

## Verification

- `cargo test -p codex-app-server-daemon -p codex-cli`
- live pid validation on `cb4`: `bootstrap --remote-control`, `restart`,
`version`, `stop`

## Follow-up

- Add updater self-refresh so the long-lived `pid-update-loop` can
replace its own executable image after installing a newer managed Codex
binary.

Author: Ruslan Nigmatullin
Date: 2026-05-08 16:51:16 -07:00
Committed by GitHub
Parent: faa5d4a5e2
Commit: 0c8d42525e
16 changed files with 2190 additions and 0 deletions

codex-rs/Cargo.lock generated

@@ -1979,6 +1979,26 @@ dependencies = [
"url",
]
[[package]]
name = "codex-app-server-daemon"
version = "0.0.0"
dependencies = [
"anyhow",
"codex-app-server-protocol",
"codex-app-server-transport",
"codex-core",
"codex-uds",
"futures",
"libc",
"pretty_assertions",
"reqwest",
"serde",
"serde_json",
"tempfile",
"tokio",
"tokio-tungstenite",
]
[[package]]
name = "codex-app-server-protocol"
version = "0.0.0"
@@ -2206,6 +2226,7 @@ dependencies = [
"clap",
"clap_complete",
"codex-app-server",
"codex-app-server-daemon",
"codex-app-server-protocol",
"codex-app-server-test-client",
"codex-arg0",


@@ -11,6 +11,7 @@ members = [
"async-utils",
"app-server",
"app-server-transport",
"app-server-daemon",
"app-server-client",
"app-server-protocol",
"app-server-test-client",
@@ -131,6 +132,7 @@ codex-api = { path = "codex-api" }
codex-aws-auth = { path = "aws-auth" }
codex-app-server = { path = "app-server" }
codex-app-server-transport = { path = "app-server-transport" }
codex-app-server-daemon = { path = "app-server-daemon" }
codex-app-server-client = { path = "app-server-client" }
codex-app-server-protocol = { path = "app-server-protocol" }
codex-app-server-test-client = { path = "app-server-test-client" }


@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")

codex_rust_crate(
    name = "app-server-daemon",
    crate_name = "codex_app_server_daemon",
)


@@ -0,0 +1,39 @@
[package]
name = "codex-app-server-daemon"
version.workspace = true
edition.workspace = true
license.workspace = true

[lib]
name = "codex_app_server_daemon"
path = "src/lib.rs"
doctest = false

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-app-server-transport = { workspace = true }
codex-core = { workspace = true }
codex-uds = { workspace = true }
futures = { workspace = true }
libc = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = [
    "fs",
    "io-util",
    "macros",
    "process",
    "rt-multi-thread",
    "signal",
    "time",
] }
tokio-tungstenite = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }


@@ -0,0 +1,104 @@
# codex-app-server-daemon

> `codex-app-server-daemon` is experimental and its lifecycle contract may
> change while the remote-management flow is still being developed.

`codex-app-server-daemon` backs the machine-readable `codex app-server`
lifecycle commands used by remote clients such as the desktop and mobile apps.
It is intended for Codex instances launched over SSH, including fresh developer
machines that should expose app-server with `remote_control` enabled.

## Platform support

The current daemon implementation is Unix-only. It uses pidfile-backed
daemonization plus Unix process and file-locking primitives, and does not yet
support Windows lifecycle management.

## Commands

```sh
codex app-server daemon start
codex app-server daemon restart
codex app-server daemon enable-remote-control
codex app-server daemon disable-remote-control
codex app-server daemon stop
codex app-server daemon version
codex app-server daemon bootstrap --remote-control
```

On success, every command writes exactly one JSON object to stdout. Consumers
should parse that JSON rather than relying on human-readable text. Lifecycle
responses report the resolved backend, socket path, local CLI version, and
running app-server version when applicable.
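
As a rough illustration of how a caller might consume that contract, the sketch below runs one daemon subcommand and hands back its stdout untouched so a JSON parser can take over. The helper names `daemon_command` and `run_daemon_command` are hypothetical, and because the response's field names are not spelled out here, the sketch deliberately stops short of decoding the object.

```rust
use std::io;
use std::path::Path;
use std::process::Command;

/// Builds `codex app-server daemon <subcommand>` for the given binary.
/// Hypothetical helper; not part of the daemon crate.
fn daemon_command(codex_bin: &Path, subcommand: &str) -> Command {
    let mut command = Command::new(codex_bin);
    command.args(["app-server", "daemon", subcommand]);
    command
}

/// Runs the command and returns its stdout, which on success should hold
/// exactly one JSON object. Decoding is left to the caller's JSON library.
fn run_daemon_command(codex_bin: &Path, subcommand: &str) -> io::Result<String> {
    let output = daemon_command(codex_bin, subcommand).output()?;
    if !output.status.success() {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("daemon {} exited with {}", subcommand, output.status),
        ));
    }
    String::from_utf8(output.stdout)
        .map(|stdout| stdout.trim().to_string())
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
}
```

A remote client would typically run the same command line over SSH and apply the same rule: treat a non-zero exit as failure, otherwise parse stdout as a single JSON value.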

## Bootstrap flow

For a new remote machine:

```sh
curl -fsSL https://chatgpt.com/codex/install.sh | sh
$HOME/.codex/packages/standalone/current/codex app-server daemon bootstrap --remote-control
```

`bootstrap` requires the standalone managed install. It records the daemon
settings under `CODEX_HOME/app-server-daemon/`, starts app-server as a
pidfile-backed detached process, and launches a detached updater loop.

## Installation and update cases

The daemon assumes Codex is installed through `install.sh` and always launches
the standalone managed binary under `CODEX_HOME`.

| Situation | What starts | Does this daemon fetch new binaries? | Does a running app-server eventually move to a newer binary on its own? |
| --- | --- | --- | --- |
| `install.sh` has run, but only `start` is used | `start` uses `CODEX_HOME/packages/standalone/current/codex` | No | No. The managed path is used when starting or restarting, but no updater is installed. |
| `install.sh` has run, then `bootstrap` is used | The pidfile backend uses `CODEX_HOME/packages/standalone/current/codex` | Yes. Bootstrap launches a detached updater loop that runs `install.sh` hourly. | Yes, while that updater process is alive. After a successful fetch, it restarts a currently running app-server only when the managed binary reports a different version. |
| Some other tool updates the managed binary path | The next fresh start or restart uses the updated file at that path | No | Not automatically. The existing process keeps the old executable image until an explicit `restart`. |

### Standalone installs

For installs created by `install.sh`:

- lifecycle commands always use the standalone managed binary path
- `bootstrap` is supported
- `bootstrap` starts a detached pid-backed updater loop that fetches via
  `install.sh`, then restarts app-server if it is running on a different version
- the updater loop is not reboot-persistent; it must be started again by
  rerunning `bootstrap` after a reboot
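
The updater's restart rule from the list above can be read as a small decision function. The following is a minimal sketch, assuming versions are compared as opaque strings and that "no app-server running" is represented as `None`; `UpdateAction` and `action_after_fetch` are illustrative names, not the crate's API.

```rust
/// What the updater does after a successful fetch (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum UpdateAction {
    /// No app-server is running, or it already matches the managed binary.
    LeaveAlone,
    /// A running app-server reports a different version than the managed binary.
    Restart,
}

/// Decides whether to restart, given the version reported by the running
/// app-server (if any) and the version reported by the managed binary.
fn action_after_fetch(running_version: Option<&str>, managed_version: &str) -> UpdateAction {
    match running_version {
        Some(running) if running != managed_version => UpdateAction::Restart,
        _ => UpdateAction::LeaveAlone,
    }
}
```

Under this reading, a stopped app-server is never started by the updater; it only picks up the new binary on its next explicit `start` or `restart`.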

### Out-of-band updates

This daemon does not watch arbitrary executable files for replacement. If some
other tool updates a binary that the daemon would use on its next launch:

- a currently running app-server remains on the old executable image
- `restart` will launch the updated binary
- for bootstrapped daemons, the detached updater loop only reacts to updates it
  fetched itself; it does not watch arbitrary file replacement

## Lifecycle semantics

`start` is idempotent and returns after app-server is ready to answer the normal
JSON-RPC initialize handshake on the Unix control socket.

`restart` stops any managed daemon and starts it again.

`enable-remote-control` and `disable-remote-control` persist the launch setting
for future starts. If a managed app-server is already running, they restart it
so the new setting takes effect immediately.

`stop` sends a graceful termination request (`SIGTERM`) first, then force-kills
the process with `SIGKILL` if it is still alive after the grace window.

All mutating lifecycle commands are serialized per `CODEX_HOME`, so a concurrent
`start`, `restart`, `enable-remote-control`, `disable-remote-control`, `stop`,
or `bootstrap` does not race another in-flight lifecycle operation.
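
The `stop` escalation can be modelled as a pure function of elapsed time. This is a sketch rather than the backend's actual control flow: the two durations mirror the `STOP_GRACE_PERIOD` and `STOP_TIMEOUT` constants in the pid backend added by this change, while `StopStep` and `next_stop_step` are hypothetical names introduced for illustration.

```rust
use std::time::Duration;

// Values mirror the constants in the pid backend added by this change.
const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60);
const STOP_TIMEOUT: Duration = Duration::from_secs(70);

/// Next step for a stop request whose target is still alive (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum StopStep {
    /// Still inside the grace window, or the forced kill was already sent.
    KeepWaiting,
    /// The grace window has elapsed; escalate to a forced kill once.
    ForceKill,
    /// The overall deadline has passed; report a timeout to the caller.
    GiveUp,
}

/// `elapsed` is the time since the graceful termination request was sent.
fn next_stop_step(elapsed: Duration, already_forced: bool) -> StopStep {
    if elapsed >= STOP_TIMEOUT {
        StopStep::GiveUp
    } else if !already_forced && elapsed >= STOP_GRACE_PERIOD {
        StopStep::ForceKill
    } else {
        StopStep::KeepWaiting
    }
}
```

Keeping the overall timeout longer than the grace period leaves a short window in which the forced kill can take effect before the command reports failure.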

## State

The daemon stores its local state under `CODEX_HOME/app-server-daemon/`:

- `settings.json` for persisted launch settings
- `app-server.pid` for the app-server process record
- `app-server-updater.pid` for the pid-backed standalone updater loop
- `daemon.lock` for daemon-wide lifecycle serialization
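
For orientation, the layout above can be derived from a `CODEX_HOME` path as sketched below. The file names come from the list; `DaemonStatePaths` and `daemon_state_paths` are hypothetical names, and the extra `app-server.pid.lock` entry assumes the `with_extension("pid.lock")` derivation used by the pid backend in this change.

```rust
use std::path::Path;
use std::path::PathBuf;

/// Locations of the daemon's state files (hypothetical struct for illustration).
#[derive(Debug, PartialEq, Eq)]
struct DaemonStatePaths {
    settings: PathBuf,
    app_server_pid: PathBuf,
    app_server_pid_lock: PathBuf,
    updater_pid: PathBuf,
    operation_lock: PathBuf,
}

/// Resolves every state file under `<codex_home>/app-server-daemon/`.
fn daemon_state_paths(codex_home: &Path) -> DaemonStatePaths {
    let state_dir = codex_home.join("app-server-daemon");
    let app_server_pid = state_dir.join("app-server.pid");
    DaemonStatePaths {
        settings: state_dir.join("settings.json"),
        // Replaces the `pid` extension, yielding `app-server.pid.lock`.
        app_server_pid_lock: app_server_pid.with_extension("pid.lock"),
        app_server_pid,
        updater_pid: state_dir.join("app-server-updater.pid"),
        operation_lock: state_dir.join("daemon.lock"),
    }
}
```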


@@ -0,0 +1,33 @@
mod pid;

use std::path::PathBuf;

use serde::Serialize;

pub(crate) use pid::PidBackend;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum BackendKind {
    Pid,
}

#[derive(Debug, Clone)]
pub(crate) struct BackendPaths {
    pub(crate) codex_bin: PathBuf,
    pub(crate) pid_file: PathBuf,
    pub(crate) update_pid_file: PathBuf,
    pub(crate) remote_control_enabled: bool,
}

pub(crate) fn pid_backend(paths: BackendPaths) -> PidBackend {
    PidBackend::new(
        paths.codex_bin,
        paths.pid_file,
        paths.remote_control_enabled,
    )
}

pub(crate) fn pid_update_loop_backend(paths: BackendPaths) -> PidBackend {
    PidBackend::new_update_loop(paths.codex_bin, paths.update_pid_file)
}


@@ -0,0 +1,600 @@
use std::path::Path;
use std::path::PathBuf;
#[cfg(unix)]
use std::process::Stdio;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs;
#[cfg(unix)]
use tokio::process::Command;
use tokio::time::sleep;
const STOP_POLL_INTERVAL: Duration = Duration::from_millis(50);
const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60);
const STOP_TIMEOUT: Duration = Duration::from_secs(70);
const START_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug)]
#[cfg_attr(not(unix), allow(dead_code))]
pub(crate) struct PidBackend {
codex_bin: PathBuf,
pid_file: PathBuf,
lock_file: PathBuf,
command_kind: PidCommandKind,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PidRecord {
pid: u32,
process_start_time: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum PidFileState {
Missing,
Starting,
Running(PidRecord),
}
#[derive(Debug, Clone, Copy)]
#[cfg_attr(not(unix), allow(dead_code))]
enum PidCommandKind {
AppServer { remote_control_enabled: bool },
UpdateLoop,
}
impl PidBackend {
pub(crate) fn new(codex_bin: PathBuf, pid_file: PathBuf, remote_control_enabled: bool) -> Self {
let lock_file = pid_file.with_extension("pid.lock");
Self {
codex_bin,
pid_file,
lock_file,
command_kind: PidCommandKind::AppServer {
remote_control_enabled,
},
}
}
pub(crate) fn new_update_loop(codex_bin: PathBuf, pid_file: PathBuf) -> Self {
let lock_file = pid_file.with_extension("pid.lock");
Self {
codex_bin,
pid_file,
lock_file,
command_kind: PidCommandKind::UpdateLoop,
}
}
pub(crate) async fn is_starting_or_running(&self) -> Result<bool> {
loop {
match self.read_pid_file_state().await? {
PidFileState::Missing => return Ok(false),
PidFileState::Starting => return Ok(true),
PidFileState::Running(record) => {
if self.record_is_active(&record).await? {
return Ok(true);
}
match self.refresh_after_stale_record(&record).await? {
PidFileState::Missing => return Ok(false),
PidFileState::Starting | PidFileState::Running(_) => continue,
}
}
}
}
}
#[cfg(unix)]
pub(crate) async fn start(&self) -> Result<Option<u32>> {
if let Some(parent) = self.pid_file.parent() {
fs::create_dir_all(parent)
.await
.with_context(|| format!("failed to create pid directory {}", parent.display()))?;
}
let reservation_lock = self.acquire_reservation_lock().await?;
let _pid_file = loop {
match fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.pid_file)
.await
{
Ok(pid_file) => break pid_file,
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
match self.read_pid_file_state_with_lock_held().await? {
PidFileState::Missing => continue,
PidFileState::Running(record) => {
if self.record_is_active(&record).await? {
return Ok(None);
}
let _ = fs::remove_file(&self.pid_file).await;
continue;
}
PidFileState::Starting => {
unreachable!("lock holder cannot observe starting")
}
}
}
Err(err) => {
return Err(err).with_context(|| {
format!("failed to reserve pid file {}", self.pid_file.display())
});
}
}
};
let mut command = Command::new(&self.codex_bin);
command
.args(self.command_args())
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null());
#[cfg(unix)]
{
unsafe {
command.pre_exec(|| {
if libc::setsid() == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
}
let child = match command.spawn() {
Ok(child) => child,
Err(err) => {
let _ = fs::remove_file(&self.pid_file).await;
return Err(err).with_context(|| {
format!(
"failed to spawn detached app-server process using {}",
self.codex_bin.display()
)
});
}
};
let pid = child
.id()
.context("spawned app-server process has no pid")?;
let record = match read_process_start_time(pid).await {
Ok(process_start_time) => PidRecord {
pid,
process_start_time,
},
Err(err) => {
let _ = self.terminate_process(pid);
let _ = fs::remove_file(&self.pid_file).await;
return Err(err);
}
};
let contents = serde_json::to_vec(&record).context("failed to serialize pid record")?;
let temp_pid_file = self.pid_file.with_extension("pid.tmp");
if let Err(err) = fs::write(&temp_pid_file, &contents).await {
let _ = self.terminate_process(pid);
let _ = fs::remove_file(&self.pid_file).await;
return Err(err).with_context(|| {
format!("failed to write pid temp file {}", temp_pid_file.display())
});
}
if let Err(err) = fs::rename(&temp_pid_file, &self.pid_file).await {
let _ = self.terminate_process(pid);
let _ = fs::remove_file(&temp_pid_file).await;
let _ = fs::remove_file(&self.pid_file).await;
return Err(err).with_context(|| {
format!("failed to publish pid file {}", self.pid_file.display())
});
}
drop(reservation_lock);
Ok(Some(pid))
}
#[cfg(not(unix))]
pub(crate) async fn start(&self) -> Result<Option<u32>> {
bail!("pid-managed app-server startup is unsupported on this platform")
}
pub(crate) async fn stop(&self) -> Result<()> {
loop {
let Some(record) = self.wait_for_pid_start().await? else {
return Ok(());
};
if !self.record_is_active(&record).await? {
match self.refresh_after_stale_record(&record).await? {
PidFileState::Missing => return Ok(()),
PidFileState::Starting | PidFileState::Running(_) => continue,
}
}
let pid = record.pid;
self.terminate_process(pid)?;
let started_at = tokio::time::Instant::now();
let deadline = tokio::time::Instant::now() + STOP_TIMEOUT;
let mut forced = false;
while tokio::time::Instant::now() < deadline {
if !self.record_is_active(&record).await? {
match self.refresh_after_stale_record(&record).await? {
PidFileState::Missing => return Ok(()),
PidFileState::Starting | PidFileState::Running(_) => break,
}
}
if !forced && started_at.elapsed() >= STOP_GRACE_PERIOD {
self.force_terminate_process(pid)?;
forced = true;
}
sleep(STOP_POLL_INTERVAL).await;
}
if self.record_is_active(&record).await? {
bail!("timed out waiting for pid-managed app server {pid} to stop");
}
}
}
async fn wait_for_pid_start(&self) -> Result<Option<PidRecord>> {
let deadline = tokio::time::Instant::now() + START_TIMEOUT;
loop {
match self.read_pid_file_state().await? {
PidFileState::Missing => return Ok(None),
PidFileState::Running(record) => return Ok(Some(record)),
PidFileState::Starting if tokio::time::Instant::now() < deadline => {
sleep(STOP_POLL_INTERVAL).await;
}
PidFileState::Starting => {
bail!(
"timed out waiting for pid reservation in {} to finish initializing",
self.pid_file.display()
);
}
}
}
}
async fn read_pid_file_state(&self) -> Result<PidFileState> {
let contents = match fs::read_to_string(&self.pid_file).await {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return if reservation_lock_is_active(&self.lock_file).await? {
Ok(PidFileState::Starting)
} else {
Ok(PidFileState::Missing)
};
}
Err(err) => {
return Err(err).with_context(|| {
format!("failed to read pid file {}", self.pid_file.display())
});
}
};
if contents.trim().is_empty() {
match inspect_empty_pid_reservation(&self.pid_file, &self.lock_file).await? {
EmptyPidReservation::Active => {
return Ok(PidFileState::Starting);
}
EmptyPidReservation::Stale => {
return Ok(PidFileState::Missing);
}
EmptyPidReservation::Record(record) => return Ok(PidFileState::Running(record)),
}
}
let record = serde_json::from_str(&contents)
.with_context(|| format!("invalid pid file contents in {}", self.pid_file.display()))?;
Ok(PidFileState::Running(record))
}
async fn read_pid_file_state_with_lock_held(&self) -> Result<PidFileState> {
let contents = match fs::read_to_string(&self.pid_file).await {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(PidFileState::Missing);
}
Err(err) => {
return Err(err).with_context(|| {
format!("failed to read pid file {}", self.pid_file.display())
});
}
};
if contents.trim().is_empty() {
let _ = fs::remove_file(&self.pid_file).await;
return Ok(PidFileState::Missing);
}
let record = serde_json::from_str(&contents)
.with_context(|| format!("invalid pid file contents in {}", self.pid_file.display()))?;
Ok(PidFileState::Running(record))
}
async fn refresh_after_stale_record(&self, expected: &PidRecord) -> Result<PidFileState> {
let reservation_lock = self.acquire_reservation_lock().await?;
let state = match self.read_pid_file_state_with_lock_held().await? {
PidFileState::Running(record) if record == *expected => {
let _ = fs::remove_file(&self.pid_file).await;
PidFileState::Missing
}
state => state,
};
drop(reservation_lock);
Ok(state)
}
async fn acquire_reservation_lock(&self) -> Result<fs::File> {
let reservation_lock = fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&self.lock_file)
.await
.with_context(|| {
format!("failed to open pid lock file {}", self.lock_file.display())
})?;
let lock_deadline = tokio::time::Instant::now() + START_TIMEOUT;
while !try_lock_file(&reservation_lock)? {
if tokio::time::Instant::now() >= lock_deadline {
bail!(
"timed out waiting for pid lock {}",
self.lock_file.display()
);
}
sleep(STOP_POLL_INTERVAL).await;
}
Ok(reservation_lock)
}
#[cfg(unix)]
fn command_args(&self) -> Vec<&'static str> {
match self.command_kind {
PidCommandKind::AppServer {
remote_control_enabled: true,
} => vec![
"--enable",
"remote_control",
"app-server",
"--listen",
"unix://",
],
PidCommandKind::AppServer {
remote_control_enabled: false,
} => vec!["app-server", "--listen", "unix://"],
PidCommandKind::UpdateLoop => vec!["app-server", "daemon", "pid-update-loop"],
}
}
fn terminate_process(&self, pid: u32) -> Result<()> {
match self.command_kind {
PidCommandKind::AppServer { .. } => terminate_process(pid),
PidCommandKind::UpdateLoop => terminate_process(pid),
}
}
fn force_terminate_process(&self, pid: u32) -> Result<()> {
match self.command_kind {
PidCommandKind::AppServer { .. } => force_terminate_process(pid),
PidCommandKind::UpdateLoop => force_terminate_process_group(pid),
}
}
async fn record_is_active(&self, record: &PidRecord) -> Result<bool> {
process_matches_record(record).await
}
}
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
let Ok(pid) = libc::pid_t::try_from(pid) else {
return false;
};
let result = unsafe { libc::kill(pid, 0) };
result == 0 || std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
#[cfg(unix)]
fn terminate_process(pid: u32) -> Result<()> {
let raw_pid = libc::pid_t::try_from(pid)
.with_context(|| format!("pid-managed app server pid {pid} is out of range"))?;
let result = unsafe { libc::kill(raw_pid, libc::SIGTERM) };
if result == 0 {
return Ok(());
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
return Ok(());
}
Err(err).with_context(|| format!("failed to terminate pid-managed app server {pid}"))
}
#[cfg(unix)]
fn force_terminate_process(pid: u32) -> Result<()> {
let raw_pid = libc::pid_t::try_from(pid)
.with_context(|| format!("pid-managed app server pid {pid} is out of range"))?;
let result = unsafe { libc::kill(raw_pid, libc::SIGKILL) };
if result == 0 {
return Ok(());
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
return Ok(());
}
Err(err).with_context(|| format!("failed to force terminate pid-managed app server {pid}"))
}
#[cfg(unix)]
fn force_terminate_process_group(pid: u32) -> Result<()> {
let raw_pid = libc::pid_t::try_from(pid)
.with_context(|| format!("pid-managed updater pid {pid} is out of range"))?;
let result = unsafe { libc::kill(-raw_pid, libc::SIGKILL) };
if result == 0 {
return Ok(());
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
return Ok(());
}
Err(err).with_context(|| format!("failed to force terminate pid-managed updater group {pid}"))
}
#[cfg(not(unix))]
fn terminate_process(_pid: u32) -> Result<()> {
bail!("pid-managed app-server shutdown is unsupported on this platform")
}
#[cfg(not(unix))]
fn force_terminate_process(_pid: u32) -> Result<()> {
bail!("pid-managed app-server shutdown is unsupported on this platform")
}
#[cfg(not(unix))]
fn force_terminate_process_group(_pid: u32) -> Result<()> {
bail!("pid-managed updater shutdown is unsupported on this platform")
}
#[cfg(unix)]
async fn process_matches_record(record: &PidRecord) -> Result<bool> {
if !process_exists(record.pid) {
return Ok(false);
}
match read_process_start_time(record.pid).await {
Ok(start_time) => Ok(start_time == record.process_start_time),
Err(_err) if !process_exists(record.pid) => Ok(false),
Err(err) => Err(err),
}
}
#[cfg(not(unix))]
async fn process_matches_record(_record: &PidRecord) -> Result<bool> {
Ok(false)
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(not(unix), allow(dead_code))]
enum EmptyPidReservation {
Active,
Stale,
Record(PidRecord),
}
#[cfg(unix)]
fn try_lock_file(file: &fs::File) -> Result<bool> {
use std::os::fd::AsRawFd;
let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
if result == 0 {
return Ok(true);
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::EWOULDBLOCK) {
return Ok(false);
}
Err(err).context("failed to lock pid reservation")
}
#[cfg(not(unix))]
fn try_lock_file(_file: &fs::File) -> Result<bool> {
bail!("pid-managed app-server startup is unsupported on this platform")
}
#[cfg(unix)]
async fn reservation_lock_is_active(path: &Path) -> Result<bool> {
let file = match fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)
.await
{
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(false);
}
Err(err) => {
return Err(err)
.with_context(|| format!("failed to inspect pid lock file {}", path.display()));
}
};
Ok(!try_lock_file(&file)?)
}
#[cfg(not(unix))]
async fn reservation_lock_is_active(_path: &Path) -> Result<bool> {
Ok(false)
}
#[cfg(unix)]
async fn inspect_empty_pid_reservation(
pid_path: &Path,
lock_path: &Path,
) -> Result<EmptyPidReservation> {
let file = match fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(lock_path)
.await
{
Ok(file) => file,
Err(err) => {
return Err(err).with_context(|| {
format!("failed to inspect pid lock file {}", lock_path.display())
});
}
};
if !try_lock_file(&file)? {
return Ok(EmptyPidReservation::Active);
}
let contents = match fs::read_to_string(pid_path).await {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(EmptyPidReservation::Stale);
}
Err(err) => {
return Err(err)
.with_context(|| format!("failed to reread pid file {}", pid_path.display()));
}
};
if contents.trim().is_empty() {
let _ = fs::remove_file(pid_path).await;
return Ok(EmptyPidReservation::Stale);
}
let record = serde_json::from_str(&contents)
.with_context(|| format!("invalid pid file contents in {}", pid_path.display()))?;
Ok(EmptyPidReservation::Record(record))
}
#[cfg(not(unix))]
async fn inspect_empty_pid_reservation(
_pid_path: &Path,
_lock_path: &Path,
) -> Result<EmptyPidReservation> {
Ok(EmptyPidReservation::Stale)
}
#[cfg(unix)]
async fn read_process_start_time(pid: u32) -> Result<String> {
let output = Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "lstart="])
.output()
.await
.context("failed to invoke ps for pid-managed app server")?;
if !output.status.success() {
bail!("failed to read start time for pid-managed app server {pid}");
}
let start_time = String::from_utf8(output.stdout)
.context("pid-managed app server start time was not utf-8")?;
let start_time = start_time.trim();
if start_time.is_empty() {
bail!("pid-managed app server {pid} has no recorded start time");
}
Ok(start_time.to_string())
}
#[cfg(all(test, unix))]
#[path = "pid_tests.rs"]
mod tests;
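
The reserve-then-publish sequence in `start` above can be reduced to two standard-library steps: create the pid file exclusively, then replace it with the finished record via a rename. The sketch below is a simplified, synchronous illustration under that assumption. It omits the `flock`-based reservation lock, process spawning, and start-time lookup that the real backend performs, and the function names are hypothetical.

```rust
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::path::Path;

/// Tries to reserve `pid_file` by creating it exclusively.
/// Returns `Ok(None)` when the path already exists.
fn reserve_pid_file(pid_file: &Path) -> io::Result<Option<File>> {
    match OpenOptions::new().create_new(true).write(true).open(pid_file) {
        Ok(file) => Ok(Some(file)),
        Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None),
        Err(err) => Err(err),
    }
}

/// Publishes the record by writing a sibling temp file and renaming it over
/// the reservation, so readers see either the empty file or the full record.
fn publish_pid_record(pid_file: &Path, contents: &[u8]) -> io::Result<()> {
    let temp_file = pid_file.with_extension("pid.tmp");
    fs::write(&temp_file, contents)?;
    if let Err(err) = fs::rename(&temp_file, pid_file) {
        // Best-effort cleanup so a failed publish leaves no temp file behind.
        let _ = fs::remove_file(&temp_file);
        return Err(err);
    }
    Ok(())
}
```

Exclusive creation alone cannot distinguish a live reservation from one left behind by a crashed starter, which is presumably why the backend pairs the empty file with a separate lock file.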


@@ -0,0 +1,158 @@
use std::time::Duration;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use super::PidBackend;
use super::PidCommandKind;
use super::PidFileState;
use super::PidRecord;
use super::try_lock_file;
#[tokio::test]
async fn locked_empty_pid_file_is_treated_as_active_reservation() {
let temp_dir = TempDir::new().expect("temp dir");
let pid_file = temp_dir.path().join("app-server.pid");
tokio::fs::write(&pid_file, "")
.await
.expect("write pid file");
let backend = PidBackend::new(
temp_dir.path().join("codex"),
pid_file.clone(),
/*remote_control_enabled*/ false,
);
let reservation = tokio::fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&backend.lock_file)
.await
.expect("open pid lock file");
assert!(try_lock_file(&reservation).expect("lock reservation"));
assert_eq!(
backend.read_pid_file_state().await.expect("read pid"),
PidFileState::Starting
);
assert!(pid_file.exists());
}
#[tokio::test]
async fn unlocked_empty_pid_file_is_treated_as_stale_reservation() {
let temp_dir = TempDir::new().expect("temp dir");
let pid_file = temp_dir.path().join("app-server.pid");
tokio::fs::write(&pid_file, "")
.await
.expect("write pid file");
let backend = PidBackend::new(
temp_dir.path().join("codex"),
pid_file.clone(),
/*remote_control_enabled*/ false,
);
assert_eq!(
backend.read_pid_file_state().await.expect("read pid"),
PidFileState::Missing
);
assert!(!pid_file.exists());
}
#[tokio::test]
async fn stop_waits_for_live_reservation_to_resolve() {
let temp_dir = TempDir::new().expect("temp dir");
let pid_file = temp_dir.path().join("app-server.pid");
tokio::fs::write(&pid_file, "")
.await
.expect("write pid file");
let backend = PidBackend::new(
temp_dir.path().join("codex"),
pid_file.clone(),
/*remote_control_enabled*/ false,
);
let reservation = tokio::fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&backend.lock_file)
.await
.expect("open pid lock file");
assert!(try_lock_file(&reservation).expect("lock reservation"));
let cleanup = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
drop(reservation);
tokio::fs::remove_file(pid_file)
.await
.expect("remove pid file");
});
backend.stop().await.expect("stop");
cleanup.await.expect("cleanup task");
}
#[tokio::test]
async fn start_retries_stale_empty_pid_file_under_its_own_lock() {
let temp_dir = TempDir::new().expect("temp dir");
let pid_file = temp_dir.path().join("app-server.pid");
tokio::fs::write(&pid_file, "")
.await
.expect("write pid file");
let backend = PidBackend::new(
temp_dir.path().join("missing-codex"),
pid_file,
/*remote_control_enabled*/ false,
);
let err = backend.start().await.expect_err("start");
assert!(
err.to_string()
.starts_with("failed to spawn detached app-server process using ")
);
}
#[tokio::test]
async fn stale_record_cleanup_preserves_replacement_record() {
let temp_dir = TempDir::new().expect("temp dir");
let pid_file = temp_dir.path().join("app-server.pid");
let backend = PidBackend::new(
temp_dir.path().join("codex"),
pid_file.clone(),
/*remote_control_enabled*/ false,
);
let stale = PidRecord {
pid: 1,
process_start_time: "old".to_string(),
};
let replacement = PidRecord {
pid: 2,
process_start_time: "new".to_string(),
};
tokio::fs::write(
&pid_file,
serde_json::to_vec(&replacement).expect("serialize replacement"),
)
.await
.expect("write replacement pid file");
assert_eq!(
backend
.refresh_after_stale_record(&stale)
.await
.expect("cleanup"),
PidFileState::Running(replacement)
);
}
#[test]
fn update_loop_uses_hidden_app_server_subcommand() {
let backend = PidBackend {
codex_bin: "codex".into(),
pid_file: "updater.pid".into(),
lock_file: "updater.pid.lock".into(),
command_kind: PidCommandKind::UpdateLoop,
};
assert_eq!(
backend.command_args(),
vec!["app-server", "daemon", "pid-update-loop"]
);
}
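
The `stale_record_cleanup_preserves_replacement_record` test above exercises a compare-before-remove rule: a stale record is deleted only if the file still holds the exact record the caller observed. A minimal in-memory model of that rule, assuming the on-disk state is represented as an `Option` and using a local stand-in for `PidRecord`, might look like this (`after_stale_cleanup` is a hypothetical name):

```rust
/// Local stand-in for the backend's pid record.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PidRecord {
    pid: u32,
    process_start_time: String,
}

/// Returns what the pid file should hold after a caller that observed
/// `expected` as stale finishes cleanup. A different record means another
/// process already replaced it, so that record is left in place.
fn after_stale_cleanup(on_disk: Option<PidRecord>, expected: &PidRecord) -> Option<PidRecord> {
    match on_disk {
        Some(record) if record == *expected => None,
        other => other,
    }
}
```

Comparing the whole record, including the start time, rather than just the PID guards against removing a fresh record whose PID happens to have been reused.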


@@ -0,0 +1,131 @@
use std::path::Path;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::InitializeResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use codex_uds::UnixStream;
use futures::SinkExt;
use futures::StreamExt;
use tokio::time::timeout;
use tokio_tungstenite::client_async;
use tokio_tungstenite::tungstenite::Message;
const PROBE_TIMEOUT: Duration = Duration::from_secs(2);
const CLIENT_NAME: &str = "codex_app_server_daemon";
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ProbeInfo {
pub(crate) app_server_version: String,
}
pub(crate) async fn probe(socket_path: &Path) -> Result<ProbeInfo> {
timeout(PROBE_TIMEOUT, probe_inner(socket_path))
.await
.with_context(|| {
format!(
"timed out probing app-server control socket {}",
socket_path.display()
)
})?
}
async fn probe_inner(socket_path: &Path) -> Result<ProbeInfo> {
let stream = UnixStream::connect(socket_path)
.await
.with_context(|| format!("failed to connect to {}", socket_path.display()))?;
let (mut websocket, _response) = client_async("ws://localhost/", stream)
.await
.with_context(|| format!("failed to upgrade {}", socket_path.display()))?;
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(serde_json::to_value(InitializeParams {
client_info: ClientInfo {
name: CLIENT_NAME.to_string(),
title: Some("Codex App Server Daemon".to_string()),
version: env!("CARGO_PKG_VERSION").to_string(),
},
capabilities: None,
})?),
trace: None,
});
websocket
.send(Message::Text(serde_json::to_string(&initialize)?.into()))
.await
.context("failed to send initialize request")?;
let response = loop {
let frame = websocket
.next()
.await
.ok_or_else(|| anyhow!("app-server closed before initialize response"))??;
let Message::Text(payload) = frame else {
continue;
};
let message = serde_json::from_str::<JSONRPCMessage>(&payload)?;
if let JSONRPCMessage::Response(response) = message
&& response.id == RequestId::Integer(1)
{
break response;
}
};
let initialize_response = serde_json::from_value::<InitializeResponse>(response.result)?;
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: None,
});
websocket
.send(Message::Text(serde_json::to_string(&initialized)?.into()))
.await
.context("failed to send initialized notification")?;
websocket.close(None).await.ok();
Ok(ProbeInfo {
app_server_version: parse_version_from_user_agent(&initialize_response.user_agent)?,
})
}
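/// Extracts the version from a user agent shaped like `name/1.2.3 (...)`:
/// the text after the first `/`, up to the next whitespace.
fn parse_version_from_user_agent(user_agent: &str) -> Result<String> {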
let (_originator, rest) = user_agent
.split_once('/')
.ok_or_else(|| anyhow!("app-server user-agent omitted version separator"))?;
let version = rest
.split_whitespace()
.next()
.filter(|version| !version.is_empty())
.ok_or_else(|| anyhow!("app-server user-agent omitted version"))?;
Ok(version.to_string())
}
#[cfg(all(test, unix))]
mod tests {
use pretty_assertions::assert_eq;
use super::parse_version_from_user_agent;
#[test]
fn parses_version_from_codex_user_agent() {
assert_eq!(
parse_version_from_user_agent(
"codex_app_server_daemon/1.2.3 (Linux 6.8.0; x86_64) codex_cli_rs/1.2.3",
)
.expect("version"),
"1.2.3"
);
}
#[test]
fn rejects_user_agent_without_version() {
assert!(parse_version_from_user_agent("codex_app_server_daemon").is_err());
}
}
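
Note: the user-agent parsing exercised by the tests above can be reproduced with the standard library alone. The following is a minimal sketch rather than the crate's code: `version_from_user_agent` is a hypothetical name, and it returns `Option` instead of the `anyhow::Result` used in the diff.

```rust
/// Hypothetical std-only mirror of the version extraction in this diff:
/// take everything after the first '/', then the first whitespace-delimited token.
fn version_from_user_agent(user_agent: &str) -> Option<String> {
    // No '/' at all means there is no version separator to split on.
    let (_originator, rest) = user_agent.split_once('/')?;
    // `split_whitespace` never yields empty tokens, so an empty `rest` gives `None`.
    rest.split_whitespace().next().map(str::to_string)
}
```

With this shape, a user agent that ends right after the `/` is rejected the same way as one with no `/` at all.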

@@ -0,0 +1,630 @@
mod backend;
mod client;
mod managed_install;
mod settings;
mod update_loop;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
pub use backend::BackendKind;
use backend::BackendPaths;
use codex_app_server_transport::app_server_control_socket_path;
use codex_core::config::find_codex_home;
use managed_install::managed_codex_bin;
#[cfg(unix)]
use managed_install::managed_codex_version;
use serde::Serialize;
use settings::DaemonSettings;
use tokio::time::sleep;
const START_POLL_INTERVAL: Duration = Duration::from_millis(50);
const START_TIMEOUT: Duration = Duration::from_secs(10);
const OPERATION_LOCK_TIMEOUT: Duration = Duration::from_secs(75);
const PID_FILE_NAME: &str = "app-server.pid";
const UPDATE_PID_FILE_NAME: &str = "app-server-updater.pid";
const OPERATION_LOCK_FILE_NAME: &str = "daemon.lock";
const SETTINGS_FILE_NAME: &str = "settings.json";
const STATE_DIR_NAME: &str = "app-server-daemon";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleCommand {
Start,
Restart,
Stop,
Version,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum LifecycleStatus {
AlreadyRunning,
Started,
Restarted,
Stopped,
NotRunning,
Running,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LifecycleOutput {
pub status: LifecycleStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub backend: Option<BackendKind>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
pub socket_path: PathBuf,
#[serde(skip_serializing_if = "Option::is_none")]
pub cli_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub app_server_version: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BootstrapOptions {
pub remote_control_enabled: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum BootstrapStatus {
Bootstrapped,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BootstrapOutput {
pub status: BootstrapStatus,
pub backend: BackendKind,
pub auto_update_enabled: bool,
pub remote_control_enabled: bool,
pub managed_codex_path: PathBuf,
pub socket_path: PathBuf,
pub cli_version: String,
pub app_server_version: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteControlMode {
Enabled,
Disabled,
}
impl RemoteControlMode {
fn is_enabled(self) -> bool {
matches!(self, Self::Enabled)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum RemoteControlStatus {
Enabled,
Disabled,
AlreadyEnabled,
AlreadyDisabled,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoteControlOutput {
pub status: RemoteControlStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub backend: Option<BackendKind>,
pub remote_control_enabled: bool,
pub socket_path: PathBuf,
pub cli_version: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub app_server_version: Option<String>,
}
#[cfg(unix)]
pub(crate) enum RestartIfRunningOutcome {
Completed,
Busy,
}
pub async fn run(command: LifecycleCommand) -> Result<LifecycleOutput> {
ensure_supported_platform()?;
Daemon::from_environment()?.run(command).await
}
pub async fn bootstrap(options: BootstrapOptions) -> Result<BootstrapOutput> {
ensure_supported_platform()?;
Daemon::from_environment()?.bootstrap(options).await
}
pub async fn set_remote_control(mode: RemoteControlMode) -> Result<RemoteControlOutput> {
ensure_supported_platform()?;
Daemon::from_environment()?.set_remote_control(mode).await
}
pub async fn run_pid_update_loop() -> Result<()> {
ensure_supported_platform()?;
update_loop::run().await
}
#[cfg(unix)]
fn ensure_supported_platform() -> Result<()> {
Ok(())
}
#[cfg(not(unix))]
fn ensure_supported_platform() -> Result<()> {
Err(anyhow!(
"codex app-server daemon lifecycle is only supported on Unix platforms"
))
}
struct Daemon {
socket_path: PathBuf,
pid_file: PathBuf,
update_pid_file: PathBuf,
operation_lock_file: PathBuf,
settings_file: PathBuf,
managed_codex_bin: PathBuf,
}
impl Daemon {
fn from_environment() -> Result<Self> {
let codex_home = find_codex_home().context("failed to resolve CODEX_HOME")?;
let socket_path = app_server_control_socket_path(codex_home.as_path())?
.as_path()
.to_path_buf();
let state_dir = codex_home.as_path().join(STATE_DIR_NAME);
Ok(Self {
socket_path,
pid_file: state_dir.join(PID_FILE_NAME),
update_pid_file: state_dir.join(UPDATE_PID_FILE_NAME),
operation_lock_file: state_dir.join(OPERATION_LOCK_FILE_NAME),
settings_file: state_dir.join(SETTINGS_FILE_NAME),
managed_codex_bin: managed_codex_bin(codex_home.as_path()),
})
}
async fn run(&self, command: LifecycleCommand) -> Result<LifecycleOutput> {
match command {
LifecycleCommand::Start => {
let _operation_lock = self.acquire_operation_lock().await?;
self.start().await
}
LifecycleCommand::Restart => {
let _operation_lock = self.acquire_operation_lock().await?;
self.restart().await
}
LifecycleCommand::Stop => {
let _operation_lock = self.acquire_operation_lock().await?;
self.stop().await
}
LifecycleCommand::Version => self.version().await,
}
}
async fn start(&self) -> Result<LifecycleOutput> {
let settings = self.load_settings().await?;
if let Ok(info) = client::probe(&self.socket_path).await {
return Ok(self.output(
LifecycleStatus::AlreadyRunning,
self.running_backend(&settings).await?,
/*pid*/ None,
Some(info.app_server_version),
));
}
if self.running_backend_instance(&settings).await?.is_some() {
let info = self.wait_until_ready().await?;
return Ok(self.output(
LifecycleStatus::AlreadyRunning,
Some(BackendKind::Pid),
/*pid*/ None,
Some(info.app_server_version),
));
}
self.ensure_managed_codex_bin()?;
let pid = self.start_managed_backend(&settings).await?;
let info = self.wait_until_ready().await?;
Ok(self.output(
LifecycleStatus::Started,
Some(BackendKind::Pid),
pid,
Some(info.app_server_version),
))
}
async fn restart(&self) -> Result<LifecycleOutput> {
let settings = self.load_settings().await?;
if client::probe(&self.socket_path).await.is_ok()
&& self.running_backend(&settings).await?.is_none()
{
return Err(anyhow!(
"app server is running but is not managed by codex app-server daemon"
));
}
self.ensure_managed_codex_bin()?;
if let Some(backend) = self.running_backend_instance(&settings).await? {
backend.stop().await?;
}
let pid = self.start_managed_backend(&settings).await?;
let info = self.wait_until_ready().await?;
Ok(self.output(
LifecycleStatus::Restarted,
Some(BackendKind::Pid),
pid,
Some(info.app_server_version),
))
}
#[cfg(unix)]
pub(crate) async fn try_restart_if_running(&self) -> Result<RestartIfRunningOutcome> {
let operation_lock = self.open_operation_lock_file().await?;
if !try_lock_file(&operation_lock)? {
return Ok(RestartIfRunningOutcome::Busy);
}
let settings = self.load_settings().await?;
if let Some(backend) = self.running_backend_instance(&settings).await? {
let Ok(info) = client::probe(&self.socket_path).await else {
return Ok(RestartIfRunningOutcome::Completed);
};
let managed_version = managed_codex_version(&self.managed_codex_bin).await?;
if info.app_server_version == managed_version {
return Ok(RestartIfRunningOutcome::Completed);
}
backend.stop().await?;
let _ = self.start_managed_backend(&settings).await?;
self.wait_until_ready().await?;
return Ok(RestartIfRunningOutcome::Completed);
}
if client::probe(&self.socket_path).await.is_ok() {
return Err(anyhow!(
"app server is running but is not managed by codex app-server daemon"
));
}
Ok(RestartIfRunningOutcome::Completed)
}
async fn stop(&self) -> Result<LifecycleOutput> {
let settings = self.load_settings().await?;
if let Some(backend) = self.running_backend_instance(&settings).await? {
backend.stop().await?;
return Ok(self.output(
LifecycleStatus::Stopped,
Some(BackendKind::Pid),
/*pid*/ None,
/*app_server_version*/ None,
));
}
if client::probe(&self.socket_path).await.is_ok() {
return Err(anyhow!(
"app server is running but is not managed by codex app-server daemon"
));
}
Ok(self.output(
LifecycleStatus::NotRunning,
/*backend*/ None,
/*pid*/ None,
/*app_server_version*/ None,
))
}
async fn version(&self) -> Result<LifecycleOutput> {
let settings = self.load_settings().await?;
let info = client::probe(&self.socket_path).await?;
Ok(self.output(
LifecycleStatus::Running,
self.running_backend(&settings).await?,
/*pid*/ None,
Some(info.app_server_version),
))
}
async fn wait_until_ready(&self) -> Result<client::ProbeInfo> {
let deadline = tokio::time::Instant::now() + START_TIMEOUT;
loop {
match client::probe(&self.socket_path).await {
Ok(info) => return Ok(info),
Err(_) if tokio::time::Instant::now() < deadline => {
// Not ready yet; keep polling until START_TIMEOUT elapses.
sleep(START_POLL_INTERVAL).await;
}
Err(err) => {
return Err(err).with_context(|| {
format!(
"app server did not become ready on {}",
self.socket_path.display()
)
});
}
}
}
}
async fn bootstrap(&self, options: BootstrapOptions) -> Result<BootstrapOutput> {
let _operation_lock = self.acquire_operation_lock().await?;
self.bootstrap_locked(options).await
}
async fn set_remote_control(&self, mode: RemoteControlMode) -> Result<RemoteControlOutput> {
let _operation_lock = self.acquire_operation_lock().await?;
let previous_settings = self.load_settings().await?;
let mut settings = previous_settings.clone();
let remote_control_enabled = mode.is_enabled();
let backend = self.running_backend_instance(&previous_settings).await?;
if backend.is_none() && client::probe(&self.socket_path).await.is_ok() {
return Err(anyhow!(
"app server is running but is not managed by codex app-server daemon"
));
}
if settings.remote_control_enabled == remote_control_enabled {
let info = if backend.is_some() {
Some(self.wait_until_ready().await?)
} else {
None
};
return Ok(self.remote_control_output(
already_remote_control_status(mode),
backend.map(|_| BackendKind::Pid),
remote_control_enabled,
info.map(|info| info.app_server_version),
));
}
settings.remote_control_enabled = remote_control_enabled;
settings.save(&self.settings_file).await?;
let app_server_version = if let Some(backend) = backend {
self.ensure_managed_codex_bin()?;
backend.stop().await?;
let _ = self.start_managed_backend(&settings).await?;
Some(self.wait_until_ready().await?.app_server_version)
} else {
None
};
Ok(self.remote_control_output(
remote_control_status(mode),
app_server_version.as_ref().map(|_| BackendKind::Pid),
remote_control_enabled,
app_server_version,
))
}
async fn bootstrap_locked(&self, options: BootstrapOptions) -> Result<BootstrapOutput> {
self.ensure_managed_codex_bin()?;
let settings = DaemonSettings {
remote_control_enabled: options.remote_control_enabled,
};
if client::probe(&self.socket_path).await.is_ok()
&& self.running_backend(&settings).await?.is_none()
{
return Err(anyhow!(
"app server is running but is not managed by codex app-server daemon"
));
}
settings.save(&self.settings_file).await?;
if let Some(backend) = self.running_backend_instance(&settings).await? {
backend.stop().await?;
}
let backend = backend::pid_backend(self.backend_paths(&settings));
backend.start().await?;
let updater = backend::pid_update_loop_backend(self.backend_paths(&settings));
if updater.is_starting_or_running().await? {
updater.stop().await?;
}
updater.start().await?;
let info = self.wait_until_ready().await?;
Ok(BootstrapOutput {
status: BootstrapStatus::Bootstrapped,
backend: BackendKind::Pid,
auto_update_enabled: true,
remote_control_enabled: settings.remote_control_enabled,
managed_codex_path: self.managed_codex_bin.clone(),
socket_path: self.socket_path.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
app_server_version: info.app_server_version,
})
}
async fn running_backend(&self, settings: &DaemonSettings) -> Result<Option<BackendKind>> {
Ok(self
.running_backend_instance(settings)
.await?
.map(|_| BackendKind::Pid))
}
async fn running_backend_instance(
&self,
settings: &DaemonSettings,
) -> Result<Option<backend::PidBackend>> {
let backend = backend::pid_backend(self.backend_paths(settings));
if backend.is_starting_or_running().await? {
return Ok(Some(backend));
}
Ok(None)
}
async fn start_managed_backend(&self, settings: &DaemonSettings) -> Result<Option<u32>> {
let backend = backend::pid_backend(self.backend_paths(settings));
backend.start().await
}
fn ensure_managed_codex_bin(&self) -> Result<()> {
if self.managed_codex_bin.is_file() {
return Ok(());
}
Err(anyhow!(
"managed standalone Codex install not found at {}; install Codex first",
self.managed_codex_bin.display()
))
}
fn backend_paths(&self, settings: &DaemonSettings) -> BackendPaths {
BackendPaths {
codex_bin: self.managed_codex_bin.clone(),
pid_file: self.pid_file.clone(),
update_pid_file: self.update_pid_file.clone(),
remote_control_enabled: settings.remote_control_enabled,
}
}
async fn load_settings(&self) -> Result<DaemonSettings> {
DaemonSettings::load(&self.settings_file).await
}
async fn acquire_operation_lock(&self) -> Result<tokio::fs::File> {
let operation_lock = self.open_operation_lock_file().await?;
let deadline = tokio::time::Instant::now() + OPERATION_LOCK_TIMEOUT;
while !try_lock_file(&operation_lock)? {
if tokio::time::Instant::now() >= deadline {
return Err(anyhow!(
"timed out waiting for daemon operation lock {}",
self.operation_lock_file.display()
));
}
sleep(START_POLL_INTERVAL).await;
}
Ok(operation_lock)
}
async fn open_operation_lock_file(&self) -> Result<tokio::fs::File> {
if let Some(parent) = self.operation_lock_file.parent() {
tokio::fs::create_dir_all(parent).await.with_context(|| {
format!(
"failed to create daemon state directory {}",
parent.display()
)
})?;
}
tokio::fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&self.operation_lock_file)
.await
.with_context(|| {
format!(
"failed to open daemon operation lock {}",
self.operation_lock_file.display()
)
})
}
fn output(
&self,
status: LifecycleStatus,
backend: Option<BackendKind>,
pid: Option<u32>,
app_server_version: Option<String>,
) -> LifecycleOutput {
LifecycleOutput {
status,
backend,
pid,
socket_path: self.socket_path.clone(),
cli_version: Some(env!("CARGO_PKG_VERSION").to_string()),
app_server_version,
}
}
fn remote_control_output(
&self,
status: RemoteControlStatus,
backend: Option<BackendKind>,
remote_control_enabled: bool,
app_server_version: Option<String>,
) -> RemoteControlOutput {
RemoteControlOutput {
status,
backend,
remote_control_enabled,
socket_path: self.socket_path.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
app_server_version,
}
}
}
fn remote_control_status(mode: RemoteControlMode) -> RemoteControlStatus {
match mode {
RemoteControlMode::Enabled => RemoteControlStatus::Enabled,
RemoteControlMode::Disabled => RemoteControlStatus::Disabled,
}
}
fn already_remote_control_status(mode: RemoteControlMode) -> RemoteControlStatus {
match mode {
RemoteControlMode::Enabled => RemoteControlStatus::AlreadyEnabled,
RemoteControlMode::Disabled => RemoteControlStatus::AlreadyDisabled,
}
}
#[cfg(unix)]
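/// Tries to take an exclusive, non-blocking `flock` on the operation lock file.
/// Returns `Ok(false)` if another process currently holds the lock.
fn try_lock_file(file: &tokio::fs::File) -> Result<bool> {
use std::os::fd::AsRawFd;
// SAFETY: `file` stays open for the duration of the call, so the raw fd is valid.
let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };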
if result == 0 {
return Ok(true);
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::EWOULDBLOCK) {
return Ok(false);
}
Err(err).context("failed to lock daemon operation")
}
#[cfg(not(unix))]
fn try_lock_file(_file: &tokio::fs::File) -> Result<bool> {
Ok(true)
}
#[cfg(all(test, unix))]
mod tests {
use pretty_assertions::assert_eq;
use super::BootstrapStatus;
use super::LifecycleStatus;
use super::RemoteControlStatus;
#[test]
fn lifecycle_status_uses_camel_case_json() {
assert_eq!(
serde_json::to_string(&LifecycleStatus::AlreadyRunning).expect("serialize"),
"\"alreadyRunning\""
);
}
#[test]
fn bootstrap_status_uses_camel_case_json() {
assert_eq!(
serde_json::to_string(&BootstrapStatus::Bootstrapped).expect("serialize"),
"\"bootstrapped\""
);
}
#[test]
fn remote_control_status_uses_camel_case_json() {
assert_eq!(
serde_json::to_string(&RemoteControlStatus::AlreadyEnabled).expect("serialize"),
"\"alreadyEnabled\""
);
}
}
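
Note: `wait_until_ready` and `acquire_operation_lock` above both follow the same poll-until-deadline pattern. A minimal synchronous sketch of that pattern is shown here, assuming a blocking `std::thread::sleep` in place of the Tokio timer used in the diff; `poll_until` is a hypothetical helper, not part of the crate.

```rust
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;

/// Hypothetical helper: retry `attempt` every `interval` until it succeeds or
/// `timeout` has elapsed, returning the last error on expiry.
fn poll_until<T, E>(
    timeout: Duration,
    interval: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let deadline = Instant::now() + timeout;
    loop {
        match attempt() {
            Ok(value) => return Ok(value),
            // Still inside the window: wait one interval and try again.
            Err(_) if Instant::now() < deadline => sleep(interval),
            // Window exhausted: surface the most recent failure.
            Err(err) => return Err(err),
        }
    }
}
```

As in the diff, the attempt always runs at least once, so a zero timeout still performs a single check.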

@@ -0,0 +1,66 @@
use std::path::Path;
use std::path::PathBuf;
#[cfg(unix)]
use anyhow::Context;
#[cfg(unix)]
use anyhow::Result;
#[cfg(unix)]
use anyhow::anyhow;
#[cfg(unix)]
use tokio::process::Command;
pub(crate) fn managed_codex_bin(codex_home: &Path) -> PathBuf {
codex_home
.join("packages")
.join("standalone")
.join("current")
.join(managed_codex_file_name())
}
#[cfg(unix)]
pub(crate) async fn managed_codex_version(codex_bin: &Path) -> Result<String> {
let output = Command::new(codex_bin)
.arg("--version")
.output()
.await
.with_context(|| {
format!(
"failed to invoke managed Codex binary {}",
codex_bin.display()
)
})?;
if !output.status.success() {
return Err(anyhow!(
"managed Codex binary {} exited with status {}",
codex_bin.display(),
output.status
));
}
let stdout = String::from_utf8(output.stdout).with_context(|| {
format!(
"managed Codex version was not utf-8: {}",
codex_bin.display()
)
})?;
parse_codex_version(&stdout)
}
fn managed_codex_file_name() -> &'static str {
if cfg!(windows) { "codex.exe" } else { "codex" }
}
#[cfg(unix)]
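/// Expects output shaped like `codex 1.2.3` and returns the second
/// whitespace-separated token.
fn parse_codex_version(output: &str) -> Result<String> {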
let version = output
.split_whitespace()
.nth(1)
.filter(|version| !version.is_empty())
.ok_or_else(|| anyhow!("managed Codex version output was malformed"))?;
Ok(version.to_string())
}
#[cfg(all(test, unix))]
#[path = "managed_install_tests.rs"]
mod tests;

@@ -0,0 +1,16 @@
use pretty_assertions::assert_eq;
use super::parse_codex_version;
#[test]
fn parses_codex_cli_version_output() {
assert_eq!(
parse_codex_version("codex 1.2.3\n").expect("version"),
"1.2.3"
);
}
#[test]
fn rejects_malformed_codex_cli_version_output() {
assert!(parse_codex_version("codex\n").is_err());
}

@@ -0,0 +1,63 @@
use std::path::Path;
use anyhow::Context;
use anyhow::Result;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs;
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct DaemonSettings {
pub(crate) remote_control_enabled: bool,
}
impl DaemonSettings {
pub(crate) async fn load(path: &Path) -> Result<Self> {
let contents = match fs::read_to_string(path).await {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Self::default()),
Err(err) => {
return Err(err)
.with_context(|| format!("failed to read daemon settings {}", path.display()));
}
};
serde_json::from_str(&contents)
.with_context(|| format!("failed to parse daemon settings {}", path.display()))
}
pub(crate) async fn save(&self, path: &Path) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await.with_context(|| {
format!(
"failed to create daemon settings directory {}",
parent.display()
)
})?;
}
let contents = serde_json::to_vec_pretty(self).context("failed to serialize settings")?;
fs::write(path, contents)
.await
.with_context(|| format!("failed to write daemon settings {}", path.display()))
}
}
#[cfg(all(test, unix))]
mod tests {
use pretty_assertions::assert_eq;
use super::DaemonSettings;
#[test]
fn daemon_settings_use_camel_case_json() {
assert_eq!(
serde_json::to_string(&DaemonSettings {
remote_control_enabled: true,
})
.expect("serialize"),
r#"{"remoteControlEnabled":true}"#
);
}
}
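
Note: `DaemonSettings::load` above treats a missing settings file as default settings and reports any other I/O failure. A minimal std-only sketch of that fallback follows, assuming raw file contents instead of the serde-based parsing in the diff; `read_or_default` is a hypothetical name.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: return the file contents, or `default` when the file
/// does not exist. Any other I/O error is passed through to the caller.
fn read_or_default(path: &Path, default: &str) -> io::Result<String> {
    match fs::read_to_string(path) {
        Ok(contents) => Ok(contents),
        // A missing file is expected on first run, so it is not an error.
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(default.to_string()),
        Err(err) => Err(err),
    }
}
```

Only `NotFound` is mapped to the default, so a permissions problem or an unreadable file still fails loudly.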

@@ -0,0 +1,132 @@
#[cfg(unix)]
use std::process::Stdio;
#[cfg(unix)]
use std::time::Duration;
#[cfg(unix)]
use anyhow::Context;
use anyhow::Result;
#[cfg(not(unix))]
use anyhow::bail;
#[cfg(unix)]
use futures::FutureExt;
#[cfg(unix)]
use tokio::io::AsyncWriteExt;
#[cfg(unix)]
use tokio::process::Command;
#[cfg(unix)]
use tokio::signal::unix::Signal;
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
#[cfg(unix)]
use tokio::signal::unix::signal;
#[cfg(unix)]
use tokio::time::sleep;
#[cfg(unix)]
use crate::Daemon;
#[cfg(unix)]
use crate::RestartIfRunningOutcome;
#[cfg(unix)]
const INITIAL_UPDATE_DELAY: Duration = Duration::from_secs(5 * 60);
#[cfg(unix)]
const RESTART_RETRY_INTERVAL: Duration = Duration::from_millis(50);
#[cfg(unix)]
const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
#[cfg(unix)]
pub(crate) async fn run() -> Result<()> {
let mut terminate =
signal(SignalKind::terminate()).context("failed to install updater shutdown handler")?;
if sleep_or_terminate(INITIAL_UPDATE_DELAY, &mut terminate).await {
return Ok(());
}
loop {
match update_once(&mut terminate).await {
// Update failures are non-fatal; the loop retries after UPDATE_INTERVAL.
Ok(UpdateLoopControl::Continue) | Err(_) => {}
Ok(UpdateLoopControl::Stop) => return Ok(()),
}
if sleep_or_terminate(UPDATE_INTERVAL, &mut terminate).await {
return Ok(());
}
}
}
#[cfg(not(unix))]
pub(crate) async fn run() -> Result<()> {
bail!("pid-managed updater loop is unsupported on this platform")
}
#[cfg(unix)]
async fn sleep_or_terminate(duration: Duration, terminate: &mut Signal) -> bool {
tokio::select! {
_ = sleep(duration) => false,
_ = terminate.recv() => true,
}
}
#[cfg(unix)]
enum UpdateLoopControl {
Continue,
Stop,
}
#[cfg(unix)]
async fn update_once(terminate: &mut Signal) -> Result<UpdateLoopControl> {
install_latest_standalone().await?;
let daemon = Daemon::from_environment()?;
loop {
if terminate.recv().now_or_never().flatten().is_some() {
return Ok(UpdateLoopControl::Stop);
}
match daemon.try_restart_if_running().await? {
RestartIfRunningOutcome::Completed => return Ok(UpdateLoopControl::Continue),
RestartIfRunningOutcome::Busy => {
if sleep_or_terminate(RESTART_RETRY_INTERVAL, terminate).await {
return Ok(UpdateLoopControl::Stop);
}
}
}
}
}
#[cfg(unix)]
async fn install_latest_standalone() -> Result<()> {
let script = reqwest::get("https://chatgpt.com/codex/install.sh")
.await
.context("failed to fetch standalone Codex updater")?
.error_for_status()
.context("standalone Codex updater request failed")?
.bytes()
.await
.context("failed to read standalone Codex updater")?;
let mut child = Command::new("/bin/sh")
.arg("-s")
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.context("failed to invoke standalone Codex updater")?;
let mut stdin = child
.stdin
.take()
.context("standalone Codex updater stdin was unavailable")?;
stdin
.write_all(&script)
.await
.context("failed to pass standalone Codex updater to shell")?;
drop(stdin);
let status = child
.wait()
.await
.context("failed to wait for standalone Codex updater")?;
if status.success() {
Ok(())
} else {
anyhow::bail!("standalone Codex updater exited with status {status}")
}
}
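
Note: after each install, `try_restart_if_running` in this diff restarts the managed daemon only when the version reported by the running app-server differs from the version printed by the managed binary. A minimal std-only sketch of that comparison is below. Both function names are hypothetical, and malformed output maps to `None` here, whereas the diff returns an error.

```rust
/// Hypothetical helper: second whitespace-separated token of the
/// `codex --version` output, e.g. "codex 1.2.3\n" -> "1.2.3".
fn version_from_cli_output(output: &str) -> Option<&str> {
    output.split_whitespace().nth(1)
}

/// Hypothetical helper: `Some(true)` when the running version differs from the
/// managed binary's version, `Some(false)` when they match, and `None` when
/// the managed binary's output cannot be parsed.
fn needs_restart(running_version: &str, cli_output: &str) -> Option<bool> {
    let managed_version = version_from_cli_output(cli_output)?;
    Some(running_version != managed_version)
}
```

The comparison is plain string inequality, matching the diff: any difference triggers a restart, including a downgrade.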

@@ -22,6 +22,7 @@ anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
clap_complete = { workspace = true }
codex-app-server = { workspace = true }
codex-app-server-daemon = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-app-server-test-client = { workspace = true }
codex-arg0 = { workspace = true }

@@ -3,6 +3,9 @@ use clap::CommandFactory;
use clap::Parser;
use clap_complete::Shell;
use clap_complete::generate;
use codex_app_server_daemon::BootstrapOptions as AppServerBootstrapOptions;
use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand;
use codex_app_server_daemon::RemoteControlMode as AppServerRemoteControlMode;
use codex_arg0::Arg0DispatchPaths;
use codex_arg0::arg0_dispatch_or_else;
use codex_chatgpt::apply_command::ApplyCommand;
@@ -469,6 +472,9 @@ struct ExecServerCommand {
#[derive(Debug, clap::Subcommand)]
#[allow(clippy::enum_variant_names)]
enum AppServerSubcommand {
/// Manage the local app-server daemon.
Daemon(AppServerDaemonCommand),
/// Proxy stdio bytes to the running app-server control socket.
Proxy(AppServerProxyCommand),
@@ -483,6 +489,40 @@ enum AppServerSubcommand {
GenerateInternalJsonSchema(GenerateInternalJsonSchemaCommand),
}
#[derive(Debug, Args)]
struct AppServerDaemonCommand {
#[command(subcommand)]
subcommand: AppServerDaemonSubcommand,
}
#[derive(Debug, clap::Subcommand)]
enum AppServerDaemonSubcommand {
/// Install durable local app-server management for SSH-driven use.
Bootstrap(AppServerBootstrapCommand),
/// Start the local app server daemon if it is not already running.
Start,
/// Restart the local app server daemon.
Restart,
/// Enable remote_control for future starts and a currently running managed daemon.
EnableRemoteControl,
/// Disable remote_control for future starts and a currently running managed daemon.
DisableRemoteControl,
/// Stop the local app server daemon.
Stop,
/// Print local CLI and running app-server versions as JSON.
Version,
/// [internal] Run the detached pid-backed standalone updater loop.
#[clap(hide = true)]
PidUpdateLoop,
}
#[derive(Debug, Args)]
struct AppServerProxyCommand {
/// Path to the app-server Unix domain socket to connect to.
@@ -490,6 +530,13 @@ struct AppServerProxyCommand {
socket_path: Option<AbsolutePathBuf>,
}
#[derive(Debug, Args)]
struct AppServerBootstrapCommand {
/// Launch the managed app-server with remote_control enabled.
#[arg(long = "remote-control")]
remote_control: bool,
}
#[derive(Debug, Args)]
struct GenerateTsCommand {
/// Output directory where .ts files will be written
@@ -875,6 +922,41 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
)
.await?;
}
Some(AppServerSubcommand::Daemon(daemon_cli)) => match daemon_cli.subcommand {
AppServerDaemonSubcommand::Start => {
print_app_server_daemon_output(AppServerLifecycleCommand::Start).await?;
}
AppServerDaemonSubcommand::Bootstrap(bootstrap_cli) => {
let output =
codex_app_server_daemon::bootstrap(AppServerBootstrapOptions {
remote_control_enabled: bootstrap_cli.remote_control,
})
.await?;
println!("{}", serde_json::to_string(&output)?);
}
AppServerDaemonSubcommand::Restart => {
print_app_server_daemon_output(AppServerLifecycleCommand::Restart).await?;
}
AppServerDaemonSubcommand::EnableRemoteControl => {
print_app_server_remote_control_output(AppServerRemoteControlMode::Enabled)
.await?;
}
AppServerDaemonSubcommand::DisableRemoteControl => {
print_app_server_remote_control_output(
AppServerRemoteControlMode::Disabled,
)
.await?;
}
AppServerDaemonSubcommand::Stop => {
print_app_server_daemon_output(AppServerLifecycleCommand::Stop).await?;
}
AppServerDaemonSubcommand::Version => {
print_app_server_daemon_output(AppServerLifecycleCommand::Version).await?;
}
AppServerDaemonSubcommand::PidUpdateLoop => {
codex_app_server_daemon::run_pid_update_loop().await?;
}
},
Some(AppServerSubcommand::Proxy(proxy_cli)) => {
let socket_path = match proxy_cli.socket_path {
Some(socket_path) => socket_path,
@@ -1547,6 +1629,20 @@ fn reject_remote_mode_for_app_server_subcommand(
) -> anyhow::Result<()> {
let subcommand_name = match subcommand {
None => "app-server",
Some(AppServerSubcommand::Daemon(daemon)) => match daemon.subcommand {
AppServerDaemonSubcommand::Bootstrap(_) => "app-server daemon bootstrap",
AppServerDaemonSubcommand::Start => "app-server daemon start",
AppServerDaemonSubcommand::Restart => "app-server daemon restart",
AppServerDaemonSubcommand::EnableRemoteControl => {
"app-server daemon enable-remote-control"
}
AppServerDaemonSubcommand::DisableRemoteControl => {
"app-server daemon disable-remote-control"
}
AppServerDaemonSubcommand::Stop => "app-server daemon stop",
AppServerDaemonSubcommand::Version => "app-server daemon version",
AppServerDaemonSubcommand::PidUpdateLoop => "app-server daemon pid-update-loop",
},
Some(AppServerSubcommand::Proxy(_)) => "app-server proxy",
Some(AppServerSubcommand::GenerateTs(_)) => "app-server generate-ts",
Some(AppServerSubcommand::GenerateJsonSchema(_)) => "app-server generate-json-schema",
@@ -1557,6 +1653,20 @@ fn reject_remote_mode_for_app_server_subcommand(
reject_remote_mode_for_subcommand(remote, remote_auth_token_env, subcommand_name)
}
async fn print_app_server_daemon_output(command: AppServerLifecycleCommand) -> anyhow::Result<()> {
let output = codex_app_server_daemon::run(command).await?;
println!("{}", serde_json::to_string(&output)?);
Ok(())
}
async fn print_app_server_remote_control_output(
mode: AppServerRemoteControlMode,
) -> anyhow::Result<()> {
let output = codex_app_server_daemon::set_remote_control(mode).await?;
println!("{}", serde_json::to_string(&output)?);
Ok(())
}
fn read_remote_auth_token_from_env_var_with<F>(
env_var_name: &str,
get_var: F,
@@ -2524,6 +2634,70 @@ mod tests {
));
}
#[test]
fn app_server_daemon_subcommands_parse() {
assert!(matches!(
app_server_from_args(
[
"codex",
"app-server",
"daemon",
"bootstrap",
"--remote-control"
]
.as_ref()
)
.subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Bootstrap(AppServerBootstrapCommand {
remote_control: true
})
}))
));
assert!(matches!(
app_server_from_args(["codex", "app-server", "daemon", "start"].as_ref()).subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Start
}))
));
assert!(matches!(
app_server_from_args(["codex", "app-server", "daemon", "restart"].as_ref()).subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Restart
}))
));
assert!(matches!(
app_server_from_args(
["codex", "app-server", "daemon", "enable-remote-control"].as_ref()
)
.subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::EnableRemoteControl
}))
));
assert!(matches!(
app_server_from_args(
["codex", "app-server", "daemon", "disable-remote-control"].as_ref()
)
.subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::DisableRemoteControl
}))
));
assert!(matches!(
app_server_from_args(["codex", "app-server", "daemon", "stop"].as_ref()).subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Stop
}))
));
assert!(matches!(
app_server_from_args(["codex", "app-server", "daemon", "version"].as_ref()).subcommand,
Some(AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Version
}))
));
}
#[test]
fn app_server_proxy_sock_path_parses() {
let app_server =
@@ -2552,6 +2726,20 @@ mod tests {
assert!(err.to_string().contains("app-server proxy"));
}
#[test]
fn reject_remote_auth_token_env_for_app_server_version() {
let subcommand = AppServerSubcommand::Daemon(AppServerDaemonCommand {
subcommand: AppServerDaemonSubcommand::Version,
});
let err = reject_remote_mode_for_app_server_subcommand(
/*remote*/ None,
Some("CODEX_REMOTE_AUTH_TOKEN"),
Some(&subcommand),
)
.expect_err("app-server daemon version should reject --remote-auth-token-env");
assert!(err.to_string().contains("app-server daemon version"));
}
#[test]
fn app_server_capability_token_flags_parse() {
let app_server = app_server_from_args(