diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index de6282c54f..eb2439401b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2223,7 +2223,6 @@ dependencies = [ "clap_complete", "codex-api", "codex-app-server", - "codex-app-server-client", "codex-app-server-daemon", "codex-app-server-protocol", "codex-app-server-test-client", diff --git a/codex-rs/app-server-daemon/src/lib.rs b/codex-rs/app-server-daemon/src/lib.rs index 70d9445ae7..62864fd5b7 100644 --- a/codex-rs/app-server-daemon/src/lib.rs +++ b/codex-rs/app-server-daemon/src/lib.rs @@ -205,6 +205,20 @@ pub async fn ensure_remote_control_ready() -> Result { .await } +pub async fn enable_remote_control_on_socket( + socket_path: &Path, + connect_timeout: Duration, + connect_retry_delay: Duration, +) -> Result { + ensure_supported_platform()?; + remote_control_client::enable_remote_control_with_connect_retry( + socket_path, + connect_timeout, + connect_retry_delay, + ) + .await +} + pub async fn set_remote_control(mode: RemoteControlMode) -> Result { ensure_supported_platform()?; Daemon::from_environment()?.set_remote_control(mode).await diff --git a/codex-rs/app-server-daemon/src/remote_control_client.rs b/codex-rs/app-server-daemon/src/remote_control_client.rs index ecb3c1c260..cc32e50dde 100644 --- a/codex-rs/app-server-daemon/src/remote_control_client.rs +++ b/codex-rs/app-server-daemon/src/remote_control_client.rs @@ -13,6 +13,8 @@ use codex_app_server_protocol::RemoteControlStatusChangedNotification; use codex_app_server_protocol::RequestId; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::time::Instant; +use tokio::time::sleep; use tokio::time::timeout; use tokio_tungstenite::WebSocketStream; @@ -23,21 +25,33 @@ const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10); const REMOTE_CONTROL_ENABLE_REQUEST_ID: RequestId = RequestId::Integer(2); pub(crate) async fn enable_remote_control(socket_path: &Path) -> Result { - enable_remote_control_with_timeout(socket_path, REMOTE_CONTROL_READY_TIMEOUT).await + let mut websocket = client::connect(socket_path).await?; + enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await } -async fn enable_remote_control_with_timeout( +pub(crate) async fn enable_remote_control_with_connect_retry( socket_path: &Path, - ready_timeout: Duration, + connect_timeout: Duration, + connect_retry_delay: Duration, ) -> Result { - let mut websocket = client::connect(socket_path).await?; + let mut websocket = + connect_with_retry(socket_path, connect_timeout, connect_retry_delay).await?; + enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await +} - client::initialize(&mut websocket, /*experimental_api*/ true).await?; +async fn enable_remote_control_with_timeout( + websocket: &mut WebSocketStream, + ready_timeout: Duration, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin, +{ + client::initialize(websocket, /*experimental_api*/ true).await?; let initialized = JSONRPCMessage::Notification(JSONRPCNotification { method: "initialized".to_string(), params: None, }); - client::send_message(&mut websocket, &initialized) + client::send_message(websocket, &initialized) .await .context("failed to send initialized notification")?; @@ -47,18 +61,42 @@ async fn enable_remote_control_with_timeout( params: None, trace: None, }); - client::send_message(&mut websocket, &enable) + client::send_message(websocket, &enable) .await .context("failed to send remoteControl/enable request")?; - let mut latest = read_enable_response(&mut websocket).await?; + let mut latest = read_enable_response(websocket).await?; if latest.status == RemoteControlConnectionStatus::Connecting { - latest = wait_for_remote_control_status(&mut websocket, latest, ready_timeout).await?; + latest = wait_for_remote_control_status(websocket, latest, ready_timeout).await?; } websocket.close(None).await.ok(); Ok(latest) } +async fn connect_with_retry( + socket_path: &Path, + connect_timeout: Duration, + connect_retry_delay: Duration, +) -> Result> { + let deadline = Instant::now() + connect_timeout; + loop { + match client::connect(socket_path).await { + Ok(websocket) => return Ok(websocket), + Err(_) if Instant::now() < deadline => { + sleep(connect_retry_delay).await; + } + Err(error) => { + return Err(error).with_context(|| { + format!( + "app server did not become ready on {}", + socket_path.display() + ) + }); + } + } + } +} + async fn read_enable_response( websocket: &mut WebSocketStream, ) -> Result @@ -314,7 +352,8 @@ mod tests { let ready_timeout = scenario.ready_timeout; let server_task = tokio::spawn(serve_enable_remote_control_scenario(listener, scenario)); - let status = enable_remote_control_with_timeout(&socket_path, ready_timeout).await?; + let mut websocket = client::connect(&socket_path).await?; + let status = enable_remote_control_with_timeout(&mut websocket, ready_timeout).await?; server_task.await??; Ok(status) } diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index b916c20a28..627d5d16d1 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -22,7 +22,6 @@ anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } clap_complete = { workspace = true } codex-app-server = { workspace = true } -codex-app-server-client = { workspace = true } codex-app-server-daemon = { workspace = true } codex-app-server-protocol = { workspace = true } codex-app-server-test-client = { workspace = true } diff --git a/codex-rs/cli/src/remote_control_cmd.rs b/codex-rs/cli/src/remote_control_cmd.rs index a2ff981c13..1ff8bd7595 100644 --- a/codex-rs/cli/src/remote_control_cmd.rs +++ b/codex-rs/cli/src/remote_control_cmd.rs @@ -6,22 +6,13 @@ use clap::Args; use codex_app_server::AppServerRuntimeOptions; use codex_app_server::AppServerTransport; use codex_app_server::AppServerWebsocketAuthSettings; -use codex_app_server_client::AppServerEvent; -use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; -use codex_app_server_client::RemoteAppServerClient; -use codex_app_server_client::RemoteAppServerConnectArgs; -use codex_app_server_client::RemoteAppServerEndpoint; use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand; use codex_app_server_daemon::LifecycleOutput as AppServerLifecycleOutput; use codex_app_server_daemon::LifecycleStatus as AppServerLifecycleStatus; use codex_app_server_daemon::RemoteControlReadyOutput as AppServerRemoteControlReadyOutput; use codex_app_server_daemon::RemoteControlReadyStatus as AppServerRemoteControlReadyStatus; use codex_app_server_daemon::RemoteControlStartOutput as AppServerRemoteControlStartOutput; -use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::RemoteControlConnectionStatus; -use codex_app_server_protocol::RemoteControlEnableResponse; -use codex_app_server_protocol::RequestId; -use codex_app_server_protocol::ServerNotification; use codex_arg0::Arg0DispatchPaths; use codex_config::LoaderOverrides; use codex_protocol::protocol::SessionSource; @@ -30,16 +21,11 @@ use codex_utils_cli::CliConfigOverrides; use serde::Serialize; use tokio::sync::watch; use tokio::task::JoinHandle; -use tokio::time::Instant; -use tokio::time::sleep; use tokio::time::timeout; const FOREGROUND_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const FOREGROUND_SOCKET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50); const FOREGROUND_APP_SERVER_ABORT_TIMEOUT: Duration = Duration::from_secs(1); -const REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); -const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10); -const REMOTE_CONTROL_CLIENT_NAME: &str = "codex-remote-control"; #[derive(Debug, Args)] pub(crate) struct RemoteControlCommand { @@ -273,111 +259,12 @@ async fn abort_foreground_app_server(app_server_task: JoinHandle anyhow::Result { - let mut client = connect_foreground_client(socket_path).await?; - enable_remote_control_and_wait(&mut client).await -} - -async fn connect_foreground_client( - socket_path: AbsolutePathBuf, -) -> anyhow::Result { - let socket_path_display = socket_path.as_path().display().to_string(); - let connect_args = RemoteAppServerConnectArgs { - endpoint: RemoteAppServerEndpoint::UnixSocket { socket_path }, - client_name: REMOTE_CONTROL_CLIENT_NAME.to_string(), - client_version: env!("CARGO_PKG_VERSION").to_string(), - experimental_api: true, - opt_out_notification_methods: Vec::new(), - channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, - }; - let deadline = Instant::now() + FOREGROUND_SOCKET_CONNECT_TIMEOUT; - loop { - match RemoteAppServerClient::connect(connect_args.clone()).await { - Ok(client) => return Ok(client), - Err(_) if Instant::now() < deadline => { - sleep(FOREGROUND_SOCKET_CONNECT_RETRY_DELAY).await; - } - Err(error) => { - return Err(error).with_context(|| { - format!("app server did not become ready on {socket_path_display}") - }); - } - } - } -} - -async fn enable_remote_control_and_wait( - client: &mut RemoteAppServerClient, -) -> anyhow::Result { - let enable_response: RemoteControlEnableResponse = timeout( - REMOTE_CONTROL_ENABLE_RESPONSE_TIMEOUT, - client.request_typed(ClientRequest::RemoteControlEnable { - request_id: RequestId::String("remote-control-enable".to_string()), - params: None, - }), + codex_app_server_daemon::enable_remote_control_on_socket( + socket_path.as_path(), + FOREGROUND_SOCKET_CONNECT_TIMEOUT, + FOREGROUND_SOCKET_CONNECT_RETRY_DELAY, ) .await - .context("timed out waiting for remoteControl/enable response")? - .context("failed to enable remote control")?; - wait_for_remote_control_ready( - client, - AppServerRemoteControlReadyStatus::from(enable_response), - REMOTE_CONTROL_READY_TIMEOUT, - ) - .await -} - -async fn wait_for_remote_control_ready( - client: &mut RemoteAppServerClient, - mut summary: AppServerRemoteControlReadyStatus, - ready_timeout: Duration, -) -> anyhow::Result { - if summary.status != RemoteControlConnectionStatus::Connecting { - return Ok(summary); - } - - let deadline = Instant::now() + ready_timeout; - loop { - let now = Instant::now(); - if now >= deadline { - summary.timed_out = true; - return Ok(summary); - } - - match timeout(deadline.duration_since(now), client.next_event()).await { - Ok(Some(event)) => { - if apply_remote_control_event(&mut summary, event) - && summary.status != RemoteControlConnectionStatus::Connecting - { - return Ok(summary); - } - } - Ok(None) => { - anyhow::bail!("app-server disconnected before remote control became ready"); - } - Err(_) => { - summary.timed_out = true; - return Ok(summary); - } - } - } -} - -fn apply_remote_control_event( - summary: &mut AppServerRemoteControlReadyStatus, - event: AppServerEvent, -) -> bool { - match event { - AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged( - notification, - )) => { - *summary = AppServerRemoteControlReadyStatus::from(notification); - true - } - AppServerEvent::Lagged { skipped: _ } - | AppServerEvent::ServerNotification(_) - | AppServerEvent::ServerRequest(_) - | AppServerEvent::Disconnected { message: _ } => false, - } } fn print_remote_control_start_output( @@ -582,7 +469,6 @@ fn remote_control_stop_human_message(output: &AppServerLifecycleOutput) -> Strin #[cfg(test)] mod tests { - use codex_app_server_protocol::RemoteControlStatusChangedNotification; use pretty_assertions::assert_eq; use serde_json::json; use std::path::PathBuf; @@ -600,17 +486,6 @@ mod tests { } } - fn status_notification( - status: RemoteControlConnectionStatus, - ) -> RemoteControlStatusChangedNotification { - RemoteControlStatusChangedNotification { - status, - server_name: "owen-mbp".to_string(), - installation_id: "install_test".to_string(), - environment_id: Some("env_test".to_string()), - } - } - fn daemon_ready_output( status: RemoteControlConnectionStatus, ) -> AppServerRemoteControlReadyOutput { @@ -754,24 +629,6 @@ mod tests { ); } - #[test] - fn remote_control_status_notification_updates_connecting_summary() { - let mut summary = remote_control_status(RemoteControlConnectionStatus::Connecting); - - let changed = apply_remote_control_event( - &mut summary, - AppServerEvent::ServerNotification(ServerNotification::RemoteControlStatusChanged( - status_notification(RemoteControlConnectionStatus::Connected), - )), - ); - - assert!(changed); - assert_eq!( - summary, - remote_control_status(RemoteControlConnectionStatus::Connected) - ); - } - #[tokio::test] async fn foreground_wait_aborts_app_server_on_stop_signal() { let app_server_task = tokio::spawn(std::future::pending::>());