This commit is contained in:
Owen Lin
2026-05-15 17:16:34 -07:00
parent 2c4563ba28
commit 1d99152db9
5 changed files with 67 additions and 159 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2223,7 +2223,6 @@ dependencies = [
"clap_complete",
"codex-api",
"codex-app-server",
"codex-app-server-client",
"codex-app-server-daemon",
"codex-app-server-protocol",
"codex-app-server-test-client",

View File

@@ -205,6 +205,20 @@ pub async fn ensure_remote_control_ready() -> Result<RemoteControlReadyOutput> {
.await
}
pub async fn enable_remote_control_on_socket(
socket_path: &Path,
connect_timeout: Duration,
connect_retry_delay: Duration,
) -> Result<RemoteControlReadyStatus> {
ensure_supported_platform()?;
remote_control_client::enable_remote_control_with_connect_retry(
socket_path,
connect_timeout,
connect_retry_delay,
)
.await
}
pub async fn set_remote_control(mode: RemoteControlMode) -> Result<RemoteControlOutput> {
ensure_supported_platform()?;
Daemon::from_environment()?.set_remote_control(mode).await

View File

@@ -13,6 +13,8 @@ use codex_app_server_protocol::RemoteControlStatusChangedNotification;
use codex_app_server_protocol::RequestId;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
@@ -23,21 +25,33 @@ const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10);
const REMOTE_CONTROL_ENABLE_REQUEST_ID: RequestId = RequestId::Integer(2);
pub(crate) async fn enable_remote_control(socket_path: &Path) -> Result<RemoteControlReadyStatus> {
enable_remote_control_with_timeout(socket_path, REMOTE_CONTROL_READY_TIMEOUT).await
let mut websocket = client::connect(socket_path).await?;
enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await
}
async fn enable_remote_control_with_timeout(
pub(crate) async fn enable_remote_control_with_connect_retry(
socket_path: &Path,
ready_timeout: Duration,
connect_timeout: Duration,
connect_retry_delay: Duration,
) -> Result<RemoteControlReadyStatus> {
let mut websocket = client::connect(socket_path).await?;
let mut websocket =
connect_with_retry(socket_path, connect_timeout, connect_retry_delay).await?;
enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await
}
client::initialize(&mut websocket, /*experimental_api*/ true).await?;
async fn enable_remote_control_with_timeout<S>(
websocket: &mut WebSocketStream<S>,
ready_timeout: Duration,
) -> Result<RemoteControlReadyStatus>
where
S: AsyncRead + AsyncWrite + Unpin,
{
client::initialize(websocket, /*experimental_api*/ true).await?;
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: None,
});
client::send_message(&mut websocket, &initialized)
client::send_message(websocket, &initialized)
.await
.context("failed to send initialized notification")?;
@@ -47,18 +61,42 @@ async fn enable_remote_control_with_timeout(
params: None,
trace: None,
});
client::send_message(&mut websocket, &enable)
client::send_message(websocket, &enable)
.await
.context("failed to send remoteControl/enable request")?;
let mut latest = read_enable_response(&mut websocket).await?;
let mut latest = read_enable_response(websocket).await?;
if latest.status == RemoteControlConnectionStatus::Connecting {
latest = wait_for_remote_control_status(&mut websocket, latest, ready_timeout).await?;
latest = wait_for_remote_control_status(websocket, latest, ready_timeout).await?;
}
websocket.close(None).await.ok();
Ok(latest)
}
async fn connect_with_retry(
socket_path: &Path,
connect_timeout: Duration,
connect_retry_delay: Duration,
) -> Result<WebSocketStream<codex_uds::UnixStream>> {
let deadline = Instant::now() + connect_timeout;
loop {
match client::connect(socket_path).await {
Ok(websocket) => return Ok(websocket),
Err(_) if Instant::now() < deadline => {
sleep(connect_retry_delay).await;
}
Err(error) => {
return Err(error).with_context(|| {
format!(
"app server did not become ready on {}",
socket_path.display()
)
});
}
}
}
}
async fn read_enable_response<S>(
websocket: &mut WebSocketStream<S>,
) -> Result<RemoteControlReadyStatus>
@@ -314,7 +352,8 @@ mod tests {
let ready_timeout = scenario.ready_timeout;
let server_task = tokio::spawn(serve_enable_remote_control_scenario(listener, scenario));
let status = enable_remote_control_with_timeout(&socket_path, ready_timeout).await?;
let mut websocket = client::connect(&socket_path).await?;
let status = enable_remote_control_with_timeout(&mut websocket, ready_timeout).await?;
server_task.await??;
Ok(status)
}

View File

@@ -22,7 +22,6 @@ anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
clap_complete = { workspace = true }
codex-app-server = { workspace = true }
codex-app-server-client = { workspace = true }
codex-app-server-daemon = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-app-server-test-client = { workspace = true }

View File

@@ -6,22 +6,13 @@ use clap::Args;
use codex_app_server::AppServerRuntimeOptions;
use codex_app_server::AppServerTransport;
use codex_app_server::AppServerWebsocketAuthSettings;
use codex_app_server_client::AppServerEvent;
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::RemoteAppServerClient;
use codex_app_server_client::RemoteAppServerConnectArgs;
use codex_app_server_client::RemoteAppServerEndpoint;
use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand;
use codex_app_server_daemon::LifecycleOutput as AppServerLifecycleOutput;
use codex_app_server_daemon::LifecycleStatus as AppServerLifecycleStatus;
use codex_app_server_daemon::RemoteControlReadyOutput as AppServerRemoteControlReadyOutput;
use codex_app_server_daemon::RemoteControlReadyStatus as AppServerRemoteControlReadyStatus;
use codex_app_server_daemon::RemoteControlStartOutput as AppServerRemoteControlStartOutput;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::RemoteControlConnectionStatus;
use codex_app_server_protocol::RemoteControlEnableResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_arg0::Arg0DispatchPaths;
use codex_config::LoaderOverrides;
use codex_protocol::protocol::SessionSource;
@@ -30,16 +21,11 @@ use codex_utils_cli::CliConfigOverrides;
use serde::Serialize;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
const FOREGROUND_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const FOREGROUND_SOCKET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50);
const FOREGROUND_APP_SERVER_ABORT_TIMEOUT: Duration = Duration::from_secs(1);
const REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10);
const REMOTE_CONTROL_CLIENT_NAME: &str = "codex-remote-control";
#[derive(Debug, Args)]
pub(crate) struct RemoteControlCommand {
@@ -273,111 +259,12 @@ async fn abort_foreground_app_server(app_server_task: JoinHandle<std::io::Result
async fn wait_for_foreground_remote_control_ready(
socket_path: AbsolutePathBuf,
) -> anyhow::Result<AppServerRemoteControlReadyStatus> {
let mut client = connect_foreground_client(socket_path).await?;
enable_remote_control_and_wait(&mut client).await
}
async fn connect_foreground_client(
socket_path: AbsolutePathBuf,
) -> anyhow::Result<RemoteAppServerClient> {
let socket_path_display = socket_path.as_path().display().to_string();
let connect_args = RemoteAppServerConnectArgs {
endpoint: RemoteAppServerEndpoint::UnixSocket { socket_path },
client_name: REMOTE_CONTROL_CLIENT_NAME.to_string(),
client_version: env!("CARGO_PKG_VERSION").to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
};
let deadline = Instant::now() + FOREGROUND_SOCKET_CONNECT_TIMEOUT;
loop {
match RemoteAppServerClient::connect(connect_args.clone()).await {
Ok(client) => return Ok(client),
Err(_) if Instant::now() < deadline => {
sleep(FOREGROUND_SOCKET_CONNECT_RETRY_DELAY).await;
}
Err(error) => {
return Err(error).with_context(|| {
format!("app server did not become ready on {socket_path_display}")
});
}
}
}
}
async fn enable_remote_control_and_wait(
client: &mut RemoteAppServerClient,
) -> anyhow::Result<AppServerRemoteControlReadyStatus> {
let enable_response: RemoteControlEnableResponse = timeout(
REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT,
client.request_typed(ClientRequest::RemoteControlEnable {
request_id: RequestId::String("remote-control-enable".to_string()),
params: None,
}),
codex_app_server_daemon::enable_remote_control_on_socket(
socket_path.as_path(),
FOREGROUND_SOCKET_CONNECT_TIMEOUT,
FOREGROUND_SOCKET_CONNECT_RETRY_DELAY,
)
.await
.context("timed out waiting for remoteControl/enable response")?
.context("failed to enable remote control")?;
wait_for_remote_control_ready(
client,
AppServerRemoteControlReadyStatus::from(enable_response),
REMOTE_CONTROL_READY_TIMEOUT,
)
.await
}
async fn wait_for_remote_control_ready(
client: &mut RemoteAppServerClient,
mut summary: AppServerRemoteControlReadyStatus,
ready_timeout: Duration,
) -> anyhow::Result<AppServerRemoteControlReadyStatus> {
if summary.status != RemoteControlConnectionStatus::Connecting {
return Ok(summary);
}
let deadline = Instant::now() + ready_timeout;
loop {
let now = Instant::now();
if now >= deadline {
summary.timed_out = true;
return Ok(summary);
}
match timeout(deadline.duration_since(now), client.next_event()).await {
Ok(Some(event)) => {
if apply_remote_control_event(&mut summary, event)
&& summary.status != RemoteControlConnectionStatus::Connecting
{
return Ok(summary);
}
}
Ok(None) => {
anyhow::bail!("app-server disconnected before remote control became ready");
}
Err(_) => {
summary.timed_out = true;
return Ok(summary);
}
}
}
}
fn apply_remote_control_event(
summary: &mut AppServerRemoteControlReadyStatus,
event: AppServerEvent,
) -> bool {
match event {
AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged(
notification,
)) => {
*summary = AppServerRemoteControlReadyStatus::from(notification);
true
}
AppServerEvent::Lagged { skipped: _ }
| AppServerEvent::ServerNotification(_)
| AppServerEvent::ServerRequest(_)
| AppServerEvent::Disconnected { message: _ } => false,
}
}
fn print_remote_control_start_output(
@@ -582,7 +469,6 @@ fn remote_control_stop_human_message(output: &AppServerLifecycleOutput) -> Strin
#[cfg(test)]
mod tests {
use codex_app_server_protocol::RemoteControlStatusChangedNotification;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
@@ -600,17 +486,6 @@ mod tests {
}
}
fn status_notification(
status: RemoteControlConnectionStatus,
) -> RemoteControlStatusChangedNotification {
RemoteControlStatusChangedNotification {
status,
server_name: "owen-mbp".to_string(),
installation_id: "install_test".to_string(),
environment_id: Some("env_test".to_string()),
}
}
fn daemon_ready_output(
status: RemoteControlConnectionStatus,
) -> AppServerRemoteControlReadyOutput {
@@ -754,24 +629,6 @@ mod tests {
);
}
#[test]
fn remote_control_status_notification_updates_connecting_summary() {
let mut summary = remote_control_status(RemoteControlConnectionStatus::Connecting);
let changed = apply_remote_control_event(
&mut summary,
AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged(
status_notification(RemoteControlConnectionStatus::Connected),
)),
);
assert!(changed);
assert_eq!(
summary,
remote_control_status(RemoteControlConnectionStatus::Connected)
);
}
#[tokio::test]
async fn foreground_wait_aborts_app_server_on_stop_signal() {
let app_server_task = tokio::spawn(std::future::pending::<std::io::Result<()>>());