Files
codex/codex-rs/app-server-client/src/remote.rs
Eric Traut 79ea577156 TUI: Keep remote app-server events draining (#18932)
Addresses #18860

Problem: Remote app-server clients could stop draining websocket events
when their bounded local event channel filled, leaving clients stuck on
stale in-progress turns after a disconnect.

Solution: Use an unbounded local event channel for the remote client so
the websocket reader can keep forwarding disconnect and progress events
instead of blocking or dropping them.

Why this is reasonable: This does not make the remote websocket itself
unbounded. The changed queue lives inside the remote client, between the
task that reads the remote websocket and the API consumer in the same
client process. Once an event has been received from the remote server,
preserving it is preferable to blocking websocket reads or dropping
disconnect/lifecycle events; network-level backpressure still happens at
the websocket boundary if the remote side outpaces the client.
2026-04-22 09:29:34 -07:00

857 lines
35 KiB
Rust

/*
This module implements the websocket-backed app-server client transport.
It owns the remote connection lifecycle, including the initialize/initialized
handshake, JSON-RPC request/response routing, server-request resolution, and
notification streaming. The rest of the crate uses the same `AppServerEvent`
surface for both in-process and remote transports, so callers such as the TUI
can switch between them without changing their higher-level session logic.
*/
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::time::Duration;
use crate::AppServerEvent;
use crate::RequestResult;
use crate::SHUTDOWN_TIMEOUT;
use crate::TypedRequestError;
use crate::request_method_name;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use futures::SinkExt;
use futures::StreamExt;
use serde::de::DeserializeOwned;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
use tracing::warn;
use url::Url;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Clone)]
pub struct RemoteAppServerConnectArgs {
pub websocket_url: String,
pub auth_token: Option<String>,
pub client_name: String,
pub client_version: String,
pub experimental_api: bool,
pub opt_out_notification_methods: Vec<String>,
pub channel_capacity: usize,
}
impl RemoteAppServerConnectArgs {
fn initialize_params(&self) -> InitializeParams {
let capabilities = InitializeCapabilities {
experimental_api: self.experimental_api,
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
None
} else {
Some(self.opt_out_notification_methods.clone())
},
};
InitializeParams {
client_info: ClientInfo {
name: self.client_name.clone(),
title: None,
version: self.client_version.clone(),
},
capabilities: Some(capabilities),
}
}
}
pub(crate) fn websocket_url_supports_auth_token(url: &Url) -> bool {
match (url.scheme(), url.host()) {
("wss", Some(_)) => true,
("ws", Some(url::Host::Domain(domain))) => domain.eq_ignore_ascii_case("localhost"),
("ws", Some(url::Host::Ipv4(addr))) => addr.is_loopback(),
("ws", Some(url::Host::Ipv6(addr))) => addr.is_loopback(),
_ => false,
}
}
enum RemoteClientCommand {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<IoResult<RequestResult>>,
},
Notify {
notification: ClientNotification,
response_tx: oneshot::Sender<IoResult<()>>,
},
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
},
}
pub struct RemoteAppServerClient {
command_tx: mpsc::Sender<RemoteClientCommand>,
event_rx: mpsc::UnboundedReceiver<AppServerEvent>,
pending_events: VecDeque<AppServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
}
#[derive(Clone)]
pub struct RemoteAppServerRequestHandle {
command_tx: mpsc::Sender<RemoteClientCommand>,
}
impl RemoteAppServerClient {
pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let websocket_url = args.websocket_url.clone();
let url = Url::parse(&websocket_url).map_err(|err| {
IoError::new(
ErrorKind::InvalidInput,
format!("invalid websocket URL `{websocket_url}`: {err}"),
)
})?;
if args.auth_token.is_some() && !websocket_url_supports_auth_token(&url) {
return Err(IoError::new(
ErrorKind::InvalidInput,
format!(
"remote auth tokens require `wss://` or loopback `ws://` URLs; got `{websocket_url}`"
),
));
}
let mut request = url.as_str().into_client_request().map_err(|err| {
IoError::new(
ErrorKind::InvalidInput,
format!("invalid websocket URL `{websocket_url}`: {err}"),
)
})?;
if let Some(auth_token) = args.auth_token.as_deref() {
let header_value =
HeaderValue::from_str(&format!("Bearer {auth_token}")).map_err(|err| {
IoError::new(
ErrorKind::InvalidInput,
format!("invalid remote authorization header value: {err}"),
)
})?;
request.headers_mut().insert(AUTHORIZATION, header_value);
}
ensure_rustls_crypto_provider();
let stream = timeout(CONNECT_TIMEOUT, connect_async(request))
.await
.map_err(|_| {
IoError::new(
ErrorKind::TimedOut,
format!("timed out connecting to remote app server at `{websocket_url}`"),
)
})?
.map(|(stream, _response)| stream)
.map_err(|err| {
IoError::other(format!(
"failed to connect to remote app server at `{websocket_url}`: {err}"
))
})?;
let mut stream = stream;
let pending_events = initialize_remote_connection(
&mut stream,
&websocket_url,
args.initialize_params(),
INITIALIZE_TIMEOUT,
)
.await?;
let (command_tx, mut command_rx) = mpsc::channel::<RemoteClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::unbounded_channel::<AppServerEvent>();
let worker_handle = tokio::spawn(async move {
let mut pending_requests =
HashMap::<RequestId, oneshot::Sender<IoResult<RequestResult>>>::new();
loop {
tokio::select! {
command = command_rx.recv() => {
let Some(command) = command else {
let _ = stream.close(None).await;
break;
};
match command {
RemoteClientCommand::Request { request, response_tx } => {
let request_id = request_id_from_client_request(&request);
if pending_requests.contains_key(&request_id) {
let _ = response_tx.send(Err(IoError::new(
ErrorKind::InvalidInput,
format!("duplicate remote app-server request id `{request_id}`"),
)));
continue;
}
pending_requests.insert(request_id.clone(), response_tx);
if let Err(err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
&websocket_url,
)
.await
{
let err_message = err.to_string();
if let Some(response_tx) = pending_requests.remove(&request_id) {
let _ = response_tx.send(Err(err));
}
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
);
break;
}
}
RemoteClientCommand::Notify { notification, response_tx } => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Notification(
jsonrpc_notification_from_client_notification(notification),
),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
} => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
} => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Error(JSONRPCError {
error,
id: request_id,
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::Shutdown { response_tx } => {
let close_result = stream.close(None).await.map_err(|err| {
IoError::other(format!(
"failed to close websocket app server `{websocket_url}`: {err}"
))
});
let _ = response_tx.send(close_result);
break;
}
}
}
message = stream.next() => {
match message {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<JSONRPCMessage>(&text) {
Ok(JSONRPCMessage::Response(response)) => {
if let Some(response_tx) = pending_requests.remove(&response.id) {
let _ = response_tx.send(Ok(Ok(response.result)));
}
}
Ok(JSONRPCMessage::Error(error)) => {
if let Some(response_tx) = pending_requests.remove(&error.id) {
let _ = response_tx.send(Ok(Err(error.error)));
}
}
Ok(JSONRPCMessage::Notification(notification)) => {
if let Some(event) =
app_server_event_from_notification(notification)
&& let Err(err) = deliver_event(
&event_tx,
event,
)
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
}
Ok(JSONRPCMessage::Request(request)) => {
let request_id = request.id.clone();
let method = request.method.clone();
match ServerRequest::try_from(request) {
Ok(request) => {
if let Err(err) = deliver_event(
&event_tx,
AppServerEvent::ServerRequest(request),
)
{
warn!(%err, "failed to deliver remote app-server server request");
break;
}
}
Err(err) => {
warn!(%err, method, "rejecting unknown remote app-server request");
if let Err(reject_err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32601,
message: format!(
"unsupported remote app-server request `{method}`"
),
data: None,
},
id: request_id,
}),
&websocket_url,
)
.await
{
let err_message = reject_err.to_string();
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
);
break;
}
}
}
}
Err(err) => {
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
),
},
);
break;
}
}
}
Some(Ok(Message::Close(frame))) => {
let reason = frame
.as_ref()
.map(|frame| frame.reason.to_string())
.filter(|reason| !reason.is_empty())
.unwrap_or_else(|| "connection closed".to_string());
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` disconnected: {reason}"
),
},
);
break;
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Err(err)) => {
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` transport failed: {err}"
),
},
);
break;
}
None => {
let _ = deliver_event(
&event_tx,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` closed the connection"
),
},
);
break;
}
}
}
}
}
let err = IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
);
for (_, response_tx) in pending_requests {
let _ = response_tx.send(Err(IoError::new(err.kind(), err.to_string())));
}
});
Ok(Self {
command_tx,
event_rx,
pending_events: pending_events.into(),
worker_handle,
})
}
pub fn request_handle(&self) -> RemoteAppServerRequestHandle {
RemoteAppServerRequestHandle {
command_tx: self.command_tx.clone(),
}
}
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server request channel is closed",
)
})?
}
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Notify {
notification,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server notify channel is closed",
)
})?
}
pub async fn resolve_server_request(
&self,
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server resolve channel is closed",
)
})?
}
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server reject channel is closed",
)
})?
}
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
if let Some(event) = self.pending_events.pop_front() {
return Some(event);
}
self.event_rx.recv().await
}
pub async fn shutdown(self) -> IoResult<()> {
let Self {
command_tx,
event_rx,
pending_events: _pending_events,
worker_handle,
} = self;
let mut worker_handle = worker_handle;
drop(event_rx);
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(RemoteClientCommand::Shutdown { response_tx })
.await
.is_ok()
&& let Ok(Ok(close_result)) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
{
close_result?;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
worker_handle.abort();
let _ = worker_handle.await;
}
Ok(())
}
}
impl RemoteAppServerRequestHandle {
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server request channel is closed",
)
})?
}
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
}
async fn initialize_remote_connection(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
websocket_url: &str,
params: InitializeParams,
initialize_timeout: Duration,
) -> IoResult<Vec<AppServerEvent>> {
let initialize_request_id = RequestId::String("initialize".to_string());
let mut pending_events = Vec::new();
write_jsonrpc_message(
stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(
ClientRequest::Initialize {
request_id: initialize_request_id.clone(),
params,
},
)),
websocket_url,
)
.await?;
timeout(initialize_timeout, async {
loop {
match stream.next().await {
Some(Ok(Message::Text(text))) => {
let message = serde_json::from_str::<JSONRPCMessage>(&text).map_err(|err| {
IoError::other(format!(
"remote app server at `{websocket_url}` sent invalid initialize response: {err}"
))
})?;
match message {
JSONRPCMessage::Response(response) if response.id == initialize_request_id => {
break Ok(());
}
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
break Err(IoError::other(format!(
"remote app server at `{websocket_url}` rejected initialize: {}",
error.error.message
)));
}
JSONRPCMessage::Notification(notification) => {
if let Some(event) = app_server_event_from_notification(notification) {
pending_events.push(event);
}
}
JSONRPCMessage::Request(request) => {
let request_id = request.id.clone();
let method = request.method.clone();
match ServerRequest::try_from(request) {
Ok(request) => {
pending_events.push(AppServerEvent::ServerRequest(request));
}
Err(err) => {
warn!(%err, method, "rejecting unknown remote app-server request during initialize");
write_jsonrpc_message(
stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32601,
message: format!(
"unsupported remote app-server request `{method}`"
),
data: None,
},
id: request_id,
}),
websocket_url,
)
.await?;
}
}
}
JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => {}
}
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Close(frame))) => {
let reason = frame
.as_ref()
.map(|frame| frame.reason.to_string())
.filter(|reason| !reason.is_empty())
.unwrap_or_else(|| "connection closed during initialize".to_string());
break Err(IoError::new(
ErrorKind::ConnectionAborted,
format!(
"remote app server at `{websocket_url}` closed during initialize: {reason}"
),
));
}
Some(Err(err)) => {
break Err(IoError::other(format!(
"remote app server at `{websocket_url}` transport failed during initialize: {err}"
)));
}
None => {
break Err(IoError::new(
ErrorKind::UnexpectedEof,
format!("remote app server at `{websocket_url}` closed during initialize"),
));
}
}
}
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::TimedOut,
format!("timed out waiting for initialize response from `{websocket_url}`"),
)
})??;
write_jsonrpc_message(
stream,
JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification(
ClientNotification::Initialized,
)),
websocket_url,
)
.await?;
Ok(pending_events)
}
fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
match ServerNotification::try_from(notification) {
Ok(notification) => Some(AppServerEvent::ServerNotification(notification)),
Err(_) => None,
}
}
fn deliver_event(
event_tx: &mpsc::UnboundedSender<AppServerEvent>,
event: AppServerEvent,
) -> IoResult<()> {
event_tx.send(event).map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
)
})
}
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
jsonrpc_request_from_client_request(request.clone()).id
}
fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest {
let value = match serde_json::to_value(request) {
Ok(value) => value,
Err(err) => panic!("client request should serialize: {err}"),
};
match serde_json::from_value(value) {
Ok(request) => request,
Err(err) => panic!("client request should encode as JSON-RPC request: {err}"),
}
}
fn jsonrpc_notification_from_client_notification(
notification: ClientNotification,
) -> JSONRPCNotification {
let value = match serde_json::to_value(notification) {
Ok(value) => value,
Err(err) => panic!("client notification should serialize: {err}"),
};
match serde_json::from_value(value) {
Ok(notification) => notification,
Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"),
}
}
async fn write_jsonrpc_message(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
message: JSONRPCMessage,
websocket_url: &str,
) -> IoResult<()> {
let payload = serde_json::to_string(&message).map_err(IoError::other)?;
stream
.send(Message::Text(payload.into()))
.await
.map_err(|err| {
IoError::other(format!(
"failed to write websocket message to `{websocket_url}`: {err}"
))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn shutdown_tolerates_worker_exit_after_command_is_queued() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::unbounded_channel::<AppServerEvent>();
let worker_handle = tokio::spawn(async move {
let _ = command_rx.recv().await;
});
let client = RemoteAppServerClient {
command_tx,
event_rx,
pending_events: VecDeque::new(),
worker_handle,
};
client
.shutdown()
.await
.expect("shutdown should complete when worker exits first");
}
}