mirror of
https://github.com/openai/codex.git
synced 2026-05-16 01:02:48 +00:00
improve remote-control CLI UX
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2223,6 +2223,7 @@ dependencies = [
|
||||
"clap_complete",
|
||||
"codex-api",
|
||||
"codex-app-server",
|
||||
"codex-app-server-client",
|
||||
"codex-app-server-daemon",
|
||||
"codex-app-server-protocol",
|
||||
"codex-app-server-test-client",
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
mod pid;
|
||||
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use serde::Serialize;
|
||||
@@ -31,3 +32,15 @@ pub(crate) fn pid_backend(paths: BackendPaths) -> PidBackend {
|
||||
pub(crate) fn pid_update_loop_backend(paths: BackendPaths) -> PidBackend {
|
||||
PidBackend::new_update_loop(paths.codex_bin, paths.update_pid_file)
|
||||
}
|
||||
|
||||
pub(crate) async fn append_stderr_log_tail_context(pid_file: &Path, context: &mut String) {
|
||||
match pid::read_stderr_log_tail(pid_file).await {
|
||||
Ok(Some(tail)) => tail.append_to_context(context),
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
context.push_str(&format!(
|
||||
"\n\nFailed to read managed app-server stderr log: {err:#}"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::io::SeekFrom;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
#[cfg(unix)]
|
||||
@@ -10,6 +11,8 @@ use anyhow::bail;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
#[cfg(unix)]
|
||||
use tokio::process::Command;
|
||||
use tokio::time::sleep;
|
||||
@@ -18,6 +21,7 @@ const STOP_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60);
|
||||
const STOP_TIMEOUT: Duration = Duration::from_secs(70);
|
||||
const START_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const STDERR_LOG_TAIL_BYTES: u64 = 4096;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
@@ -35,6 +39,25 @@ struct PidRecord {
|
||||
process_start_time: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct PidLogTail {
|
||||
pub(crate) path: PathBuf,
|
||||
pub(crate) contents: String,
|
||||
}
|
||||
|
||||
impl PidLogTail {
|
||||
pub(crate) fn append_to_context(&self, context: &mut String) {
|
||||
context.push_str(&format!(
|
||||
"\n\nManaged app-server stderr ({}):",
|
||||
self.path.display()
|
||||
));
|
||||
for line in self.contents.lines() {
|
||||
context.push_str("\n ");
|
||||
context.push_str(line);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum PidFileState {
|
||||
Missing,
|
||||
@@ -129,11 +152,18 @@ impl PidBackend {
|
||||
}
|
||||
};
|
||||
let mut command = Command::new(&self.codex_bin);
|
||||
let stderr_log = match self.open_stderr_log().await {
|
||||
Ok(stderr_log) => stderr_log,
|
||||
Err(err) => {
|
||||
let _ = fs::remove_file(&self.pid_file).await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
command
|
||||
.args(self.command_args())
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null());
|
||||
.stderr(Stdio::from(stderr_log.into_std().await));
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
@@ -169,8 +199,11 @@ impl PidBackend {
|
||||
},
|
||||
Err(err) => {
|
||||
let _ = self.terminate_process(pid);
|
||||
let mut context =
|
||||
format!("failed to record pid-managed app-server process {pid} startup");
|
||||
super::append_stderr_log_tail_context(&self.pid_file, &mut context).await;
|
||||
let _ = fs::remove_file(&self.pid_file).await;
|
||||
return Err(err);
|
||||
return Err(err).context(context);
|
||||
}
|
||||
};
|
||||
let contents = serde_json::to_vec(&record).context("failed to serialize pid record")?;
|
||||
@@ -344,6 +377,23 @@ impl PidBackend {
|
||||
Ok(reservation_lock)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn open_stderr_log(&self) -> Result<fs::File> {
|
||||
let stderr_log_file = stderr_log_file_for_pid_file(&self.pid_file);
|
||||
fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(&stderr_log_file)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to open stderr log for pid-managed app server {}",
|
||||
stderr_log_file.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn command_args(&self) -> Vec<&'static str> {
|
||||
match self.command_kind {
|
||||
@@ -376,6 +426,56 @@ impl PidBackend {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn read_stderr_log_tail(pid_file: &Path) -> Result<Option<PidLogTail>> {
|
||||
let path = stderr_log_file_for_pid_file(pid_file);
|
||||
let Some(contents) = read_log_tail(&path, STDERR_LOG_TAIL_BYTES).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(PidLogTail { path, contents }))
|
||||
}
|
||||
|
||||
fn stderr_log_file_for_pid_file(pid_file: &Path) -> PathBuf {
|
||||
pid_file.with_extension("stderr.log")
|
||||
}
|
||||
|
||||
async fn read_log_tail(path: &Path, byte_limit: u64) -> Result<Option<String>> {
|
||||
let mut file = match fs::File::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.with_context(|| format!("failed to open stderr log {}", path.display()));
|
||||
}
|
||||
};
|
||||
let len = file
|
||||
.metadata()
|
||||
.await
|
||||
.with_context(|| format!("failed to inspect stderr log {}", path.display()))?
|
||||
.len();
|
||||
if len == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let start = len.saturating_sub(byte_limit);
|
||||
file.seek(SeekFrom::Start(start))
|
||||
.await
|
||||
.with_context(|| format!("failed to seek stderr log {}", path.display()))?;
|
||||
let mut bytes = Vec::new();
|
||||
file.read_to_end(&mut bytes)
|
||||
.await
|
||||
.with_context(|| format!("failed to read stderr log {}", path.display()))?;
|
||||
if start > 0
|
||||
&& let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n')
|
||||
{
|
||||
bytes.drain(..=newline_index);
|
||||
}
|
||||
let contents = String::from_utf8_lossy(&bytes).trim_end().to_string();
|
||||
if contents.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(contents))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn process_exists(pid: u32) -> bool {
|
||||
let Ok(pid) = libc::pid_t::try_from(pid) else {
|
||||
|
||||
@@ -6,7 +6,10 @@ use tempfile::TempDir;
|
||||
use super::PidBackend;
|
||||
use super::PidCommandKind;
|
||||
use super::PidFileState;
|
||||
use super::PidLogTail;
|
||||
use super::PidRecord;
|
||||
use super::read_stderr_log_tail;
|
||||
use super::stderr_log_file_for_pid_file;
|
||||
use super::try_lock_file;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -170,3 +173,24 @@ fn app_server_remote_control_uses_runtime_flag() {
|
||||
vec!["app-server", "--remote-control", "--listen", "unix://"]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_stderr_log_tail_returns_recent_complete_lines() {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let pid_file = temp_dir.path().join("app-server.pid");
|
||||
let log_file = stderr_log_file_for_pid_file(&pid_file);
|
||||
let contents = format!("{}\nrecent error\nusage", "x".repeat(4100));
|
||||
tokio::fs::write(&log_file, contents)
|
||||
.await
|
||||
.expect("write stderr log");
|
||||
|
||||
assert_eq!(
|
||||
read_stderr_log_tail(&pid_file)
|
||||
.await
|
||||
.expect("read stderr log"),
|
||||
Some(PidLogTail {
|
||||
path: log_file,
|
||||
contents: "recent error\nusage".to_string(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::InitializeResponse;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
@@ -14,12 +15,16 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_uds::UnixStream;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::client_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
const PROBE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
pub(crate) const CONTROL_SOCKET_RESPONSE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const CLIENT_NAME: &str = "codex_app_server_daemon";
|
||||
const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct ProbeInfo {
|
||||
@@ -27,7 +32,7 @@ pub(crate) struct ProbeInfo {
|
||||
}
|
||||
|
||||
pub(crate) async fn probe(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
timeout(PROBE_TIMEOUT, probe_inner(socket_path))
|
||||
timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, probe_inner(socket_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
@@ -38,54 +43,14 @@ pub(crate) async fn probe(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
}
|
||||
|
||||
async fn probe_inner(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to connect to {}", socket_path.display()))?;
|
||||
let (mut websocket, _response) = client_async("ws://localhost/", stream)
|
||||
.await
|
||||
.with_context(|| format!("failed to upgrade {}", socket_path.display()))?;
|
||||
|
||||
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: "initialize".to_string(),
|
||||
params: Some(serde_json::to_value(InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: CLIENT_NAME.to_string(),
|
||||
title: Some("Codex App Server Daemon".to_string()),
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
})?),
|
||||
trace: None,
|
||||
});
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(&initialize)?.into()))
|
||||
.await
|
||||
.context("failed to send initialize request")?;
|
||||
|
||||
let response = loop {
|
||||
let frame = websocket
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("app-server closed before initialize response"))??;
|
||||
let Message::Text(payload) = frame else {
|
||||
continue;
|
||||
};
|
||||
let message = serde_json::from_str::<JSONRPCMessage>(&payload)?;
|
||||
if let JSONRPCMessage::Response(response) = message
|
||||
&& response.id == RequestId::Integer(1)
|
||||
{
|
||||
break response;
|
||||
}
|
||||
};
|
||||
let initialize_response = serde_json::from_value::<InitializeResponse>(response.result)?;
|
||||
let mut websocket = connect(socket_path).await?;
|
||||
|
||||
let initialize_response = initialize(&mut websocket, /*experimental_api*/ false).await?;
|
||||
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "initialized".to_string(),
|
||||
params: None,
|
||||
});
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(&initialized)?.into()))
|
||||
send_message(&mut websocket, &initialized)
|
||||
.await
|
||||
.context("failed to send initialized notification")?;
|
||||
websocket.close(None).await.ok();
|
||||
@@ -95,6 +60,91 @@ async fn probe_inner(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn connect(socket_path: &Path) -> Result<WebSocketStream<UnixStream>> {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to connect to {}", socket_path.display()))?;
|
||||
let (websocket, _response) = client_async("ws://localhost/", stream)
|
||||
.await
|
||||
.with_context(|| format!("failed to upgrade {}", socket_path.display()))?;
|
||||
Ok(websocket)
|
||||
}
|
||||
|
||||
pub(crate) async fn initialize<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
experimental_api: bool,
|
||||
) -> Result<InitializeResponse>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: INITIALIZE_REQUEST_ID,
|
||||
method: "initialize".to_string(),
|
||||
params: Some(serde_json::to_value(InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: CLIENT_NAME.to_string(),
|
||||
title: Some("Codex App Server Daemon".to_string()),
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
},
|
||||
capabilities: if experimental_api {
|
||||
Some(InitializeCapabilities {
|
||||
experimental_api: true,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
},
|
||||
})?),
|
||||
trace: None,
|
||||
});
|
||||
send_message(websocket, &initialize)
|
||||
.await
|
||||
.context("failed to send initialize request")?;
|
||||
|
||||
let response = loop {
|
||||
let message = timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, read_message(websocket))
|
||||
.await
|
||||
.context("timed out waiting for initialize response")??;
|
||||
if let JSONRPCMessage::Response(response) = message
|
||||
&& response.id == INITIALIZE_REQUEST_ID
|
||||
{
|
||||
break response;
|
||||
}
|
||||
};
|
||||
serde_json::from_value::<InitializeResponse>(response.result)
|
||||
.context("failed to parse initialize response")
|
||||
}
|
||||
|
||||
pub(crate) async fn send_message<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
message: &JSONRPCMessage,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(message)?.into()))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn read_message<S>(websocket: &mut WebSocketStream<S>) -> Result<JSONRPCMessage>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
loop {
|
||||
let frame = websocket
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("app-server closed the control socket"))??;
|
||||
let Message::Text(payload) = frame else {
|
||||
continue;
|
||||
};
|
||||
return serde_json::from_str::<JSONRPCMessage>(&payload)
|
||||
.context("failed to parse app-server JSON-RPC message");
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_version_from_user_agent(user_agent: &str) -> Result<String> {
|
||||
let (_originator, rest) = user_agent
|
||||
.split_once('/')
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
mod backend;
|
||||
mod client;
|
||||
mod managed_install;
|
||||
mod remote_control_client;
|
||||
mod settings;
|
||||
mod update_loop;
|
||||
|
||||
@@ -13,6 +14,7 @@ use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
pub use backend::BackendKind;
|
||||
use backend::BackendPaths;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_transport::app_server_control_socket_path;
|
||||
use codex_utils_home_dir::find_codex_home;
|
||||
use managed_install::managed_codex_bin;
|
||||
@@ -96,6 +98,20 @@ pub enum RemoteControlStartOutput {
|
||||
Start(LifecycleOutput),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteControlReadyStatus {
|
||||
pub status: RemoteControlConnectionStatus,
|
||||
pub server_name: String,
|
||||
pub environment_id: Option<String>,
|
||||
pub timed_out: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteControlReadyOutput {
|
||||
pub daemon: RemoteControlStartOutput,
|
||||
pub remote_control: RemoteControlReadyStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RemoteControlMode {
|
||||
Enabled,
|
||||
@@ -179,6 +195,13 @@ pub async fn ensure_remote_control_started() -> Result<RemoteControlStartOutput>
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn ensure_remote_control_ready() -> Result<RemoteControlReadyOutput> {
|
||||
ensure_supported_platform()?;
|
||||
Daemon::from_environment()?
|
||||
.ensure_remote_control_ready()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_remote_control(mode: RemoteControlMode) -> Result<RemoteControlOutput> {
|
||||
ensure_supported_platform()?;
|
||||
Daemon::from_environment()?.set_remote_control(mode).await
|
||||
@@ -395,17 +418,22 @@ impl Daemon {
|
||||
sleep(START_POLL_INTERVAL).await;
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(err).with_context(|| {
|
||||
format!(
|
||||
"app server did not become ready on {}",
|
||||
self.socket_path.display()
|
||||
)
|
||||
});
|
||||
let context = self.app_server_not_ready_context().await;
|
||||
return Err(err).context(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn app_server_not_ready_context(&self) -> String {
|
||||
let mut context = format!(
|
||||
"app server did not become ready on {}",
|
||||
self.socket_path.display()
|
||||
);
|
||||
backend::append_stderr_log_tail_context(&self.pid_file, &mut context).await;
|
||||
context
|
||||
}
|
||||
|
||||
async fn bootstrap(&self, options: BootstrapOptions) -> Result<BootstrapOutput> {
|
||||
let _operation_lock = self.acquire_operation_lock().await?;
|
||||
self.bootstrap_locked(options).await
|
||||
@@ -430,6 +458,16 @@ impl Daemon {
|
||||
Ok(RemoteControlStartOutput::Bootstrap(output))
|
||||
}
|
||||
|
||||
async fn ensure_remote_control_ready(&self) -> Result<RemoteControlReadyOutput> {
|
||||
let daemon = self.ensure_remote_control_started().await?;
|
||||
let remote_control =
|
||||
remote_control_client::enable_remote_control(&self.socket_path).await?;
|
||||
Ok(RemoteControlReadyOutput {
|
||||
daemon,
|
||||
remote_control,
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_remote_control(&self, mode: RemoteControlMode) -> Result<RemoteControlOutput> {
|
||||
let _operation_lock = self.acquire_operation_lock().await?;
|
||||
self.set_remote_control_locked(mode).await
|
||||
|
||||
420
codex-rs/app-server-daemon/src/remote_control_client.rs
Normal file
420
codex-rs/app-server-daemon/src/remote_control_client.rs
Normal file
@@ -0,0 +1,420 @@
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_protocol::RemoteControlEnableResponse;
|
||||
use codex_app_server_protocol::RemoteControlStatusChangedNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
|
||||
use crate::RemoteControlReadyStatus;
|
||||
use crate::client;
|
||||
|
||||
const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_CONTROL_ENABLE_REQUEST_ID: RequestId = RequestId::Integer(2);
|
||||
|
||||
pub(crate) async fn enable_remote_control(socket_path: &Path) -> Result<RemoteControlReadyStatus> {
|
||||
enable_remote_control_with_timeout(socket_path, REMOTE_CONTROL_READY_TIMEOUT).await
|
||||
}
|
||||
|
||||
async fn enable_remote_control_with_timeout(
|
||||
socket_path: &Path,
|
||||
ready_timeout: Duration,
|
||||
) -> Result<RemoteControlReadyStatus> {
|
||||
let mut websocket = client::connect(socket_path).await?;
|
||||
|
||||
client::initialize(&mut websocket, /*experimental_api*/ true).await?;
|
||||
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "initialized".to_string(),
|
||||
params: None,
|
||||
});
|
||||
client::send_message(&mut websocket, &initialized)
|
||||
.await
|
||||
.context("failed to send initialized notification")?;
|
||||
|
||||
let enable = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: REMOTE_CONTROL_ENABLE_REQUEST_ID,
|
||||
method: "remoteControl/enable".to_string(),
|
||||
params: None,
|
||||
trace: None,
|
||||
});
|
||||
client::send_message(&mut websocket, &enable)
|
||||
.await
|
||||
.context("failed to send remoteControl/enable request")?;
|
||||
|
||||
let mut latest = read_enable_response(&mut websocket).await?;
|
||||
if latest.status == RemoteControlConnectionStatus::Connecting {
|
||||
latest = wait_for_remote_control_status(&mut websocket, latest, ready_timeout).await?;
|
||||
}
|
||||
websocket.close(None).await.ok();
|
||||
Ok(latest)
|
||||
}
|
||||
|
||||
async fn read_enable_response<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
) -> Result<RemoteControlReadyStatus>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
loop {
|
||||
let message = timeout(
|
||||
client::CONTROL_SOCKET_RESPONSE_TIMEOUT,
|
||||
client::read_message(websocket),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for remoteControl/enable response")??;
|
||||
match message {
|
||||
JSONRPCMessage::Response(response)
|
||||
if response.id == REMOTE_CONTROL_ENABLE_REQUEST_ID =>
|
||||
{
|
||||
let response =
|
||||
serde_json::from_value::<RemoteControlEnableResponse>(response.result)
|
||||
.context("failed to parse remoteControl/enable response")?;
|
||||
return Ok(RemoteControlReadyStatus::from(response));
|
||||
}
|
||||
JSONRPCMessage::Error(err) if err.id == REMOTE_CONTROL_ENABLE_REQUEST_ID => {
|
||||
return Err(anyhow!(
|
||||
"remoteControl/enable failed: {}",
|
||||
err.error.message
|
||||
));
|
||||
}
|
||||
JSONRPCMessage::Notification(notification)
|
||||
if remote_control_status_notification(¬ification).is_some() =>
|
||||
{
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_remote_control_status<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
mut latest: RemoteControlReadyStatus,
|
||||
ready_timeout: Duration,
|
||||
) -> Result<RemoteControlReadyStatus>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let deadline = tokio::time::Instant::now() + ready_timeout;
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
let message = match timeout(remaining, client::read_message(websocket)).await {
|
||||
Ok(Ok(message)) => message,
|
||||
Ok(Err(err)) => return Err(err),
|
||||
Err(_) => {
|
||||
latest.timed_out = true;
|
||||
return Ok(latest);
|
||||
}
|
||||
};
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
continue;
|
||||
};
|
||||
let Some(status) = remote_control_status_notification(¬ification) else {
|
||||
continue;
|
||||
};
|
||||
latest = RemoteControlReadyStatus::from(status);
|
||||
if latest.status != RemoteControlConnectionStatus::Connecting {
|
||||
return Ok(latest);
|
||||
}
|
||||
}
|
||||
latest.timed_out = true;
|
||||
Ok(latest)
|
||||
}
|
||||
|
||||
fn remote_control_status_notification(
|
||||
notification: &JSONRPCNotification,
|
||||
) -> Option<RemoteControlStatusChangedNotification> {
|
||||
if notification.method != "remoteControl/status/changed" {
|
||||
return None;
|
||||
}
|
||||
let params = notification.params.clone()?;
|
||||
serde_json::from_value(params).ok()
|
||||
}
|
||||
|
||||
impl From<RemoteControlEnableResponse> for RemoteControlReadyStatus {
|
||||
fn from(response: RemoteControlEnableResponse) -> Self {
|
||||
let RemoteControlEnableResponse {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = response;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemoteControlStatusChangedNotification> for RemoteControlReadyStatus {
|
||||
fn from(notification: RemoteControlStatusChangedNotification) -> Self {
|
||||
let RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = notification;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, unix))]
|
||||
mod tests {
|
||||
use anyhow::Result;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_uds::UnixListener;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio_tungstenite::accept_async;
|
||||
|
||||
use super::*;
|
||||
|
||||
const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1);
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
const TEST_SERVER_NAME: &str = "owen-mbp";
|
||||
const TEST_CODEX_HOME: &str = "/tmp/codex-home";
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_uses_connected_enable_response_without_later_notification()
|
||||
-> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: Some(remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
)),
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connected,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_waits_for_connected_notification() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: Some(remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
)),
|
||||
ready_timeout: Duration::from_secs(1),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connected,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_reports_connecting_after_timeout() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connecting,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: None,
|
||||
timed_out: true,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_returns_errored_enable_response() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Errored,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Errored,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: None,
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnableScenario {
|
||||
initial_notification: Option<RemoteControlStatusChangedNotification>,
|
||||
enable_response: RemoteControlStatusChangedNotification,
|
||||
after_enable_notification: Option<RemoteControlStatusChangedNotification>,
|
||||
ready_timeout: Duration,
|
||||
}
|
||||
|
||||
async fn run_enable_remote_control_scenario(
|
||||
scenario: EnableScenario,
|
||||
) -> Result<RemoteControlReadyStatus> {
|
||||
let dir = TempDir::new()?;
|
||||
let socket_path = dir.path().join("app-server.sock");
|
||||
let listener = UnixListener::bind(&socket_path).await?;
|
||||
let ready_timeout = scenario.ready_timeout;
|
||||
let server_task = tokio::spawn(serve_enable_remote_control_scenario(listener, scenario));
|
||||
|
||||
let status = enable_remote_control_with_timeout(&socket_path, ready_timeout).await?;
|
||||
server_task.await??;
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
async fn serve_enable_remote_control_scenario(
|
||||
mut listener: UnixListener,
|
||||
scenario: EnableScenario,
|
||||
) -> Result<()> {
|
||||
let stream = listener.accept().await?;
|
||||
let mut websocket = accept_async(stream).await?;
|
||||
|
||||
let initialize = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Request(initialize) = initialize else {
|
||||
panic!("expected initialize request");
|
||||
};
|
||||
assert_eq!(initialize.id, INITIALIZE_REQUEST_ID);
|
||||
assert_eq!(initialize.method, "initialize");
|
||||
let Some(initialize_params) = initialize.params else {
|
||||
panic!("expected initialize params");
|
||||
};
|
||||
assert_eq!(
|
||||
initialize_params["capabilities"]["experimentalApi"],
|
||||
serde_json::Value::Bool(true)
|
||||
);
|
||||
client::send_message(
|
||||
&mut websocket,
|
||||
&JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: INITIALIZE_REQUEST_ID,
|
||||
result: serde_json::json!({
|
||||
"userAgent": "codex_app_server/1.2.3",
|
||||
"codexHome": TEST_CODEX_HOME,
|
||||
"platformFamily": "unix",
|
||||
"platformOs": "macos",
|
||||
}),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let initialized = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Notification(initialized) = initialized else {
|
||||
panic!("expected initialized notification");
|
||||
};
|
||||
assert_eq!(initialized.method, "initialized");
|
||||
|
||||
if let Some(status) = scenario.initial_notification {
|
||||
send_remote_control_status(&mut websocket, status).await?;
|
||||
}
|
||||
|
||||
let enable = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Request(enable) = enable else {
|
||||
panic!("expected remoteControl/enable request");
|
||||
};
|
||||
assert_eq!(enable.id, REMOTE_CONTROL_ENABLE_REQUEST_ID);
|
||||
assert_eq!(enable.method, "remoteControl/enable");
|
||||
client::send_message(
|
||||
&mut websocket,
|
||||
&JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: REMOTE_CONTROL_ENABLE_REQUEST_ID,
|
||||
result: serde_json::to_value(RemoteControlEnableResponse::from(
|
||||
scenario.enable_response,
|
||||
))?,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(status) = scenario.after_enable_notification {
|
||||
send_remote_control_status(&mut websocket, status).await?;
|
||||
} else {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_remote_control_status<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
status: RemoteControlStatusChangedNotification,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
client::send_message(
|
||||
websocket,
|
||||
&JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "remoteControl/status/changed".to_string(),
|
||||
params: Some(serde_json::to_value(status)?),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn remote_control_status(
|
||||
status: RemoteControlConnectionStatus,
|
||||
environment_id: Option<&str>,
|
||||
) -> RemoteControlStatusChangedNotification {
|
||||
RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
installation_id: TEST_INSTALLATION_ID.to_string(),
|
||||
environment_id: environment_id.map(str::to_string),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -400,6 +400,7 @@ pub enum PluginStartupTasks {
|
||||
pub struct AppServerRuntimeOptions {
|
||||
pub plugin_startup_tasks: PluginStartupTasks,
|
||||
pub remote_control_enabled: bool,
|
||||
pub install_shutdown_signal_handler: bool,
|
||||
}
|
||||
|
||||
impl Default for AppServerRuntimeOptions {
|
||||
@@ -407,6 +408,7 @@ impl Default for AppServerRuntimeOptions {
|
||||
Self {
|
||||
plugin_startup_tasks: PluginStartupTasks::Start,
|
||||
remote_control_enabled: false,
|
||||
install_shutdown_signal_handler: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -645,7 +647,8 @@ pub async fn run_main_with_transport_options(
|
||||
|
||||
let single_client_mode = matches!(&transport, AppServerTransport::Stdio);
|
||||
let shutdown_when_no_connections = single_client_mode;
|
||||
let graceful_signal_restart_enabled = !single_client_mode;
|
||||
let graceful_signal_restart_enabled =
|
||||
runtime_options.install_shutdown_signal_handler && !single_client_mode;
|
||||
let mut app_server_client_name_rx = None;
|
||||
|
||||
match &transport {
|
||||
|
||||
@@ -22,6 +22,7 @@ anyhow = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
clap_complete = { workspace = true }
|
||||
codex-app-server = { workspace = true }
|
||||
codex-app-server-client = { workspace = true }
|
||||
codex-app-server-daemon = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-app-server-test-client = { workspace = true }
|
||||
|
||||
@@ -52,6 +52,7 @@ mod doctor;
|
||||
mod marketplace_cmd;
|
||||
mod mcp_cmd;
|
||||
mod plugin_cmd;
|
||||
mod remote_control_cmd;
|
||||
mod state_db_recovery;
|
||||
#[cfg(not(windows))]
|
||||
mod wsl_paths;
|
||||
@@ -59,6 +60,7 @@ mod wsl_paths;
|
||||
use crate::mcp_cmd::McpCli;
|
||||
use crate::plugin_cmd::PluginCli;
|
||||
use crate::plugin_cmd::PluginSubcommand;
|
||||
use crate::remote_control_cmd::RemoteControlCommand;
|
||||
use doctor::DoctorCommand;
|
||||
use state_db_recovery as local_state_db;
|
||||
|
||||
@@ -559,21 +561,6 @@ struct AppServerBootstrapCommand {
|
||||
remote_control: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
struct RemoteControlCommand {
|
||||
#[command(subcommand)]
|
||||
subcommand: Option<RemoteControlSubcommand>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, clap::Subcommand)]
|
||||
enum RemoteControlSubcommand {
|
||||
/// Start the app-server daemon with remote control enabled.
|
||||
Start,
|
||||
|
||||
/// Stop the app-server daemon.
|
||||
Stop,
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
struct GenerateTsCommand {
|
||||
/// Output directory where .ts files will be written
|
||||
@@ -1063,24 +1050,18 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control_cli)) => {
|
||||
let subcommand_name = remote_control_subcommand_name(&remote_control_cli);
|
||||
let subcommand_name = remote_control_cli.subcommand_name();
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
subcommand_name,
|
||||
)?;
|
||||
match remote_control_cli
|
||||
.subcommand
|
||||
.unwrap_or(RemoteControlSubcommand::Start)
|
||||
{
|
||||
RemoteControlSubcommand::Start => {
|
||||
let output = codex_app_server_daemon::ensure_remote_control_started().await?;
|
||||
println!("{}", serde_json::to_string(&output)?);
|
||||
}
|
||||
RemoteControlSubcommand::Stop => {
|
||||
print_app_server_daemon_output(AppServerLifecycleCommand::Stop).await?;
|
||||
}
|
||||
}
|
||||
remote_control_cmd::run(
|
||||
remote_control_cli,
|
||||
arg0_paths.clone(),
|
||||
root_config_overrides,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
Some(Subcommand::App(app_cli)) => {
|
||||
@@ -1802,9 +1783,7 @@ fn unsupported_subcommand_name_for_strict_config(
|
||||
Some(Subcommand::AppServer(app_server)) => {
|
||||
Some(app_server_subcommand_name(app_server.subcommand.as_ref()))
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control)) => {
|
||||
Some(remote_control_subcommand_name(remote_control))
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control)) => Some(remote_control.subcommand_name()),
|
||||
Some(Subcommand::Mcp(_)) => Some("mcp"),
|
||||
Some(Subcommand::Plugin(_)) => Some("plugin"),
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
@@ -1857,14 +1836,6 @@ fn reject_remote_mode_for_app_server_subcommand(
|
||||
reject_remote_mode_for_subcommand(remote, remote_auth_token_env, subcommand_name)
|
||||
}
|
||||
|
||||
fn remote_control_subcommand_name(command: &RemoteControlCommand) -> &'static str {
|
||||
match command.subcommand {
|
||||
None => "remote-control",
|
||||
Some(RemoteControlSubcommand::Start) => "remote-control start",
|
||||
Some(RemoteControlSubcommand::Stop) => "remote-control stop",
|
||||
}
|
||||
}
|
||||
|
||||
fn app_server_subcommand_name(subcommand: Option<&AppServerSubcommand>) -> &'static str {
|
||||
match subcommand {
|
||||
None => "app-server",
|
||||
@@ -2879,12 +2850,10 @@ mod tests {
|
||||
fn reject_remote_flag_for_remote_control() {
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "unix://", "remote-control"])
|
||||
.expect("parse");
|
||||
assert_matches!(
|
||||
cli.subcommand,
|
||||
Some(Subcommand::RemoteControl(RemoteControlCommand {
|
||||
subcommand: None
|
||||
}))
|
||||
);
|
||||
let Some(Subcommand::RemoteControl(remote_control)) = &cli.subcommand else {
|
||||
panic!("expected remote-control subcommand");
|
||||
};
|
||||
assert_eq!(remote_control.subcommand_name(), "remote-control");
|
||||
|
||||
let err = reject_remote_mode_for_subcommand(
|
||||
cli.remote.remote.as_deref(),
|
||||
|
||||
823
codex-rs/cli/src/remote_control_cmd.rs
Normal file
823
codex-rs/cli/src/remote_control_cmd.rs
Normal file
@@ -0,0 +1,823 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::Args;
|
||||
use codex_app_server::AppServerRuntimeOptions;
|
||||
use codex_app_server::AppServerTransport;
|
||||
use codex_app_server::AppServerWebsocketAuthSettings;
|
||||
use codex_app_server_client::AppServerEvent;
|
||||
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
use codex_app_server_client::RemoteAppServerClient;
|
||||
use codex_app_server_client::RemoteAppServerConnectArgs;
|
||||
use codex_app_server_client::RemoteAppServerEndpoint;
|
||||
use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand;
|
||||
use codex_app_server_daemon::LifecycleOutput as AppServerLifecycleOutput;
|
||||
use codex_app_server_daemon::LifecycleStatus as AppServerLifecycleStatus;
|
||||
use codex_app_server_daemon::RemoteControlReadyOutput as AppServerRemoteControlReadyOutput;
|
||||
use codex_app_server_daemon::RemoteControlReadyStatus as AppServerRemoteControlReadyStatus;
|
||||
use codex_app_server_daemon::RemoteControlStartOutput as AppServerRemoteControlStartOutput;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_protocol::RemoteControlEnableResponse;
|
||||
use codex_app_server_protocol::RemoteControlStatusChangedNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const FOREGROUND_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const FOREGROUND_SOCKET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50);
|
||||
const FOREGROUND_APP_SERVER_ABORT_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_CONTROL_CLIENT_NAME: &str = "codex-remote-control";
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub(crate) struct RemoteControlCommand {
|
||||
/// Emit machine-readable JSON.
|
||||
#[arg(long = "json", global = true)]
|
||||
json: bool,
|
||||
|
||||
#[command(subcommand)]
|
||||
subcommand: Option<RemoteControlSubcommand>,
|
||||
}
|
||||
|
||||
impl RemoteControlCommand {
|
||||
pub(crate) fn subcommand_name(&self) -> &'static str {
|
||||
match self.subcommand {
|
||||
None => "remote-control",
|
||||
Some(RemoteControlSubcommand::Start) => "remote-control start",
|
||||
Some(RemoteControlSubcommand::Stop) => "remote-control stop",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, clap::Subcommand)]
|
||||
enum RemoteControlSubcommand {
|
||||
/// Start the app-server daemon with remote control enabled.
|
||||
Start,
|
||||
|
||||
/// Stop the app-server daemon.
|
||||
Stop,
|
||||
}
|
||||
|
||||
pub(crate) async fn run(
|
||||
command: RemoteControlCommand,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
) -> anyhow::Result<()> {
|
||||
match command.subcommand {
|
||||
None => {
|
||||
run_foreground_remote_control(command.json, arg0_paths, root_config_overrides).await?;
|
||||
}
|
||||
Some(RemoteControlSubcommand::Start) => {
|
||||
let output = codex_app_server_daemon::ensure_remote_control_ready().await?;
|
||||
print_remote_control_start_output(&output, command.json)?;
|
||||
}
|
||||
Some(RemoteControlSubcommand::Stop) => {
|
||||
let output = codex_app_server_daemon::run(AppServerLifecycleCommand::Stop).await?;
|
||||
print_remote_control_stop_output(&output, command.json)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_foreground_remote_control(
|
||||
json: bool,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
) -> anyhow::Result<()> {
|
||||
let socket_dir = tempfile::Builder::new()
|
||||
.prefix("codex-rc-")
|
||||
.tempdir_in("/tmp")
|
||||
.or_else(|_| tempfile::tempdir())
|
||||
.context("failed to create private app-server socket directory")?;
|
||||
let socket_path = socket_dir.path().join("rc.sock");
|
||||
let socket_path = AbsolutePathBuf::from_absolute_path(&socket_path)
|
||||
.context("private app-server socket path was not absolute")?;
|
||||
let transport = AppServerTransport::UnixSocket {
|
||||
socket_path: socket_path.clone(),
|
||||
};
|
||||
let runtime_options = AppServerRuntimeOptions {
|
||||
remote_control_enabled: true,
|
||||
install_shutdown_signal_handler: false,
|
||||
..Default::default()
|
||||
};
|
||||
let (stop_rx, stop_signal_task) = foreground_stop_signal();
|
||||
let mut app_server_task = tokio::spawn(codex_app_server::run_main_with_transport_options(
|
||||
arg0_paths,
|
||||
root_config_overrides,
|
||||
LoaderOverrides::default(),
|
||||
/*strict_config*/ false,
|
||||
/*default_analytics_enabled*/ false,
|
||||
transport,
|
||||
SessionSource::VSCode,
|
||||
AppServerWebsocketAuthSettings::default(),
|
||||
runtime_options,
|
||||
));
|
||||
|
||||
let summary = match wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
wait_for_foreground_remote_control_ready(socket_path),
|
||||
stop_rx.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
ForegroundStartupResult::Ready(summary) => summary,
|
||||
ForegroundStartupResult::Stopped => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Ok(());
|
||||
}
|
||||
ForegroundStartupResult::ReadyFailed(error) => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
ForegroundStartupResult::AppServerExited(error) => {
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
|
||||
if *stop_rx.borrow() {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(error) = print_foreground_ready_output(&summary, json) {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
let result = wait_for_foreground_app_server(app_server_task, stop_rx).await;
|
||||
stop_signal_task.abort();
|
||||
result
|
||||
}
|
||||
|
||||
fn foreground_stop_signal() -> (watch::Receiver<bool>, JoinHandle<()>) {
|
||||
let (stop_tx, stop_rx) = watch::channel(false);
|
||||
let task = tokio::spawn(async move {
|
||||
if let Err(err) = tokio::signal::ctrl_c().await {
|
||||
eprintln!("failed to listen for Ctrl-C: {err}");
|
||||
}
|
||||
let _ = stop_tx.send(true);
|
||||
});
|
||||
(stop_rx, task)
|
||||
}
|
||||
|
||||
enum ForegroundStartupResult {
|
||||
Ready(RemoteControlReadySummary),
|
||||
Stopped,
|
||||
ReadyFailed(anyhow::Error),
|
||||
AppServerExited(anyhow::Error),
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_remote_control_start(
|
||||
app_server_task: &mut JoinHandle<std::io::Result<()>>,
|
||||
ready: impl std::future::Future<Output = anyhow::Result<RemoteControlReadySummary>>,
|
||||
mut stop_rx: watch::Receiver<bool>,
|
||||
) -> ForegroundStartupResult {
|
||||
tokio::pin!(ready);
|
||||
|
||||
tokio::select! {
|
||||
ready_result = &mut ready => match ready_result {
|
||||
Ok(summary) => ForegroundStartupResult::Ready(summary),
|
||||
Err(error) => ForegroundStartupResult::ReadyFailed(error),
|
||||
},
|
||||
app_server_result = app_server_task => {
|
||||
ForegroundStartupResult::AppServerExited(
|
||||
foreground_app_server_exited_before_ready(app_server_result)
|
||||
)
|
||||
}
|
||||
_ = wait_for_stop_signal(&mut stop_rx) => ForegroundStartupResult::Stopped,
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_app_server(
|
||||
mut app_server_task: JoinHandle<std::io::Result<()>>,
|
||||
mut stop_rx: watch::Receiver<bool>,
|
||||
) -> anyhow::Result<()> {
|
||||
tokio::select! {
|
||||
app_server_result = &mut app_server_task => {
|
||||
app_server_result
|
||||
.context("foreground app-server task failed to join")?
|
||||
.context("foreground app-server exited with an error")?;
|
||||
}
|
||||
_ = wait_for_stop_signal(&mut stop_rx) => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_stop_signal(stop_rx: &mut watch::Receiver<bool>) {
|
||||
if *stop_rx.borrow() {
|
||||
return;
|
||||
}
|
||||
let _ = stop_rx.wait_for(|stopped| *stopped).await;
|
||||
}
|
||||
|
||||
fn foreground_app_server_exited_before_ready(
|
||||
result: Result<std::io::Result<()>, tokio::task::JoinError>,
|
||||
) -> anyhow::Error {
|
||||
match result {
|
||||
Ok(Ok(())) => {
|
||||
anyhow::anyhow!("foreground app-server exited before remote control became ready")
|
||||
}
|
||||
Ok(Err(error)) => anyhow::Error::new(error)
|
||||
.context("foreground app-server exited before remote control became ready"),
|
||||
Err(error) => anyhow::Error::new(error)
|
||||
.context("foreground app-server task failed before remote control became ready"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn abort_foreground_app_server(app_server_task: JoinHandle<std::io::Result<()>>) {
|
||||
app_server_task.abort();
|
||||
let _ = timeout(FOREGROUND_APP_SERVER_ABORT_TIMEOUT, app_server_task).await;
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_remote_control_ready(
|
||||
socket_path: AbsolutePathBuf,
|
||||
) -> anyhow::Result<RemoteControlReadySummary> {
|
||||
let mut client = connect_foreground_client(socket_path).await?;
|
||||
enable_remote_control_and_wait(&mut client).await
|
||||
}
|
||||
|
||||
async fn connect_foreground_client(
|
||||
socket_path: AbsolutePathBuf,
|
||||
) -> anyhow::Result<RemoteAppServerClient> {
|
||||
let socket_path_display = socket_path.as_path().display().to_string();
|
||||
let connect_args = RemoteAppServerConnectArgs {
|
||||
endpoint: RemoteAppServerEndpoint::UnixSocket { socket_path },
|
||||
client_name: REMOTE_CONTROL_CLIENT_NAME.to_string(),
|
||||
client_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
};
|
||||
let deadline = Instant::now() + FOREGROUND_SOCKET_CONNECT_TIMEOUT;
|
||||
loop {
|
||||
match RemoteAppServerClient::connect(connect_args.clone()).await {
|
||||
Ok(client) => return Ok(client),
|
||||
Err(_) if Instant::now() < deadline => {
|
||||
sleep(FOREGROUND_SOCKET_CONNECT_RETRY_DELAY).await;
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(error).with_context(|| {
|
||||
format!("app server did not become ready on {socket_path_display}")
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn enable_remote_control_and_wait(
|
||||
client: &mut RemoteAppServerClient,
|
||||
) -> anyhow::Result<RemoteControlReadySummary> {
|
||||
let enable_response: RemoteControlEnableResponse = timeout(
|
||||
REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT,
|
||||
client.request_typed(ClientRequest::RemoteControlEnable {
|
||||
request_id: RequestId::String("remote-control-enable".to_string()),
|
||||
params: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for remoteControl/enable response")?
|
||||
.context("failed to enable remote control")?;
|
||||
wait_for_remote_control_ready(
|
||||
client,
|
||||
RemoteControlReadySummary::from(enable_response),
|
||||
REMOTE_CONTROL_READY_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn wait_for_remote_control_ready(
|
||||
client: &mut RemoteAppServerClient,
|
||||
mut summary: RemoteControlReadySummary,
|
||||
ready_timeout: Duration,
|
||||
) -> anyhow::Result<RemoteControlReadySummary> {
|
||||
if summary.status != RemoteControlConnectionStatus::Connecting {
|
||||
return Ok(summary);
|
||||
}
|
||||
|
||||
let deadline = Instant::now() + ready_timeout;
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
summary.timed_out = true;
|
||||
return Ok(summary);
|
||||
}
|
||||
|
||||
match timeout(deadline.duration_since(now), client.next_event()).await {
|
||||
Ok(Some(event)) => {
|
||||
if apply_remote_control_event(&mut summary, event)
|
||||
&& summary.status != RemoteControlConnectionStatus::Connecting
|
||||
{
|
||||
return Ok(summary);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
anyhow::bail!("app-server disconnected before remote control became ready");
|
||||
}
|
||||
Err(_) => {
|
||||
summary.timed_out = true;
|
||||
return Ok(summary);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_remote_control_event(
|
||||
summary: &mut RemoteControlReadySummary,
|
||||
event: AppServerEvent,
|
||||
) -> bool {
|
||||
match event {
|
||||
AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged(
|
||||
notification,
|
||||
)) => {
|
||||
*summary = RemoteControlReadySummary::from(notification);
|
||||
true
|
||||
}
|
||||
AppServerEvent::Lagged { skipped: _ }
|
||||
| AppServerEvent::ServerNotification(_)
|
||||
| AppServerEvent::ServerRequest(_)
|
||||
| AppServerEvent::Disconnected { message: _ } => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn print_remote_control_start_output(
|
||||
output: &AppServerRemoteControlReadyOutput,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if json {
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&RemoteControlStartJsonOutput::daemon(output))?
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let summary = RemoteControlReadySummary::from(&output.remote_control);
|
||||
for line in remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Daemon)? {
|
||||
println!("{line}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_foreground_ready_output(
|
||||
summary: &RemoteControlReadySummary,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if json {
|
||||
ensure_remote_control_startable(summary)?;
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&RemoteControlStartJsonOutput::foreground(summary))?
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for line in remote_control_start_human_lines(summary, RemoteControlHumanOutputMode::Foreground)?
|
||||
{
|
||||
println!("{line}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct RemoteControlReadySummary {
|
||||
status: RemoteControlConnectionStatus,
|
||||
server_name: String,
|
||||
environment_id: Option<String>,
|
||||
timed_out: bool,
|
||||
}
|
||||
|
||||
impl From<RemoteControlEnableResponse> for RemoteControlReadySummary {
|
||||
fn from(response: RemoteControlEnableResponse) -> Self {
|
||||
let RemoteControlEnableResponse {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = response;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemoteControlStatusChangedNotification> for RemoteControlReadySummary {
|
||||
fn from(notification: RemoteControlStatusChangedNotification) -> Self {
|
||||
let RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = notification;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&AppServerRemoteControlReadyStatus> for RemoteControlReadySummary {
|
||||
fn from(status: &AppServerRemoteControlReadyStatus) -> Self {
|
||||
Self {
|
||||
status: status.status,
|
||||
server_name: status.server_name.clone(),
|
||||
environment_id: status.environment_id.clone(),
|
||||
timed_out: status.timed_out,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct RemoteControlStartJsonOutput<'a> {
|
||||
mode: RemoteControlModeJson,
|
||||
status: RemoteControlConnectionStatus,
|
||||
server_name: &'a str,
|
||||
environment_id: Option<&'a str>,
|
||||
timed_out: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
daemon: Option<&'a AppServerRemoteControlStartOutput>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
enum RemoteControlModeJson {
|
||||
Foreground,
|
||||
Daemon,
|
||||
}
|
||||
|
||||
impl<'a> RemoteControlStartJsonOutput<'a> {
|
||||
fn foreground(summary: &'a RemoteControlReadySummary) -> Self {
|
||||
Self {
|
||||
mode: RemoteControlModeJson::Foreground,
|
||||
status: summary.status,
|
||||
server_name: &summary.server_name,
|
||||
environment_id: summary.environment_id.as_deref(),
|
||||
timed_out: summary.timed_out,
|
||||
daemon: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn daemon(output: &'a AppServerRemoteControlReadyOutput) -> Self {
|
||||
let remote_control = &output.remote_control;
|
||||
Self {
|
||||
mode: RemoteControlModeJson::Daemon,
|
||||
status: remote_control.status,
|
||||
server_name: &remote_control.server_name,
|
||||
environment_id: remote_control.environment_id.as_deref(),
|
||||
timed_out: remote_control.timed_out,
|
||||
daemon: Some(&output.daemon),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_control_start_human_message(
|
||||
output: &RemoteControlReadySummary,
|
||||
) -> anyhow::Result<String> {
|
||||
ensure_remote_control_startable(output)?;
|
||||
match output.status {
|
||||
RemoteControlConnectionStatus::Connected => Ok(format!(
|
||||
"This machine is available for remote control as {}.",
|
||||
output.server_name
|
||||
)),
|
||||
RemoteControlConnectionStatus::Connecting => Ok(format!(
|
||||
"Remote control is enabled on {} and still connecting.",
|
||||
output.server_name
|
||||
)),
|
||||
RemoteControlConnectionStatus::Errored | RemoteControlConnectionStatus::Disabled => {
|
||||
unreachable!("errored and disabled statuses are rejected before formatting")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_remote_control_startable(output: &RemoteControlReadySummary) -> anyhow::Result<()> {
|
||||
match output.status {
|
||||
RemoteControlConnectionStatus::Connected | RemoteControlConnectionStatus::Connecting => {
|
||||
Ok(())
|
||||
}
|
||||
RemoteControlConnectionStatus::Errored => {
|
||||
anyhow::bail!(
|
||||
"Remote control is enabled on {} but the connection is errored.",
|
||||
output.server_name
|
||||
);
|
||||
}
|
||||
RemoteControlConnectionStatus::Disabled => {
|
||||
anyhow::bail!("Remote control is disabled on {}.", output.server_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum RemoteControlHumanOutputMode {
|
||||
Foreground,
|
||||
Daemon,
|
||||
}
|
||||
|
||||
fn remote_control_start_human_lines(
|
||||
summary: &RemoteControlReadySummary,
|
||||
mode: RemoteControlHumanOutputMode,
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
let mut lines = vec![remote_control_start_human_message(summary)?];
|
||||
match mode {
|
||||
RemoteControlHumanOutputMode::Foreground => {
|
||||
lines.push("Press Ctrl-C to stop.".to_string());
|
||||
}
|
||||
RemoteControlHumanOutputMode::Daemon => {}
|
||||
}
|
||||
Ok(lines)
|
||||
}
|
||||
|
||||
fn print_remote_control_stop_output(
|
||||
output: &AppServerLifecycleOutput,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(output)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("{}", remote_control_stop_human_message(output));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remote_control_stop_human_message(output: &AppServerLifecycleOutput) -> String {
|
||||
match output.status {
|
||||
AppServerLifecycleStatus::Stopped => "Remote control stopped.".to_string(),
|
||||
AppServerLifecycleStatus::NotRunning => "Remote control is not running.".to_string(),
|
||||
AppServerLifecycleStatus::Started
|
||||
| AppServerLifecycleStatus::Restarted
|
||||
| AppServerLifecycleStatus::AlreadyRunning
|
||||
| AppServerLifecycleStatus::Running => {
|
||||
format!(
|
||||
"Remote control stop completed with status {:?}.",
|
||||
output.status
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn remote_control_status(status: RemoteControlConnectionStatus) -> RemoteControlReadySummary {
|
||||
RemoteControlReadySummary {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: status == RemoteControlConnectionStatus::Connecting,
|
||||
}
|
||||
}
|
||||
|
||||
fn enable_response(status: RemoteControlConnectionStatus) -> RemoteControlEnableResponse {
|
||||
RemoteControlEnableResponse {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
installation_id: "install_test".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn status_notification(
|
||||
status: RemoteControlConnectionStatus,
|
||||
) -> RemoteControlStatusChangedNotification {
|
||||
RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
installation_id: "install_test".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn daemon_ready_output(
|
||||
status: RemoteControlConnectionStatus,
|
||||
) -> AppServerRemoteControlReadyOutput {
|
||||
AppServerRemoteControlReadyOutput {
|
||||
daemon: AppServerRemoteControlStartOutput::Start(AppServerLifecycleOutput {
|
||||
status: AppServerLifecycleStatus::Started,
|
||||
backend: None,
|
||||
pid: Some(42),
|
||||
socket_path: PathBuf::from("/tmp/app-server-control.sock"),
|
||||
cli_version: Some("1.0.0".to_string()),
|
||||
app_server_version: Some("2.0.0".to_string()),
|
||||
}),
|
||||
remote_control: AppServerRemoteControlReadyStatus {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: status == RemoteControlConnectionStatus::Connecting,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_human_start_messages_use_server_name() {
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected
|
||||
))
|
||||
.expect("connected message"),
|
||||
"This machine is available for remote control as owen-mbp."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting
|
||||
))
|
||||
.expect("connecting message"),
|
||||
"Remote control is enabled on owen-mbp and still connecting."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Errored
|
||||
))
|
||||
.expect_err("errored status should fail")
|
||||
.to_string(),
|
||||
"Remote control is enabled on owen-mbp but the connection is errored."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Disabled
|
||||
))
|
||||
.expect_err("disabled status should fail")
|
||||
.to_string(),
|
||||
"Remote control is disabled on owen-mbp."
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_human_lines_include_foreground_stop_hint_only() {
|
||||
let summary = remote_control_status(RemoteControlConnectionStatus::Connected);
|
||||
|
||||
assert_eq!(
|
||||
remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Foreground)
|
||||
.expect("foreground lines"),
|
||||
vec![
|
||||
"This machine is available for remote control as owen-mbp.".to_string(),
|
||||
"Press Ctrl-C to stop.".to_string(),
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Daemon)
|
||||
.expect("daemon lines"),
|
||||
vec!["This machine is available for remote control as owen-mbp.".to_string()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_json_output_marks_foreground_or_daemon() {
|
||||
let foreground_summary = remote_control_status(RemoteControlConnectionStatus::Connected);
|
||||
assert_eq!(
|
||||
serde_json::to_value(RemoteControlStartJsonOutput::foreground(
|
||||
&foreground_summary
|
||||
))
|
||||
.expect("foreground JSON"),
|
||||
json!({
|
||||
"mode": "foreground",
|
||||
"status": "connected",
|
||||
"serverName": "owen-mbp",
|
||||
"environmentId": "env_test",
|
||||
"timedOut": false,
|
||||
})
|
||||
);
|
||||
|
||||
let daemon_output = daemon_ready_output(RemoteControlConnectionStatus::Connected);
|
||||
assert_eq!(
|
||||
serde_json::to_value(RemoteControlStartJsonOutput::daemon(&daemon_output))
|
||||
.expect("daemon JSON"),
|
||||
json!({
|
||||
"mode": "daemon",
|
||||
"status": "connected",
|
||||
"serverName": "owen-mbp",
|
||||
"environmentId": "env_test",
|
||||
"timedOut": false,
|
||||
"daemon": {
|
||||
"status": "started",
|
||||
"pid": 42,
|
||||
"socketPath": "/tmp/app-server-control.sock",
|
||||
"cliVersion": "1.0.0",
|
||||
"appServerVersion": "2.0.0",
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_summary_uses_enable_response_as_authoritative() {
|
||||
assert_eq!(
|
||||
RemoteControlReadySummary::from(enable_response(
|
||||
RemoteControlConnectionStatus::Connected
|
||||
)),
|
||||
remote_control_status(RemoteControlConnectionStatus::Connected)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_status_notification_updates_connecting_summary() {
|
||||
let mut summary = remote_control_status(RemoteControlConnectionStatus::Connecting);
|
||||
|
||||
let changed = apply_remote_control_event(
|
||||
&mut summary,
|
||||
AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged(
|
||||
status_notification(RemoteControlConnectionStatus::Connected),
|
||||
)),
|
||||
);
|
||||
|
||||
assert!(changed);
|
||||
assert_eq!(
|
||||
summary,
|
||||
remote_control_status(RemoteControlConnectionStatus::Connected)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_wait_aborts_app_server_on_stop_signal() {
|
||||
let app_server_task = tokio::spawn(std::future::pending::<std::io::Result<()>>());
|
||||
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
stop_tx.send(true).expect("send stop signal");
|
||||
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_app_server(app_server_task, stop_rx),
|
||||
)
|
||||
.await
|
||||
.expect("foreground wait should return after stop signal")
|
||||
.expect("stop signal should shut down cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_start_wait_stops_before_ready() {
|
||||
let mut app_server_task = tokio::spawn(std::future::pending::<std::io::Result<()>>());
|
||||
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
stop_tx.send(true).expect("send stop signal");
|
||||
|
||||
let startup = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
std::future::pending::<anyhow::Result<RemoteControlReadySummary>>(),
|
||||
stop_rx,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("foreground startup wait should return after stop signal");
|
||||
|
||||
assert!(matches!(startup, ForegroundStartupResult::Stopped));
|
||||
app_server_task.abort();
|
||||
let _ = app_server_task.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_start_wait_reports_app_server_exit_before_ready() {
|
||||
let mut app_server_task =
|
||||
tokio::spawn(async { Err(std::io::Error::other("startup failed before socket bind")) });
|
||||
let (_stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
let startup = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
std::future::pending::<anyhow::Result<RemoteControlReadySummary>>(),
|
||||
stop_rx,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("foreground startup wait should return after app-server exits");
|
||||
|
||||
let ForegroundStartupResult::AppServerExited(error) = startup else {
|
||||
panic!("expected app-server exit before ready");
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"foreground app-server exited before remote control became ready"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user