From a37e0df5512ec153aa41f4e6030fbb4cfa9a1919 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 15 May 2026 11:28:56 -0700 Subject: [PATCH] improve remote-control CLI UX --- codex-rs/Cargo.lock | 1 + codex-rs/app-server-daemon/src/backend/mod.rs | 13 + codex-rs/app-server-daemon/src/backend/pid.rs | 104 ++- .../src/backend/pid_tests.rs | 24 + codex-rs/app-server-daemon/src/client.rs | 140 ++- codex-rs/app-server-daemon/src/lib.rs | 50 +- .../src/remote_control_client.rs | 420 +++++++++ codex-rs/app-server/src/lib.rs | 5 +- codex-rs/cli/Cargo.toml | 1 + codex-rs/cli/src/main.rs | 59 +- codex-rs/cli/src/remote_control_cmd.rs | 823 ++++++++++++++++++ 11 files changed, 1541 insertions(+), 99 deletions(-) create mode 100644 codex-rs/app-server-daemon/src/remote_control_client.rs create mode 100644 codex-rs/cli/src/remote_control_cmd.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index eb2439401b..de6282c54f 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2223,6 +2223,7 @@ dependencies = [ "clap_complete", "codex-api", "codex-app-server", + "codex-app-server-client", "codex-app-server-daemon", "codex-app-server-protocol", "codex-app-server-test-client", diff --git a/codex-rs/app-server-daemon/src/backend/mod.rs b/codex-rs/app-server-daemon/src/backend/mod.rs index 5e92dd5261..d40df07a0e 100644 --- a/codex-rs/app-server-daemon/src/backend/mod.rs +++ b/codex-rs/app-server-daemon/src/backend/mod.rs @@ -1,5 +1,6 @@ mod pid; +use std::path::Path; use std::path::PathBuf; use serde::Serialize; @@ -31,3 +32,15 @@ pub(crate) fn pid_backend(paths: BackendPaths) -> PidBackend { pub(crate) fn pid_update_loop_backend(paths: BackendPaths) -> PidBackend { PidBackend::new_update_loop(paths.codex_bin, paths.update_pid_file) } + +pub(crate) async fn append_stderr_log_tail_context(pid_file: &Path, context: &mut String) { + match pid::read_stderr_log_tail(pid_file).await { + Ok(Some(tail)) => tail.append_to_context(context), + Ok(None) => {} + Err(err) => { + context.push_str(&format!( + "\n\nFailed to read managed app-server stderr log: {err:#}" + )); + } + } +} diff --git a/codex-rs/app-server-daemon/src/backend/pid.rs b/codex-rs/app-server-daemon/src/backend/pid.rs index 7f8aa1660d..f5f4fc5b66 100644 --- a/codex-rs/app-server-daemon/src/backend/pid.rs +++ b/codex-rs/app-server-daemon/src/backend/pid.rs @@ -1,3 +1,4 @@ +use std::io::SeekFrom; use std::path::Path; use std::path::PathBuf; #[cfg(unix)] @@ -10,6 +11,8 @@ use anyhow::bail; use serde::Deserialize; use serde::Serialize; use tokio::fs; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; #[cfg(unix)] use tokio::process::Command; use tokio::time::sleep; @@ -18,6 +21,7 @@ const STOP_POLL_INTERVAL: Duration = Duration::from_millis(50); const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60); const STOP_TIMEOUT: Duration = Duration::from_secs(70); const START_TIMEOUT: Duration = Duration::from_secs(10); +const STDERR_LOG_TAIL_BYTES: u64 = 4096; #[derive(Debug)] #[cfg_attr(not(unix), allow(dead_code))] @@ -35,6 +39,25 @@ struct PidRecord { process_start_time: String, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct PidLogTail { + pub(crate) path: PathBuf, + pub(crate) contents: String, +} + +impl PidLogTail { + pub(crate) fn append_to_context(&self, context: &mut String) { + context.push_str(&format!( + "\n\nManaged app-server stderr ({}):", + self.path.display() + )); + for line in self.contents.lines() { + context.push_str("\n "); + context.push_str(line); + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] enum PidFileState { Missing, @@ -129,11 +152,18 @@ impl PidBackend { } }; let mut command = Command::new(&self.codex_bin); + let stderr_log = match self.open_stderr_log().await { + Ok(stderr_log) => stderr_log, + Err(err) => { + let _ = fs::remove_file(&self.pid_file).await; + return Err(err); + } + }; command .args(self.command_args()) .stdin(Stdio::null()) .stdout(Stdio::null()) - .stderr(Stdio::null()); + .stderr(Stdio::from(stderr_log.into_std().await)); #[cfg(unix)] { @@ -169,8 +199,11 @@ impl PidBackend { }, Err(err) => { let _ = self.terminate_process(pid); + let mut context = + format!("failed to record pid-managed app-server process {pid} startup"); + super::append_stderr_log_tail_context(&self.pid_file, &mut context).await; let _ = fs::remove_file(&self.pid_file).await; - return Err(err); + return Err(err).context(context); } }; let contents = serde_json::to_vec(&record).context("failed to serialize pid record")?; @@ -344,6 +377,23 @@ impl PidBackend { Ok(reservation_lock) } + #[cfg(unix)] + async fn open_stderr_log(&self) -> Result { + let stderr_log_file = stderr_log_file_for_pid_file(&self.pid_file); + fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&stderr_log_file) + .await + .with_context(|| { + format!( + "failed to open stderr log for pid-managed app server {}", + stderr_log_file.display() + ) + }) + } + #[cfg(unix)] fn command_args(&self) -> Vec<&'static str> { match self.command_kind { @@ -376,6 +426,56 @@ impl PidBackend { } } +pub(crate) async fn read_stderr_log_tail(pid_file: &Path) -> Result> { + let path = stderr_log_file_for_pid_file(pid_file); + let Some(contents) = read_log_tail(&path, STDERR_LOG_TAIL_BYTES).await? else { + return Ok(None); + }; + Ok(Some(PidLogTail { path, contents })) +} + +fn stderr_log_file_for_pid_file(pid_file: &Path) -> PathBuf { + pid_file.with_extension("stderr.log") +} + +async fn read_log_tail(path: &Path, byte_limit: u64) -> Result> { + let mut file = match fs::File::open(path).await { + Ok(file) => file, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => { + return Err(err) + .with_context(|| format!("failed to open stderr log {}", path.display())); + } + }; + let len = file + .metadata() + .await + .with_context(|| format!("failed to inspect stderr log {}", path.display()))? + .len(); + if len == 0 { + return Ok(None); + } + + let start = len.saturating_sub(byte_limit); + file.seek(SeekFrom::Start(start)) + .await + .with_context(|| format!("failed to seek stderr log {}", path.display()))?; + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes) + .await + .with_context(|| format!("failed to read stderr log {}", path.display()))?; + if start > 0 + && let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n') + { + bytes.drain(..=newline_index); + } + let contents = String::from_utf8_lossy(&bytes).trim_end().to_string(); + if contents.is_empty() { + return Ok(None); + } + Ok(Some(contents)) +} + #[cfg(unix)] fn process_exists(pid: u32) -> bool { let Ok(pid) = libc::pid_t::try_from(pid) else { diff --git a/codex-rs/app-server-daemon/src/backend/pid_tests.rs b/codex-rs/app-server-daemon/src/backend/pid_tests.rs index 305c2a36a3..4c3a0e4414 100644 --- a/codex-rs/app-server-daemon/src/backend/pid_tests.rs +++ b/codex-rs/app-server-daemon/src/backend/pid_tests.rs @@ -6,7 +6,10 @@ use tempfile::TempDir; use super::PidBackend; use super::PidCommandKind; use super::PidFileState; +use super::PidLogTail; use super::PidRecord; +use super::read_stderr_log_tail; +use super::stderr_log_file_for_pid_file; use super::try_lock_file; #[tokio::test] @@ -170,3 +173,24 @@ fn app_server_remote_control_uses_runtime_flag() { vec!["app-server", "--remote-control", "--listen", "unix://"] ); } + +#[tokio::test] +async fn read_stderr_log_tail_returns_recent_complete_lines() { + let temp_dir = TempDir::new().expect("temp dir"); + let pid_file = temp_dir.path().join("app-server.pid"); + let log_file = stderr_log_file_for_pid_file(&pid_file); + let contents = format!("{}\nrecent error\nusage", "x".repeat(4100)); + tokio::fs::write(&log_file, contents) + .await + .expect("write stderr log"); + + assert_eq!( + read_stderr_log_tail(&pid_file) + .await + .expect("read stderr log"), + Some(PidLogTail { + path: log_file, + contents: "recent error\nusage".to_string(), + }) + ); +} diff --git a/codex-rs/app-server-daemon/src/client.rs b/codex-rs/app-server-daemon/src/client.rs index 44fccda394..8c771ba1f2 100644 --- a/codex-rs/app-server-daemon/src/client.rs +++ b/codex-rs/app-server-daemon/src/client.rs @@ -5,6 +5,7 @@ use anyhow::Context; use anyhow::Result; use anyhow::anyhow; use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InitializeResponse; use codex_app_server_protocol::JSONRPCMessage; @@ -14,12 +15,16 @@ use codex_app_server_protocol::RequestId; use codex_uds::UnixStream; use futures::SinkExt; use futures::StreamExt; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; use tokio::time::timeout; +use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::Message; -const PROBE_TIMEOUT: Duration = Duration::from_secs(2); +pub(crate) const CONTROL_SOCKET_RESPONSE_TIMEOUT: Duration = Duration::from_secs(2); const CLIENT_NAME: &str = "codex_app_server_daemon"; +const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1); #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct ProbeInfo { @@ -27,7 +32,7 @@ pub(crate) struct ProbeInfo { } pub(crate) async fn probe(socket_path: &Path) -> Result { - timeout(PROBE_TIMEOUT, probe_inner(socket_path)) + timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, probe_inner(socket_path)) .await .with_context(|| { format!( @@ -38,54 +43,14 @@ pub(crate) async fn probe(socket_path: &Path) -> Result { } async fn probe_inner(socket_path: &Path) -> Result { - let stream = UnixStream::connect(socket_path) - .await - .with_context(|| format!("failed to connect to {}", socket_path.display()))?; - let (mut websocket, _response) = client_async("ws://localhost/", stream) - .await - .with_context(|| format!("failed to upgrade {}", socket_path.display()))?; - - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_info: ClientInfo { - name: CLIENT_NAME.to_string(), - title: Some("Codex App Server Daemon".to_string()), - version: env!("CARGO_PKG_VERSION").to_string(), - }, - capabilities: None, - })?), - trace: None, - }); - websocket - .send(Message::Text(serde_json::to_string(&initialize)?.into())) - .await - .context("failed to send initialize request")?; - - let response = loop { - let frame = websocket - .next() - .await - .ok_or_else(|| anyhow!("app-server closed before initialize response"))??; - let Message::Text(payload) = frame else { - continue; - }; - let message = serde_json::from_str::(&payload)?; - if let JSONRPCMessage::Response(response) = message - && response.id == RequestId::Integer(1) - { - break response; - } - }; - let initialize_response = serde_json::from_value::(response.result)?; + let mut websocket = connect(socket_path).await?; + let initialize_response = initialize(&mut websocket, /*experimental_api*/ false).await?; let initialized = JSONRPCMessage::Notification(JSONRPCNotification { method: "initialized".to_string(), params: None, }); - websocket - .send(Message::Text(serde_json::to_string(&initialized)?.into())) + send_message(&mut websocket, &initialized) .await .context("failed to send initialized notification")?; websocket.close(None).await.ok(); @@ -95,6 +60,91 @@ async fn probe_inner(socket_path: &Path) -> Result { }) } +pub(crate) async fn connect(socket_path: &Path) -> Result> { + let stream = UnixStream::connect(socket_path) + .await + .with_context(|| format!("failed to connect to {}", socket_path.display()))?; + let (websocket, _response) = client_async("ws://localhost/", stream) + .await + .with_context(|| format!("failed to upgrade {}", socket_path.display()))?; + Ok(websocket) +} + +pub(crate) async fn initialize( + websocket: &mut WebSocketStream, + experimental_api: bool, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin, +{ + let initialize = JSONRPCMessage::Request(JSONRPCRequest { + id: INITIALIZE_REQUEST_ID, + method: "initialize".to_string(), + params: Some(serde_json::to_value(InitializeParams { + client_info: ClientInfo { + name: CLIENT_NAME.to_string(), + title: Some("Codex App Server Daemon".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + capabilities: if experimental_api { + Some(InitializeCapabilities { + experimental_api: true, + ..Default::default() + }) + } else { + None + }, + })?), + trace: None, + }); + send_message(websocket, &initialize) + .await + .context("failed to send initialize request")?; + + let response = loop { + let message = timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, read_message(websocket)) + .await + .context("timed out waiting for initialize response")??; + if let JSONRPCMessage::Response(response) = message + && response.id == INITIALIZE_REQUEST_ID + { + break response; + } + }; + serde_json::from_value::(response.result) + .context("failed to parse initialize response") +} + +pub(crate) async fn send_message( + websocket: &mut WebSocketStream, + message: &JSONRPCMessage, +) -> Result<()> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + websocket + .send(Message::Text(serde_json::to_string(message)?.into())) + .await?; + Ok(()) +} + +pub(crate) async fn read_message(websocket: &mut WebSocketStream) -> Result +where + S: AsyncRead + AsyncWrite + Unpin, +{ + loop { + let frame = websocket + .next() + .await + .ok_or_else(|| anyhow!("app-server closed the control socket"))??; + let Message::Text(payload) = frame else { + continue; + }; + return serde_json::from_str::(&payload) + .context("failed to parse app-server JSON-RPC message"); + } +} + fn parse_version_from_user_agent(user_agent: &str) -> Result { let (_originator, rest) = user_agent .split_once('/') diff --git a/codex-rs/app-server-daemon/src/lib.rs b/codex-rs/app-server-daemon/src/lib.rs index 4ab223051c..bb3bb8a837 100644 --- a/codex-rs/app-server-daemon/src/lib.rs +++ b/codex-rs/app-server-daemon/src/lib.rs @@ -1,6 +1,7 @@ mod backend; mod client; mod managed_install; +mod remote_control_client; mod settings; mod update_loop; @@ -13,6 +14,7 @@ use anyhow::Result; use anyhow::anyhow; pub use backend::BackendKind; use backend::BackendPaths; +use codex_app_server_protocol::RemoteControlConnectionStatus; use codex_app_server_transport::app_server_control_socket_path; use codex_utils_home_dir::find_codex_home; use managed_install::managed_codex_bin; @@ -96,6 +98,20 @@ pub enum RemoteControlStartOutput { Start(LifecycleOutput), } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteControlReadyStatus { + pub status: RemoteControlConnectionStatus, + pub server_name: String, + pub environment_id: Option, + pub timed_out: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteControlReadyOutput { + pub daemon: RemoteControlStartOutput, + pub remote_control: RemoteControlReadyStatus, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RemoteControlMode { Enabled, @@ -179,6 +195,13 @@ pub async fn ensure_remote_control_started() -> Result .await } +pub async fn ensure_remote_control_ready() -> Result { + ensure_supported_platform()?; + Daemon::from_environment()? + .ensure_remote_control_ready() + .await +} + pub async fn set_remote_control(mode: RemoteControlMode) -> Result { ensure_supported_platform()?; Daemon::from_environment()?.set_remote_control(mode).await @@ -395,17 +418,22 @@ impl Daemon { sleep(START_POLL_INTERVAL).await; } Err(err) => { - return Err(err).with_context(|| { - format!( - "app server did not become ready on {}", - self.socket_path.display() - ) - }); + let context = self.app_server_not_ready_context().await; + return Err(err).context(context); } } } } + async fn app_server_not_ready_context(&self) -> String { + let mut context = format!( + "app server did not become ready on {}", + self.socket_path.display() + ); + backend::append_stderr_log_tail_context(&self.pid_file, &mut context).await; + context + } + async fn bootstrap(&self, options: BootstrapOptions) -> Result { let _operation_lock = self.acquire_operation_lock().await?; self.bootstrap_locked(options).await @@ -430,6 +458,16 @@ impl Daemon { Ok(RemoteControlStartOutput::Bootstrap(output)) } + async fn ensure_remote_control_ready(&self) -> Result { + let daemon = self.ensure_remote_control_started().await?; + let remote_control = + remote_control_client::enable_remote_control(&self.socket_path).await?; + Ok(RemoteControlReadyOutput { + daemon, + remote_control, + }) + } + async fn set_remote_control(&self, mode: RemoteControlMode) -> Result { let _operation_lock = self.acquire_operation_lock().await?; self.set_remote_control_locked(mode).await diff --git a/codex-rs/app-server-daemon/src/remote_control_client.rs b/codex-rs/app-server-daemon/src/remote_control_client.rs new file mode 100644 index 0000000000..ecb3c1c260 --- /dev/null +++ b/codex-rs/app-server-daemon/src/remote_control_client.rs @@ -0,0 +1,420 @@ +use std::path::Path; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::RemoteControlConnectionStatus; +use codex_app_server_protocol::RemoteControlEnableResponse; +use codex_app_server_protocol::RemoteControlStatusChangedNotification; +use codex_app_server_protocol::RequestId; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::time::timeout; +use tokio_tungstenite::WebSocketStream; + +use crate::RemoteControlReadyStatus; +use crate::client; + +const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10); +const REMOTE_CONTROL_ENABLE_REQUEST_ID: RequestId = RequestId::Integer(2); + +pub(crate) async fn enable_remote_control(socket_path: &Path) -> Result { + enable_remote_control_with_timeout(socket_path, REMOTE_CONTROL_READY_TIMEOUT).await +} + +async fn enable_remote_control_with_timeout( + socket_path: &Path, + ready_timeout: Duration, +) -> Result { + let mut websocket = client::connect(socket_path).await?; + + client::initialize(&mut websocket, /*experimental_api*/ true).await?; + let initialized = JSONRPCMessage::Notification(JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + client::send_message(&mut websocket, &initialized) + .await + .context("failed to send initialized notification")?; + + let enable = JSONRPCMessage::Request(JSONRPCRequest { + id: REMOTE_CONTROL_ENABLE_REQUEST_ID, + method: "remoteControl/enable".to_string(), + params: None, + trace: None, + }); + client::send_message(&mut websocket, &enable) + .await + .context("failed to send remoteControl/enable request")?; + + let mut latest = read_enable_response(&mut websocket).await?; + if latest.status == RemoteControlConnectionStatus::Connecting { + latest = wait_for_remote_control_status(&mut websocket, latest, ready_timeout).await?; + } + websocket.close(None).await.ok(); + Ok(latest) +} + +async fn read_enable_response( + websocket: &mut WebSocketStream, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin, +{ + loop { + let message = timeout( + client::CONTROL_SOCKET_RESPONSE_TIMEOUT, + client::read_message(websocket), + ) + .await + .context("timed out waiting for remoteControl/enable response")??; + match message { + JSONRPCMessage::Response(response) + if response.id == REMOTE_CONTROL_ENABLE_REQUEST_ID => + { + let response = + serde_json::from_value::(response.result) + .context("failed to parse remoteControl/enable response")?; + return Ok(RemoteControlReadyStatus::from(response)); + } + JSONRPCMessage::Error(err) if err.id == REMOTE_CONTROL_ENABLE_REQUEST_ID => { + return Err(anyhow!( + "remoteControl/enable failed: {}", + err.error.message + )); + } + JSONRPCMessage::Notification(notification) + if remote_control_status_notification(¬ification).is_some() => + { + continue; + } + _ => {} + } + } +} + +async fn wait_for_remote_control_status( + websocket: &mut WebSocketStream, + mut latest: RemoteControlReadyStatus, + ready_timeout: Duration, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin, +{ + let deadline = tokio::time::Instant::now() + ready_timeout; + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let message = match timeout(remaining, client::read_message(websocket)).await { + Ok(Ok(message)) => message, + Ok(Err(err)) => return Err(err), + Err(_) => { + latest.timed_out = true; + return Ok(latest); + } + }; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + let Some(status) = remote_control_status_notification(¬ification) else { + continue; + }; + latest = RemoteControlReadyStatus::from(status); + if latest.status != RemoteControlConnectionStatus::Connecting { + return Ok(latest); + } + } + latest.timed_out = true; + Ok(latest) +} + +fn remote_control_status_notification( + notification: &JSONRPCNotification, +) -> Option { + if notification.method != "remoteControl/status/changed" { + return None; + } + let params = notification.params.clone()?; + serde_json::from_value(params).ok() +} + +impl From for RemoteControlReadyStatus { + fn from(response: RemoteControlEnableResponse) -> Self { + let RemoteControlEnableResponse { + status, + server_name, + installation_id: _, + environment_id, + } = response; + Self { + status, + server_name, + environment_id, + timed_out: false, + } + } +} + +impl From for RemoteControlReadyStatus { + fn from(notification: RemoteControlStatusChangedNotification) -> Self { + let RemoteControlStatusChangedNotification { + status, + server_name, + installation_id: _, + environment_id, + } = notification; + Self { + status, + server_name, + environment_id, + timed_out: false, + } + } +} + +#[cfg(all(test, unix))] +mod tests { + use anyhow::Result; + use codex_app_server_protocol::JSONRPCResponse; + use codex_uds::UnixListener; + use pretty_assertions::assert_eq; + use tempfile::TempDir; + use tokio_tungstenite::accept_async; + + use super::*; + + const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1); + const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111"; + const TEST_SERVER_NAME: &str = "owen-mbp"; + const TEST_CODEX_HOME: &str = "/tmp/codex-home"; + + #[tokio::test] + async fn enable_remote_control_uses_connected_enable_response_without_later_notification() + -> Result<()> { + let status = run_enable_remote_control_scenario(EnableScenario { + initial_notification: Some(remote_control_status( + RemoteControlConnectionStatus::Connected, + Some("env_test"), + )), + enable_response: remote_control_status( + RemoteControlConnectionStatus::Connected, + Some("env_test"), + ), + after_enable_notification: None, + ready_timeout: Duration::from_millis(20), + }) + .await?; + + assert_eq!( + status, + RemoteControlReadyStatus { + status: RemoteControlConnectionStatus::Connected, + server_name: TEST_SERVER_NAME.to_string(), + environment_id: Some("env_test".to_string()), + timed_out: false, + } + ); + Ok(()) + } + + #[tokio::test] + async fn enable_remote_control_waits_for_connected_notification() -> Result<()> { + let status = run_enable_remote_control_scenario(EnableScenario { + initial_notification: None, + enable_response: remote_control_status( + RemoteControlConnectionStatus::Connecting, + /*environment_id*/ None, + ), + after_enable_notification: Some(remote_control_status( + RemoteControlConnectionStatus::Connected, + Some("env_test"), + )), + ready_timeout: Duration::from_secs(1), + }) + .await?; + + assert_eq!( + status, + RemoteControlReadyStatus { + status: RemoteControlConnectionStatus::Connected, + server_name: TEST_SERVER_NAME.to_string(), + environment_id: Some("env_test".to_string()), + timed_out: false, + } + ); + Ok(()) + } + + #[tokio::test] + async fn enable_remote_control_reports_connecting_after_timeout() -> Result<()> { + let status = run_enable_remote_control_scenario(EnableScenario { + initial_notification: None, + enable_response: remote_control_status( + RemoteControlConnectionStatus::Connecting, + /*environment_id*/ None, + ), + after_enable_notification: None, + ready_timeout: Duration::from_millis(20), + }) + .await?; + + assert_eq!( + status, + RemoteControlReadyStatus { + status: RemoteControlConnectionStatus::Connecting, + server_name: TEST_SERVER_NAME.to_string(), + environment_id: None, + timed_out: true, + } + ); + Ok(()) + } + + #[tokio::test] + async fn enable_remote_control_returns_errored_enable_response() -> Result<()> { + let status = run_enable_remote_control_scenario(EnableScenario { + initial_notification: None, + enable_response: remote_control_status( + RemoteControlConnectionStatus::Errored, + /*environment_id*/ None, + ), + after_enable_notification: None, + ready_timeout: Duration::from_millis(20), + }) + .await?; + + assert_eq!( + status, + RemoteControlReadyStatus { + status: RemoteControlConnectionStatus::Errored, + server_name: TEST_SERVER_NAME.to_string(), + environment_id: None, + timed_out: false, + } + ); + Ok(()) + } + + struct EnableScenario { + initial_notification: Option, + enable_response: RemoteControlStatusChangedNotification, + after_enable_notification: Option, + ready_timeout: Duration, + } + + async fn run_enable_remote_control_scenario( + scenario: EnableScenario, + ) -> Result { + let dir = TempDir::new()?; + let socket_path = dir.path().join("app-server.sock"); + let listener = UnixListener::bind(&socket_path).await?; + let ready_timeout = scenario.ready_timeout; + let server_task = tokio::spawn(serve_enable_remote_control_scenario(listener, scenario)); + + let status = enable_remote_control_with_timeout(&socket_path, ready_timeout).await?; + server_task.await??; + Ok(status) + } + + async fn serve_enable_remote_control_scenario( + mut listener: UnixListener, + scenario: EnableScenario, + ) -> Result<()> { + let stream = listener.accept().await?; + let mut websocket = accept_async(stream).await?; + + let initialize = client::read_message(&mut websocket).await?; + let JSONRPCMessage::Request(initialize) = initialize else { + panic!("expected initialize request"); + }; + assert_eq!(initialize.id, INITIALIZE_REQUEST_ID); + assert_eq!(initialize.method, "initialize"); + let Some(initialize_params) = initialize.params else { + panic!("expected initialize params"); + }; + assert_eq!( + initialize_params["capabilities"]["experimentalApi"], + serde_json::Value::Bool(true) + ); + client::send_message( + &mut websocket, + &JSONRPCMessage::Response(JSONRPCResponse { + id: INITIALIZE_REQUEST_ID, + result: serde_json::json!({ + "userAgent": "codex_app_server/1.2.3", + "codexHome": TEST_CODEX_HOME, + "platformFamily": "unix", + "platformOs": "macos", + }), + }), + ) + .await?; + + let initialized = client::read_message(&mut websocket).await?; + let JSONRPCMessage::Notification(initialized) = initialized else { + panic!("expected initialized notification"); + }; + assert_eq!(initialized.method, "initialized"); + + if let Some(status) = scenario.initial_notification { + send_remote_control_status(&mut websocket, status).await?; + } + + let enable = client::read_message(&mut websocket).await?; + let JSONRPCMessage::Request(enable) = enable else { + panic!("expected remoteControl/enable request"); + }; + assert_eq!(enable.id, REMOTE_CONTROL_ENABLE_REQUEST_ID); + assert_eq!(enable.method, "remoteControl/enable"); + client::send_message( + &mut websocket, + &JSONRPCMessage::Response(JSONRPCResponse { + id: REMOTE_CONTROL_ENABLE_REQUEST_ID, + result: serde_json::to_value(RemoteControlEnableResponse::from( + scenario.enable_response, + ))?, + }), + ) + .await?; + + if let Some(status) = scenario.after_enable_notification { + send_remote_control_status(&mut websocket, status).await?; + } else { + tokio::time::sleep(Duration::from_millis(50)).await; + } + + Ok(()) + } + + async fn send_remote_control_status( + websocket: &mut WebSocketStream, + status: RemoteControlStatusChangedNotification, + ) -> Result<()> + where + S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, + { + client::send_message( + websocket, + &JSONRPCMessage::Notification(JSONRPCNotification { + method: "remoteControl/status/changed".to_string(), + params: Some(serde_json::to_value(status)?), + }), + ) + .await + } + + fn remote_control_status( + status: RemoteControlConnectionStatus, + environment_id: Option<&str>, + ) -> RemoteControlStatusChangedNotification { + RemoteControlStatusChangedNotification { + status, + server_name: TEST_SERVER_NAME.to_string(), + installation_id: TEST_INSTALLATION_ID.to_string(), + environment_id: environment_id.map(str::to_string), + } + } +} diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 4482d326f6..142df216b5 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -400,6 +400,7 @@ pub enum PluginStartupTasks { pub struct AppServerRuntimeOptions { pub plugin_startup_tasks: PluginStartupTasks, pub remote_control_enabled: bool, + pub install_shutdown_signal_handler: bool, } impl Default for AppServerRuntimeOptions { @@ -407,6 +408,7 @@ impl Default for AppServerRuntimeOptions { Self { plugin_startup_tasks: PluginStartupTasks::Start, remote_control_enabled: false, + install_shutdown_signal_handler: true, } } } @@ -645,7 +647,8 @@ pub async fn run_main_with_transport_options( let single_client_mode = matches!(&transport, AppServerTransport::Stdio); let shutdown_when_no_connections = single_client_mode; - let graceful_signal_restart_enabled = !single_client_mode; + let graceful_signal_restart_enabled = + runtime_options.install_shutdown_signal_handler && !single_client_mode; let mut app_server_client_name_rx = None; match &transport { diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index 627d5d16d1..b916c20a28 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -22,6 +22,7 @@ anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } clap_complete = { workspace = true } codex-app-server = { workspace = true } +codex-app-server-client = { workspace = true } codex-app-server-daemon = { workspace = true } codex-app-server-protocol = { workspace = true } codex-app-server-test-client = { workspace = true } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 87ac5d6676..87fb007430 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -52,6 +52,7 @@ mod doctor; mod marketplace_cmd; mod mcp_cmd; mod plugin_cmd; +mod remote_control_cmd; mod state_db_recovery; #[cfg(not(windows))] mod wsl_paths; @@ -59,6 +60,7 @@ mod wsl_paths; use crate::mcp_cmd::McpCli; use crate::plugin_cmd::PluginCli; use crate::plugin_cmd::PluginSubcommand; +use crate::remote_control_cmd::RemoteControlCommand; use doctor::DoctorCommand; use state_db_recovery as local_state_db; @@ -559,21 +561,6 @@ struct AppServerBootstrapCommand { remote_control: bool, } -#[derive(Debug, Args)] -struct RemoteControlCommand { - #[command(subcommand)] - subcommand: Option, -} - -#[derive(Debug, Clone, Copy, clap::Subcommand)] -enum RemoteControlSubcommand { - /// Start the app-server daemon with remote control enabled. - Start, - - /// Stop the app-server daemon. - Stop, -} - #[derive(Debug, Args)] struct GenerateTsCommand { /// Output directory where .ts files will be written @@ -1063,24 +1050,18 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { } } Some(Subcommand::RemoteControl(remote_control_cli)) => { - let subcommand_name = remote_control_subcommand_name(&remote_control_cli); + let subcommand_name = remote_control_cli.subcommand_name(); reject_remote_mode_for_subcommand( root_remote.as_deref(), root_remote_auth_token_env.as_deref(), subcommand_name, )?; - match remote_control_cli - .subcommand - .unwrap_or(RemoteControlSubcommand::Start) - { - RemoteControlSubcommand::Start => { - let output = codex_app_server_daemon::ensure_remote_control_started().await?; - println!("{}", serde_json::to_string(&output)?); - } - RemoteControlSubcommand::Stop => { - print_app_server_daemon_output(AppServerLifecycleCommand::Stop).await?; - } - } + remote_control_cmd::run( + remote_control_cli, + arg0_paths.clone(), + root_config_overrides, + ) + .await?; } #[cfg(any(target_os = "macos", target_os = "windows"))] Some(Subcommand::App(app_cli)) => { @@ -1802,9 +1783,7 @@ fn unsupported_subcommand_name_for_strict_config( Some(Subcommand::AppServer(app_server)) => { Some(app_server_subcommand_name(app_server.subcommand.as_ref())) } - Some(Subcommand::RemoteControl(remote_control)) => { - Some(remote_control_subcommand_name(remote_control)) - } + Some(Subcommand::RemoteControl(remote_control)) => Some(remote_control.subcommand_name()), Some(Subcommand::Mcp(_)) => Some("mcp"), Some(Subcommand::Plugin(_)) => Some("plugin"), #[cfg(any(target_os = "macos", target_os = "windows"))] @@ -1857,14 +1836,6 @@ fn reject_remote_mode_for_app_server_subcommand( reject_remote_mode_for_subcommand(remote, remote_auth_token_env, subcommand_name) } -fn remote_control_subcommand_name(command: &RemoteControlCommand) -> &'static str { - match command.subcommand { - None => "remote-control", - Some(RemoteControlSubcommand::Start) => "remote-control start", - Some(RemoteControlSubcommand::Stop) => "remote-control stop", - } -} - fn app_server_subcommand_name(subcommand: Option<&AppServerSubcommand>) -> &'static str { match subcommand { None => "app-server", @@ -2879,12 +2850,10 @@ mod tests { fn reject_remote_flag_for_remote_control() { let cli = MultitoolCli::try_parse_from(["codex", "--remote", "unix://", "remote-control"]) .expect("parse"); - assert_matches!( - cli.subcommand, - Some(Subcommand::RemoteControl(RemoteControlCommand { - subcommand: None - })) - ); + let Some(Subcommand::RemoteControl(remote_control)) = &cli.subcommand else { + panic!("expected remote-control subcommand"); + }; + assert_eq!(remote_control.subcommand_name(), "remote-control"); let err = reject_remote_mode_for_subcommand( cli.remote.remote.as_deref(), diff --git a/codex-rs/cli/src/remote_control_cmd.rs b/codex-rs/cli/src/remote_control_cmd.rs new file mode 100644 index 0000000000..0702b8024a --- /dev/null +++ b/codex-rs/cli/src/remote_control_cmd.rs @@ -0,0 +1,823 @@ +use std::time::Duration; + +use anyhow::Context; +use clap::Args; +use codex_app_server::AppServerRuntimeOptions; +use codex_app_server::AppServerTransport; +use codex_app_server::AppServerWebsocketAuthSettings; +use codex_app_server_client::AppServerEvent; +use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; +use codex_app_server_client::RemoteAppServerClient; +use codex_app_server_client::RemoteAppServerConnectArgs; +use codex_app_server_client::RemoteAppServerEndpoint; +use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand; +use codex_app_server_daemon::LifecycleOutput as AppServerLifecycleOutput; +use codex_app_server_daemon::LifecycleStatus as AppServerLifecycleStatus; +use codex_app_server_daemon::RemoteControlReadyOutput as AppServerRemoteControlReadyOutput; +use codex_app_server_daemon::RemoteControlReadyStatus as AppServerRemoteControlReadyStatus; +use codex_app_server_daemon::RemoteControlStartOutput as AppServerRemoteControlStartOutput; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::RemoteControlConnectionStatus; +use codex_app_server_protocol::RemoteControlEnableResponse; +use codex_app_server_protocol::RemoteControlStatusChangedNotification; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; +use codex_arg0::Arg0DispatchPaths; +use codex_config::LoaderOverrides; +use codex_protocol::protocol::SessionSource; +use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_cli::CliConfigOverrides; +use serde::Serialize; +use tokio::sync::watch; +use tokio::task::JoinHandle; +use tokio::time::Instant; +use tokio::time::sleep; +use tokio::time::timeout; + +const FOREGROUND_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +const FOREGROUND_SOCKET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50); +const FOREGROUND_APP_SERVER_ABORT_TIMEOUT: Duration = Duration::from_secs(1); +const REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); +const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10); +const REMOTE_CONTROL_CLIENT_NAME: &str = "codex-remote-control"; + +#[derive(Debug, Args)] +pub(crate) struct RemoteControlCommand { + /// Emit machine-readable JSON. + #[arg(long = "json", global = true)] + json: bool, + + #[command(subcommand)] + subcommand: Option, +} + +impl RemoteControlCommand { + pub(crate) fn subcommand_name(&self) -> &'static str { + match self.subcommand { + None => "remote-control", + Some(RemoteControlSubcommand::Start) => "remote-control start", + Some(RemoteControlSubcommand::Stop) => "remote-control stop", + } + } +} + +#[derive(Debug, Clone, Copy, clap::Subcommand)] +enum RemoteControlSubcommand { + /// Start the app-server daemon with remote control enabled. + Start, + + /// Stop the app-server daemon. + Stop, +} + +pub(crate) async fn run( + command: RemoteControlCommand, + arg0_paths: Arg0DispatchPaths, + root_config_overrides: CliConfigOverrides, +) -> anyhow::Result<()> { + match command.subcommand { + None => { + run_foreground_remote_control(command.json, arg0_paths, root_config_overrides).await?; + } + Some(RemoteControlSubcommand::Start) => { + let output = codex_app_server_daemon::ensure_remote_control_ready().await?; + print_remote_control_start_output(&output, command.json)?; + } + Some(RemoteControlSubcommand::Stop) => { + let output = codex_app_server_daemon::run(AppServerLifecycleCommand::Stop).await?; + print_remote_control_stop_output(&output, command.json)?; + } + } + Ok(()) +} + +async fn run_foreground_remote_control( + json: bool, + arg0_paths: Arg0DispatchPaths, + root_config_overrides: CliConfigOverrides, +) -> anyhow::Result<()> { + let socket_dir = tempfile::Builder::new() + .prefix("codex-rc-") + .tempdir_in("/tmp") + .or_else(|_| tempfile::tempdir()) + .context("failed to create private app-server socket directory")?; + let socket_path = socket_dir.path().join("rc.sock"); + let socket_path = AbsolutePathBuf::from_absolute_path(&socket_path) + .context("private app-server socket path was not absolute")?; + let transport = AppServerTransport::UnixSocket { + socket_path: socket_path.clone(), + }; + let runtime_options = AppServerRuntimeOptions { + remote_control_enabled: true, + install_shutdown_signal_handler: false, + ..Default::default() + }; + let (stop_rx, stop_signal_task) = foreground_stop_signal(); + let mut app_server_task = tokio::spawn(codex_app_server::run_main_with_transport_options( + arg0_paths, + root_config_overrides, + LoaderOverrides::default(), + /*strict_config*/ false, + /*default_analytics_enabled*/ false, + transport, + SessionSource::VSCode, + AppServerWebsocketAuthSettings::default(), + runtime_options, + )); + + let summary = match wait_for_foreground_remote_control_start( + &mut app_server_task, + wait_for_foreground_remote_control_ready(socket_path), + stop_rx.clone(), + ) + .await + { + ForegroundStartupResult::Ready(summary) => summary, + ForegroundStartupResult::Stopped => { + abort_foreground_app_server(app_server_task).await; + stop_signal_task.abort(); + return Ok(()); + } + ForegroundStartupResult::ReadyFailed(error) => { + abort_foreground_app_server(app_server_task).await; + stop_signal_task.abort(); + return Err(error); + } + ForegroundStartupResult::AppServerExited(error) => { + stop_signal_task.abort(); + return Err(error); + } + }; + + if *stop_rx.borrow() { + abort_foreground_app_server(app_server_task).await; + stop_signal_task.abort(); + return Ok(()); + } + + if let Err(error) = print_foreground_ready_output(&summary, json) { + abort_foreground_app_server(app_server_task).await; + stop_signal_task.abort(); + return Err(error); + } + + let result = wait_for_foreground_app_server(app_server_task, stop_rx).await; + stop_signal_task.abort(); + result +} + +fn foreground_stop_signal() -> (watch::Receiver, JoinHandle<()>) { + let (stop_tx, stop_rx) = watch::channel(false); + let task = tokio::spawn(async move { + if let Err(err) = tokio::signal::ctrl_c().await { + eprintln!("failed to listen for Ctrl-C: {err}"); + } + let _ = stop_tx.send(true); + }); + (stop_rx, task) +} + +enum ForegroundStartupResult { + Ready(RemoteControlReadySummary), + Stopped, + ReadyFailed(anyhow::Error), + AppServerExited(anyhow::Error), +} + +async fn wait_for_foreground_remote_control_start( + app_server_task: &mut JoinHandle>, + ready: impl std::future::Future>, + mut stop_rx: watch::Receiver, +) -> ForegroundStartupResult { + tokio::pin!(ready); + + tokio::select! { + ready_result = &mut ready => match ready_result { + Ok(summary) => ForegroundStartupResult::Ready(summary), + Err(error) => ForegroundStartupResult::ReadyFailed(error), + }, + app_server_result = app_server_task => { + ForegroundStartupResult::AppServerExited( + foreground_app_server_exited_before_ready(app_server_result) + ) + } + _ = wait_for_stop_signal(&mut stop_rx) => ForegroundStartupResult::Stopped, + } +} + +async fn wait_for_foreground_app_server( + mut app_server_task: JoinHandle>, + mut stop_rx: watch::Receiver, +) -> anyhow::Result<()> { + tokio::select! { + app_server_result = &mut app_server_task => { + app_server_result + .context("foreground app-server task failed to join")? + .context("foreground app-server exited with an error")?; + } + _ = wait_for_stop_signal(&mut stop_rx) => { + abort_foreground_app_server(app_server_task).await; + } + } + + Ok(()) +} + +async fn wait_for_stop_signal(stop_rx: &mut watch::Receiver) { + if *stop_rx.borrow() { + return; + } + let _ = stop_rx.wait_for(|stopped| *stopped).await; +} + +fn foreground_app_server_exited_before_ready( + result: Result, tokio::task::JoinError>, +) -> anyhow::Error { + match result { + Ok(Ok(())) => { + anyhow::anyhow!("foreground app-server exited before remote control became ready") + } + Ok(Err(error)) => anyhow::Error::new(error) + .context("foreground app-server exited before remote control became ready"), + Err(error) => anyhow::Error::new(error) + .context("foreground app-server task failed before remote control became ready"), + } +} + +async fn abort_foreground_app_server(app_server_task: JoinHandle>) { + app_server_task.abort(); + let _ = timeout(FOREGROUND_APP_SERVER_ABORT_TIMEOUT, app_server_task).await; +} + +async fn wait_for_foreground_remote_control_ready( + socket_path: AbsolutePathBuf, +) -> anyhow::Result { + let mut client = connect_foreground_client(socket_path).await?; + enable_remote_control_and_wait(&mut client).await +} + +async fn connect_foreground_client( + socket_path: AbsolutePathBuf, +) -> anyhow::Result { + let socket_path_display = socket_path.as_path().display().to_string(); + let connect_args = RemoteAppServerConnectArgs { + endpoint: RemoteAppServerEndpoint::UnixSocket { socket_path }, + client_name: REMOTE_CONTROL_CLIENT_NAME.to_string(), + client_version: env!("CARGO_PKG_VERSION").to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }; + let deadline = Instant::now() + FOREGROUND_SOCKET_CONNECT_TIMEOUT; + loop { + match RemoteAppServerClient::connect(connect_args.clone()).await { + Ok(client) => return Ok(client), + Err(_) if Instant::now() < deadline => { + sleep(FOREGROUND_SOCKET_CONNECT_RETRY_DELAY).await; + } + Err(error) => { + return Err(error).with_context(|| { + format!("app server did not become ready on {socket_path_display}") + }); + } + } + } +} + +async fn enable_remote_control_and_wait( + client: &mut RemoteAppServerClient, +) -> anyhow::Result { + let enable_response: RemoteControlEnableResponse = timeout( + REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT, + client.request_typed(ClientRequest::RemoteControlEnable { + request_id: RequestId::String("remote-control-enable".to_string()), + params: None, + }), + ) + .await + .context("timed out waiting for remoteControl/enable response")? + .context("failed to enable remote control")?; + wait_for_remote_control_ready( + client, + RemoteControlReadySummary::from(enable_response), + REMOTE_CONTROL_READY_TIMEOUT, + ) + .await +} + +async fn wait_for_remote_control_ready( + client: &mut RemoteAppServerClient, + mut summary: RemoteControlReadySummary, + ready_timeout: Duration, +) -> anyhow::Result { + if summary.status != RemoteControlConnectionStatus::Connecting { + return Ok(summary); + } + + let deadline = Instant::now() + ready_timeout; + loop { + let now = Instant::now(); + if now >= deadline { + summary.timed_out = true; + return Ok(summary); + } + + match timeout(deadline.duration_since(now), client.next_event()).await { + Ok(Some(event)) => { + if apply_remote_control_event(&mut summary, event) + && summary.status != RemoteControlConnectionStatus::Connecting + { + return Ok(summary); + } + } + Ok(None) => { + anyhow::bail!("app-server disconnected before remote control became ready"); + } + Err(_) => { + summary.timed_out = true; + return Ok(summary); + } + } + } +} + +fn apply_remote_control_event( + summary: &mut RemoteControlReadySummary, + event: AppServerEvent, +) -> bool { + match event { + AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged( + notification, + )) => { + *summary = RemoteControlReadySummary::from(notification); + true + } + AppServerEvent::Lagged { skipped: _ } + | AppServerEvent::ServerNotification(_) + | AppServerEvent::ServerRequest(_) + | AppServerEvent::Disconnected { message: _ } => false, + } +} + +fn print_remote_control_start_output( + output: &AppServerRemoteControlReadyOutput, + json: bool, +) -> anyhow::Result<()> { + if json { + println!( + "{}", + serde_json::to_string(&RemoteControlStartJsonOutput::daemon(output))? + ); + return Ok(()); + } + + let summary = RemoteControlReadySummary::from(&output.remote_control); + for line in remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Daemon)? { + println!("{line}"); + } + Ok(()) +} + +fn print_foreground_ready_output( + summary: &RemoteControlReadySummary, + json: bool, +) -> anyhow::Result<()> { + if json { + ensure_remote_control_startable(summary)?; + println!( + "{}", + serde_json::to_string(&RemoteControlStartJsonOutput::foreground(summary))? + ); + return Ok(()); + } + + for line in remote_control_start_human_lines(summary, RemoteControlHumanOutputMode::Foreground)? + { + println!("{line}"); + } + Ok(()) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RemoteControlReadySummary { + status: RemoteControlConnectionStatus, + server_name: String, + environment_id: Option, + timed_out: bool, +} + +impl From for RemoteControlReadySummary { + fn from(response: RemoteControlEnableResponse) -> Self { + let RemoteControlEnableResponse { + status, + server_name, + installation_id: _, + environment_id, + } = response; + Self { + status, + server_name, + environment_id, + timed_out: false, + } + } +} + +impl From for RemoteControlReadySummary { + fn from(notification: RemoteControlStatusChangedNotification) -> Self { + let RemoteControlStatusChangedNotification { + status, + server_name, + installation_id: _, + environment_id, + } = notification; + Self { + status, + server_name, + environment_id, + timed_out: false, + } + } +} + +impl From<&AppServerRemoteControlReadyStatus> for RemoteControlReadySummary { + fn from(status: &AppServerRemoteControlReadyStatus) -> Self { + Self { + status: status.status, + server_name: status.server_name.clone(), + environment_id: status.environment_id.clone(), + timed_out: status.timed_out, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct RemoteControlStartJsonOutput<'a> { + mode: RemoteControlModeJson, + status: RemoteControlConnectionStatus, + server_name: &'a str, + environment_id: Option<&'a str>, + timed_out: bool, + #[serde(skip_serializing_if = "Option::is_none")] + daemon: Option<&'a AppServerRemoteControlStartOutput>, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +enum RemoteControlModeJson { + Foreground, + Daemon, +} + +impl<'a> RemoteControlStartJsonOutput<'a> { + fn foreground(summary: &'a RemoteControlReadySummary) -> Self { + Self { + mode: RemoteControlModeJson::Foreground, + status: summary.status, + server_name: &summary.server_name, + environment_id: summary.environment_id.as_deref(), + timed_out: summary.timed_out, + daemon: None, + } + } + + fn daemon(output: &'a AppServerRemoteControlReadyOutput) -> Self { + let remote_control = &output.remote_control; + Self { + mode: RemoteControlModeJson::Daemon, + status: remote_control.status, + server_name: &remote_control.server_name, + environment_id: remote_control.environment_id.as_deref(), + timed_out: remote_control.timed_out, + daemon: Some(&output.daemon), + } + } +} + +fn remote_control_start_human_message( + output: &RemoteControlReadySummary, +) -> anyhow::Result { + ensure_remote_control_startable(output)?; + match output.status { + RemoteControlConnectionStatus::Connected => Ok(format!( + "This machine is available for remote control as {}.", + output.server_name + )), + RemoteControlConnectionStatus::Connecting => Ok(format!( + "Remote control is enabled on {} and still connecting.", + output.server_name + )), + RemoteControlConnectionStatus::Errored | RemoteControlConnectionStatus::Disabled => { + unreachable!("errored and disabled statuses are rejected before formatting") + } + } +} + +fn ensure_remote_control_startable(output: &RemoteControlReadySummary) -> anyhow::Result<()> { + match output.status { + RemoteControlConnectionStatus::Connected | RemoteControlConnectionStatus::Connecting => { + Ok(()) + } + RemoteControlConnectionStatus::Errored => { + anyhow::bail!( + "Remote control is enabled on {} but the connection is errored.", + output.server_name + ); + } + RemoteControlConnectionStatus::Disabled => { + anyhow::bail!("Remote control is disabled on {}.", output.server_name); + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RemoteControlHumanOutputMode { + Foreground, + Daemon, +} + +fn remote_control_start_human_lines( + summary: &RemoteControlReadySummary, + mode: RemoteControlHumanOutputMode, +) -> anyhow::Result> { + let mut lines = vec![remote_control_start_human_message(summary)?]; + match mode { + RemoteControlHumanOutputMode::Foreground => { + lines.push("Press Ctrl-C to stop.".to_string()); + } + RemoteControlHumanOutputMode::Daemon => {} + } + Ok(lines) +} + +fn print_remote_control_stop_output( + output: &AppServerLifecycleOutput, + json: bool, +) -> anyhow::Result<()> { + if json { + println!("{}", serde_json::to_string(output)?); + return Ok(()); + } + + println!("{}", remote_control_stop_human_message(output)); + Ok(()) +} + +fn remote_control_stop_human_message(output: &AppServerLifecycleOutput) -> String { + match output.status { + AppServerLifecycleStatus::Stopped => "Remote control stopped.".to_string(), + AppServerLifecycleStatus::NotRunning => "Remote control is not running.".to_string(), + AppServerLifecycleStatus::Started + | AppServerLifecycleStatus::Restarted + | AppServerLifecycleStatus::AlreadyRunning + | AppServerLifecycleStatus::Running => { + format!( + "Remote control stop completed with status {:?}.", + output.status + ) + } + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use serde_json::json; + use std::path::PathBuf; + + use super::*; + + fn remote_control_status(status: RemoteControlConnectionStatus) -> RemoteControlReadySummary { + RemoteControlReadySummary { + status, + server_name: "owen-mbp".to_string(), + environment_id: Some("env_test".to_string()), + timed_out: status == RemoteControlConnectionStatus::Connecting, + } + } + + fn enable_response(status: RemoteControlConnectionStatus) -> RemoteControlEnableResponse { + RemoteControlEnableResponse { + status, + server_name: "owen-mbp".to_string(), + installation_id: "install_test".to_string(), + environment_id: Some("env_test".to_string()), + } + } + + fn status_notification( + status: RemoteControlConnectionStatus, + ) -> RemoteControlStatusChangedNotification { + RemoteControlStatusChangedNotification { + status, + server_name: "owen-mbp".to_string(), + installation_id: "install_test".to_string(), + environment_id: Some("env_test".to_string()), + } + } + + fn daemon_ready_output( + status: RemoteControlConnectionStatus, + ) -> AppServerRemoteControlReadyOutput { + AppServerRemoteControlReadyOutput { + daemon: AppServerRemoteControlStartOutput::Start(AppServerLifecycleOutput { + status: AppServerLifecycleStatus::Started, + backend: None, + pid: Some(42), + socket_path: PathBuf::from("/tmp/app-server-control.sock"), + cli_version: Some("1.0.0".to_string()), + app_server_version: Some("2.0.0".to_string()), + }), + remote_control: AppServerRemoteControlReadyStatus { + status, + server_name: "owen-mbp".to_string(), + environment_id: Some("env_test".to_string()), + timed_out: status == RemoteControlConnectionStatus::Connecting, + }, + } + } + + #[test] + fn remote_control_human_start_messages_use_server_name() { + assert_eq!( + remote_control_start_human_message(&remote_control_status( + RemoteControlConnectionStatus::Connected + )) + .expect("connected message"), + "This machine is available for remote control as owen-mbp." + ); + assert_eq!( + remote_control_start_human_message(&remote_control_status( + RemoteControlConnectionStatus::Connecting + )) + .expect("connecting message"), + "Remote control is enabled on owen-mbp and still connecting." + ); + assert_eq!( + remote_control_start_human_message(&remote_control_status( + RemoteControlConnectionStatus::Errored + )) + .expect_err("errored status should fail") + .to_string(), + "Remote control is enabled on owen-mbp but the connection is errored." + ); + assert_eq!( + remote_control_start_human_message(&remote_control_status( + RemoteControlConnectionStatus::Disabled + )) + .expect_err("disabled status should fail") + .to_string(), + "Remote control is disabled on owen-mbp." + ); + } + + #[test] + fn remote_control_human_lines_include_foreground_stop_hint_only() { + let summary = remote_control_status(RemoteControlConnectionStatus::Connected); + + assert_eq!( + remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Foreground) + .expect("foreground lines"), + vec![ + "This machine is available for remote control as owen-mbp.".to_string(), + "Press Ctrl-C to stop.".to_string(), + ] + ); + assert_eq!( + remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Daemon) + .expect("daemon lines"), + vec!["This machine is available for remote control as owen-mbp.".to_string()] + ); + } + + #[test] + fn remote_control_json_output_marks_foreground_or_daemon() { + let foreground_summary = remote_control_status(RemoteControlConnectionStatus::Connected); + assert_eq!( + serde_json::to_value(RemoteControlStartJsonOutput::foreground( + &foreground_summary + )) + .expect("foreground JSON"), + json!({ + "mode": "foreground", + "status": "connected", + "serverName": "owen-mbp", + "environmentId": "env_test", + "timedOut": false, + }) + ); + + let daemon_output = daemon_ready_output(RemoteControlConnectionStatus::Connected); + assert_eq!( + serde_json::to_value(RemoteControlStartJsonOutput::daemon(&daemon_output)) + .expect("daemon JSON"), + json!({ + "mode": "daemon", + "status": "connected", + "serverName": "owen-mbp", + "environmentId": "env_test", + "timedOut": false, + "daemon": { + "status": "started", + "pid": 42, + "socketPath": "/tmp/app-server-control.sock", + "cliVersion": "1.0.0", + "appServerVersion": "2.0.0", + }, + }) + ); + } + + #[test] + fn remote_control_summary_uses_enable_response_as_authoritative() { + assert_eq!( + RemoteControlReadySummary::from(enable_response( + RemoteControlConnectionStatus::Connected + )), + remote_control_status(RemoteControlConnectionStatus::Connected) + ); + } + + #[test] + fn remote_control_status_notification_updates_connecting_summary() { + let mut summary = remote_control_status(RemoteControlConnectionStatus::Connecting); + + let changed = apply_remote_control_event( + &mut summary, + AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged( + status_notification(RemoteControlConnectionStatus::Connected), + )), + ); + + assert!(changed); + assert_eq!( + summary, + remote_control_status(RemoteControlConnectionStatus::Connected) + ); + } + + #[tokio::test] + async fn foreground_wait_aborts_app_server_on_stop_signal() { + let app_server_task = tokio::spawn(std::future::pending::>()); + let (stop_tx, stop_rx) = tokio::sync::watch::channel(false); + stop_tx.send(true).expect("send stop signal"); + + tokio::time::timeout( + std::time::Duration::from_secs(1), + wait_for_foreground_app_server(app_server_task, stop_rx), + ) + .await + .expect("foreground wait should return after stop signal") + .expect("stop signal should shut down cleanly"); + } + + #[tokio::test] + async fn foreground_start_wait_stops_before_ready() { + let mut app_server_task = tokio::spawn(std::future::pending::>()); + let (stop_tx, stop_rx) = tokio::sync::watch::channel(false); + stop_tx.send(true).expect("send stop signal"); + + let startup = tokio::time::timeout( + std::time::Duration::from_secs(1), + wait_for_foreground_remote_control_start( + &mut app_server_task, + std::future::pending::>(), + stop_rx, + ), + ) + .await + .expect("foreground startup wait should return after stop signal"); + + assert!(matches!(startup, ForegroundStartupResult::Stopped)); + app_server_task.abort(); + let _ = app_server_task.await; + } + + #[tokio::test] + async fn foreground_start_wait_reports_app_server_exit_before_ready() { + let mut app_server_task = + tokio::spawn(async { Err(std::io::Error::other("startup failed before socket bind")) }); + let (_stop_tx, stop_rx) = tokio::sync::watch::channel(false); + + let startup = tokio::time::timeout( + std::time::Duration::from_secs(1), + wait_for_foreground_remote_control_start( + &mut app_server_task, + std::future::pending::>(), + stop_rx, + ), + ) + .await + .expect("foreground startup wait should return after app-server exits"); + + let ForegroundStartupResult::AppServerExited(error) = startup else { + panic!("expected app-server exit before ready"); + }; + + assert_eq!( + error.to_string(), + "foreground app-server exited before remote control became ready" + ); + } +}