/* This module implements the websocket-backed app-server client transport. It owns the remote connection lifecycle, including the initialize/initialized handshake, JSON-RPC request/response routing, server-request resolution, and notification streaming. The rest of the crate uses the same `AppServerEvent` surface for both in-process and remote transports, so callers such as the TUI can switch between them without changing their higher-level session logic. */ use std::collections::HashMap; use std::collections::VecDeque; use std::io::Error as IoError; use std::io::ErrorKind; use std::io::Result as IoResult; use std::time::Duration; use crate::AppServerEvent; use crate::RequestResult; use crate::SHUTDOWN_TIMEOUT; use crate::TypedRequestError; use crate::request_method_name; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result as JsonRpcResult; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use futures::SinkExt; use futures::StreamExt; use serde::de::DeserializeOwned; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::connect_async_with_config; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::http::HeaderValue; use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION; use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; use tracing::warn; use url::Url; const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10); const REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE: usize = 128 << 20; #[derive(Debug, Clone)] pub struct RemoteAppServerConnectArgs { pub websocket_url: String, pub auth_token: Option, pub client_name: String, pub client_version: String, pub experimental_api: bool, pub opt_out_notification_methods: Vec, pub channel_capacity: usize, } impl RemoteAppServerConnectArgs { fn initialize_params(&self) -> InitializeParams { let capabilities = InitializeCapabilities { experimental_api: self.experimental_api, request_attestation: false, opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() { None } else { Some(self.opt_out_notification_methods.clone()) }, }; InitializeParams { client_info: ClientInfo { name: self.client_name.clone(), title: None, version: self.client_version.clone(), }, capabilities: Some(capabilities), } } } pub(crate) fn websocket_url_supports_auth_token(url: &Url) -> bool { match (url.scheme(), url.host()) { ("wss", Some(_)) => true, ("ws", Some(url::Host::Domain(domain))) => domain.eq_ignore_ascii_case("localhost"), ("ws", Some(url::Host::Ipv4(addr))) => addr.is_loopback(), ("ws", Some(url::Host::Ipv6(addr))) => addr.is_loopback(), _ => false, } } enum RemoteClientCommand { Request { request: Box, response_tx: oneshot::Sender>, }, Notify { notification: ClientNotification, response_tx: oneshot::Sender>, }, ResolveServerRequest { request_id: RequestId, result: JsonRpcResult, response_tx: oneshot::Sender>, }, RejectServerRequest { request_id: RequestId, error: JSONRPCErrorError, response_tx: oneshot::Sender>, }, Shutdown { response_tx: oneshot::Sender>, }, } pub struct RemoteAppServerClient { command_tx: mpsc::Sender, event_rx: mpsc::UnboundedReceiver, pending_events: VecDeque, worker_handle: tokio::task::JoinHandle<()>, } #[derive(Clone)] pub struct RemoteAppServerRequestHandle { command_tx: mpsc::Sender, } impl RemoteAppServerClient { pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult { let channel_capacity = args.channel_capacity.max(1); let websocket_url = args.websocket_url.clone(); let url = Url::parse(&websocket_url).map_err(|err| { IoError::new( ErrorKind::InvalidInput, format!("invalid websocket URL `{websocket_url}`: {err}"), ) })?; if args.auth_token.is_some() && !websocket_url_supports_auth_token(&url) { return Err(IoError::new( ErrorKind::InvalidInput, format!( "remote auth tokens require `wss://` or loopback `ws://` URLs; got `{websocket_url}`" ), )); } let mut request = url.as_str().into_client_request().map_err(|err| { IoError::new( ErrorKind::InvalidInput, format!("invalid websocket URL `{websocket_url}`: {err}"), ) })?; if let Some(auth_token) = args.auth_token.as_deref() { let header_value = HeaderValue::from_str(&format!("Bearer {auth_token}")).map_err(|err| { IoError::new( ErrorKind::InvalidInput, format!("invalid remote authorization header value: {err}"), ) })?; request.headers_mut().insert(AUTHORIZATION, header_value); } ensure_rustls_crypto_provider(); // Remote resume responses can legitimately carry large thread histories. // Keep a bounded cap, but raise it above tungstenite's 16 MiB frame default. let websocket_config = WebSocketConfig::default() .max_frame_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE)) .max_message_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE)); let stream = timeout( CONNECT_TIMEOUT, connect_async_with_config( request, Some(websocket_config), /*disable_nagle*/ false, ), ) .await .map_err(|_| { IoError::new( ErrorKind::TimedOut, format!("timed out connecting to remote app server at `{websocket_url}`"), ) })? .map(|(stream, _response)| stream) .map_err(|err| { IoError::other(format!( "failed to connect to remote app server at `{websocket_url}`: {err}" )) })?; let mut stream = stream; let pending_events = initialize_remote_connection( &mut stream, &websocket_url, args.initialize_params(), INITIALIZE_TIMEOUT, ) .await?; let (command_tx, mut command_rx) = mpsc::channel::(channel_capacity); let (event_tx, event_rx) = mpsc::unbounded_channel::(); let worker_handle = tokio::spawn(async move { let mut pending_requests = HashMap::>>::new(); let mut worker_exit_error: Option<(ErrorKind, String)> = None; loop { tokio::select! { command = command_rx.recv() => { let Some(command) = command else { let _ = stream.close(None).await; break; }; match command { RemoteClientCommand::Request { request, response_tx } => { let request_id = request_id_from_client_request(&request); if pending_requests.contains_key(&request_id) { let _ = response_tx.send(Err(IoError::new( ErrorKind::InvalidInput, format!("duplicate remote app-server request id `{request_id}`"), ))); continue; } pending_requests.insert(request_id.clone(), response_tx); if let Err(err) = write_jsonrpc_message( &mut stream, JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)), &websocket_url, ) .await { let err_message = err.to_string(); let message = format!( "remote app server at `{websocket_url}` write failed: {err_message}" ); if let Some(response_tx) = pending_requests.remove(&request_id) { let _ = response_tx.send(Err(err)); } let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some((ErrorKind::BrokenPipe, message)); break; } } RemoteClientCommand::Notify { notification, response_tx } => { let result = write_jsonrpc_message( &mut stream, JSONRPCMessage::Notification( jsonrpc_notification_from_client_notification(notification), ), &websocket_url, ) .await; let _ = response_tx.send(result); } RemoteClientCommand::ResolveServerRequest { request_id, result, response_tx, } => { let result = write_jsonrpc_message( &mut stream, JSONRPCMessage::Response(JSONRPCResponse { id: request_id, result, }), &websocket_url, ) .await; let _ = response_tx.send(result); } RemoteClientCommand::RejectServerRequest { request_id, error, response_tx, } => { let result = write_jsonrpc_message( &mut stream, JSONRPCMessage::Error(JSONRPCError { error, id: request_id, }), &websocket_url, ) .await; let _ = response_tx.send(result); } RemoteClientCommand::Shutdown { response_tx } => { let close_result = stream.close(None).await.map_err(|err| { IoError::other(format!( "failed to close websocket app server `{websocket_url}`: {err}" )) }); let _ = response_tx.send(close_result); break; } } } message = stream.next() => { match message { Some(Ok(Message::Text(text))) => { match serde_json::from_str::(&text) { Ok(JSONRPCMessage::Response(response)) => { if let Some(response_tx) = pending_requests.remove(&response.id) { let _ = response_tx.send(Ok(Ok(response.result))); } } Ok(JSONRPCMessage::Error(error)) => { if let Some(response_tx) = pending_requests.remove(&error.id) { let _ = response_tx.send(Ok(Err(error.error))); } } Ok(JSONRPCMessage::Notification(notification)) => { if let Some(event) = app_server_event_from_notification(notification) && let Err(err) = deliver_event( &event_tx, event, ) { warn!(%err, "failed to deliver remote app-server event"); break; } } Ok(JSONRPCMessage::Request(request)) => { let request_id = request.id.clone(); let method = request.method.clone(); match ServerRequest::try_from(request) { Ok(request) => { if let Err(err) = deliver_event( &event_tx, AppServerEvent::ServerRequest(request), ) { warn!(%err, "failed to deliver remote app-server server request"); break; } } Err(err) => { warn!(%err, method, "rejecting unknown remote app-server request"); if let Err(reject_err) = write_jsonrpc_message( &mut stream, JSONRPCMessage::Error(JSONRPCError { error: JSONRPCErrorError { code: -32601, message: format!( "unsupported remote app-server request `{method}`" ), data: None, }, id: request_id, }), &websocket_url, ) .await { let err_message = reject_err.to_string(); let message = format!( "remote app server at `{websocket_url}` write failed: {err_message}" ); let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some((ErrorKind::BrokenPipe, message)); break; } } } } Err(err) => { let message = format!( "remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}" ); let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some((ErrorKind::InvalidData, message)); break; } } } Some(Ok(Message::Close(frame))) => { let reason = frame .as_ref() .map(|frame| frame.reason.to_string()) .filter(|reason| !reason.is_empty()) .unwrap_or_else(|| "connection closed".to_string()); let message = format!( "remote app server at `{websocket_url}` disconnected: {reason}" ); let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some(( ErrorKind::ConnectionAborted, message, )); break; } Some(Ok(Message::Binary(_))) | Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {} Some(Err(err)) => { let message = format!( "remote app server at `{websocket_url}` transport failed: {err}" ); let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some((ErrorKind::InvalidData, message)); break; } None => { let message = format!( "remote app server at `{websocket_url}` closed the connection" ); let _ = deliver_event( &event_tx, AppServerEvent::Disconnected { message: message.clone(), }, ); worker_exit_error = Some((ErrorKind::UnexpectedEof, message)); break; } } } } } let (err_kind, err_message) = worker_exit_error.unwrap_or_else(|| { ( ErrorKind::BrokenPipe, "remote app-server worker channel is closed".to_string(), ) }); for (_, response_tx) in pending_requests { let _ = response_tx.send(Err(IoError::new(err_kind, err_message.clone()))); } }); Ok(Self { command_tx, event_rx, pending_events: pending_events.into(), worker_handle, }) } pub fn request_handle(&self) -> RemoteAppServerRequestHandle { RemoteAppServerRequestHandle { command_tx: self.command_tx.clone(), } } pub async fn request(&self, request: ClientRequest) -> IoResult { let (response_tx, response_rx) = oneshot::channel(); self.command_tx .send(RemoteClientCommand::Request { request: Box::new(request), response_tx, }) .await .map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server worker channel is closed", ) })?; response_rx.await.map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server request channel is closed", ) })? } pub async fn request_typed(&self, request: ClientRequest) -> Result where T: DeserializeOwned, { let method = request_method_name(&request); let response = self.request(request) .await .map_err(|source| TypedRequestError::Transport { method: method.clone(), source, })?; let result = response.map_err(|source| TypedRequestError::Server { method: method.clone(), source, })?; serde_json::from_value(result) .map_err(|source| TypedRequestError::Deserialize { method, source }) } pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> { let (response_tx, response_rx) = oneshot::channel(); self.command_tx .send(RemoteClientCommand::Notify { notification, response_tx, }) .await .map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server worker channel is closed", ) })?; response_rx.await.map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server notify channel is closed", ) })? } pub async fn resolve_server_request( &self, request_id: RequestId, result: JsonRpcResult, ) -> IoResult<()> { let (response_tx, response_rx) = oneshot::channel(); self.command_tx .send(RemoteClientCommand::ResolveServerRequest { request_id, result, response_tx, }) .await .map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server worker channel is closed", ) })?; response_rx.await.map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server resolve channel is closed", ) })? } pub async fn reject_server_request( &self, request_id: RequestId, error: JSONRPCErrorError, ) -> IoResult<()> { let (response_tx, response_rx) = oneshot::channel(); self.command_tx .send(RemoteClientCommand::RejectServerRequest { request_id, error, response_tx, }) .await .map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server worker channel is closed", ) })?; response_rx.await.map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server reject channel is closed", ) })? } pub async fn next_event(&mut self) -> Option { if let Some(event) = self.pending_events.pop_front() { return Some(event); } self.event_rx.recv().await } pub async fn shutdown(self) -> IoResult<()> { let Self { command_tx, event_rx, pending_events: _pending_events, worker_handle, } = self; let mut worker_handle = worker_handle; drop(event_rx); let (response_tx, response_rx) = oneshot::channel(); if command_tx .send(RemoteClientCommand::Shutdown { response_tx }) .await .is_ok() && let Ok(Ok(close_result)) = timeout(SHUTDOWN_TIMEOUT, response_rx).await { close_result?; } if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await { worker_handle.abort(); let _ = worker_handle.await; } Ok(()) } } impl RemoteAppServerRequestHandle { pub async fn request(&self, request: ClientRequest) -> IoResult { let (response_tx, response_rx) = oneshot::channel(); self.command_tx .send(RemoteClientCommand::Request { request: Box::new(request), response_tx, }) .await .map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server worker channel is closed", ) })?; response_rx.await.map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server request channel is closed", ) })? } pub async fn request_typed(&self, request: ClientRequest) -> Result where T: DeserializeOwned, { let method = request_method_name(&request); let response = self.request(request) .await .map_err(|source| TypedRequestError::Transport { method: method.clone(), source, })?; let result = response.map_err(|source| TypedRequestError::Server { method: method.clone(), source, })?; serde_json::from_value(result) .map_err(|source| TypedRequestError::Deserialize { method, source }) } } async fn initialize_remote_connection( stream: &mut WebSocketStream>, websocket_url: &str, params: InitializeParams, initialize_timeout: Duration, ) -> IoResult> { let initialize_request_id = RequestId::String("initialize".to_string()); let mut pending_events = Vec::new(); write_jsonrpc_message( stream, JSONRPCMessage::Request(jsonrpc_request_from_client_request( ClientRequest::Initialize { request_id: initialize_request_id.clone(), params, }, )), websocket_url, ) .await?; timeout(initialize_timeout, async { loop { match stream.next().await { Some(Ok(Message::Text(text))) => { let message = serde_json::from_str::(&text).map_err(|err| { IoError::other(format!( "remote app server at `{websocket_url}` sent invalid initialize response: {err}" )) })?; match message { JSONRPCMessage::Response(response) if response.id == initialize_request_id => { break Ok(()); } JSONRPCMessage::Error(error) if error.id == initialize_request_id => { break Err(IoError::other(format!( "remote app server at `{websocket_url}` rejected initialize: {}", error.error.message ))); } JSONRPCMessage::Notification(notification) => { if let Some(event) = app_server_event_from_notification(notification) { pending_events.push(event); } } JSONRPCMessage::Request(request) => { let request_id = request.id.clone(); let method = request.method.clone(); match ServerRequest::try_from(request) { Ok(request) => { pending_events.push(AppServerEvent::ServerRequest(request)); } Err(err) => { warn!(%err, method, "rejecting unknown remote app-server request during initialize"); write_jsonrpc_message( stream, JSONRPCMessage::Error(JSONRPCError { error: JSONRPCErrorError { code: -32601, message: format!( "unsupported remote app-server request `{method}`" ), data: None, }, id: request_id, }), websocket_url, ) .await?; } } } JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => {} } } Some(Ok(Message::Binary(_))) | Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {} Some(Ok(Message::Close(frame))) => { let reason = frame .as_ref() .map(|frame| frame.reason.to_string()) .filter(|reason| !reason.is_empty()) .unwrap_or_else(|| "connection closed during initialize".to_string()); break Err(IoError::new( ErrorKind::ConnectionAborted, format!( "remote app server at `{websocket_url}` closed during initialize: {reason}" ), )); } Some(Err(err)) => { break Err(IoError::other(format!( "remote app server at `{websocket_url}` transport failed during initialize: {err}" ))); } None => { break Err(IoError::new( ErrorKind::UnexpectedEof, format!("remote app server at `{websocket_url}` closed during initialize"), )); } } } }) .await .map_err(|_| { IoError::new( ErrorKind::TimedOut, format!("timed out waiting for initialize response from `{websocket_url}`"), ) })??; write_jsonrpc_message( stream, JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification( ClientNotification::Initialized, )), websocket_url, ) .await?; Ok(pending_events) } fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option { match ServerNotification::try_from(notification) { Ok(notification) => Some(AppServerEvent::ServerNotification(notification)), Err(_) => None, } } fn deliver_event( event_tx: &mpsc::UnboundedSender, event: AppServerEvent, ) -> IoResult<()> { event_tx.send(event).map_err(|_| { IoError::new( ErrorKind::BrokenPipe, "remote app-server event consumer channel is closed", ) }) } fn request_id_from_client_request(request: &ClientRequest) -> RequestId { jsonrpc_request_from_client_request(request.clone()).id } fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest { let value = match serde_json::to_value(request) { Ok(value) => value, Err(err) => panic!("client request should serialize: {err}"), }; match serde_json::from_value(value) { Ok(request) => request, Err(err) => panic!("client request should encode as JSON-RPC request: {err}"), } } fn jsonrpc_notification_from_client_notification( notification: ClientNotification, ) -> JSONRPCNotification { let value = match serde_json::to_value(notification) { Ok(value) => value, Err(err) => panic!("client notification should serialize: {err}"), }; match serde_json::from_value(value) { Ok(notification) => notification, Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"), } } async fn write_jsonrpc_message( stream: &mut WebSocketStream>, message: JSONRPCMessage, websocket_url: &str, ) -> IoResult<()> { let payload = serde_json::to_string(&message).map_err(IoError::other)?; stream .send(Message::Text(payload.into())) .await .map_err(|err| { IoError::other(format!( "failed to write websocket message to `{websocket_url}`: {err}" )) }) } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn shutdown_tolerates_worker_exit_after_command_is_queued() { let (command_tx, mut command_rx) = mpsc::channel(1); let (_event_tx, event_rx) = mpsc::unbounded_channel::(); let worker_handle = tokio::spawn(async move { let _ = command_rx.recv().await; }); let client = RemoteAppServerClient { command_tx, event_rx, pending_events: VecDeque::new(), worker_handle, }; client .shutdown() .await .expect("shutdown should complete when worker exits first"); } }