Files
codex/codex-rs/app-server-client/src/remote.rs
Felipe Coury e9996ec62a fix(tui_app_server): preserve transcript events under backpressure (#15759)
## TL;DR

When running codex with `-c features.tui_app_server=true` we see
corruption when streaming large amounts of data. This PR marks other
event types as _critical_ by making them _must-deliver_.

## Problem

When the TUI consumer falls behind the app-server event stream, the
bounded `mpsc` channel fills up and the forwarding layer drops events
via `try_send`. Previously only `TurnCompleted` was marked as
must-deliver. Streamed assistant text (`AgentMessageDelta`) and the
authoritative final item (`ItemCompleted`) were treated as droppable —
the same as ephemeral command output deltas. Because the TUI renders
markdown incrementally from these deltas, dropping any of them produces
permanently corrupted or incomplete paragraphs that persist for the rest
of the session.

## Mental model

The app-server event stream has two tiers of importance:

1. **Lossless (transcript + terminal):** Events that form the
authoritative record of what the assistant said or that signal turn
lifecycle transitions. Losing any of these corrupts the visible output
or leaves surfaces waiting forever. These are: `AgentMessageDelta`,
`PlanDelta`, `ReasoningSummaryTextDelta`, `ReasoningTextDelta`,
`ItemCompleted`, and `TurnCompleted`.

2. **Best-effort (everything else):** Ephemeral status events like
`CommandExecutionOutputDelta` and progress notifications. Dropping these
under load causes cosmetic gaps but no permanent corruption.

The forwarding layer uses `try_send` for best-effort events (dropping on
backpressure) and blocking `send().await` for lossless events (applying
back-pressure to the producer until the consumer catches up).

## Non-goals

- Eliminating backpressure entirely. The bounded queue is intentional;
this change only widens the set of events that survive it.
- Changing the event protocol or adding new notification types.
- Addressing root causes of consumer slowness (e.g. TUI render cost).

## Tradeoffs

Blocking on transcript events means a slow consumer can now stall the
producer for the duration of those events. This is acceptable because:
(a) the alternative is permanently broken output, which is worse; (b)
the consumer already had to keep up with `TurnCompleted` blocking sends;
and (c) transcript events arrive at model-output speed, not burst speed,
so sustained saturation is unlikely in practice.

## Architecture

Two parallel changes, one per transport:

- **In-process path** (`lib.rs`): The inline forwarding logic was
extracted into `forward_in_process_event`, a standalone async function
that encapsulates the lag-marker / must-deliver / try-send decision
tree. The worker loop now delegates to it. A new
`server_notification_requires_delivery` function (shared `pub(crate)`)
centralizes the notification classification.

- **Remote path** (`remote.rs`): The local `event_requires_delivery` now
delegates to the same shared `server_notification_requires_delivery`,
keeping both transports in sync.

## Observability

No new metrics or log lines. The existing `warn!` on event drops
continues to fire for best-effort events. Lossless events that block
will not produce a log line (they simply wait).

## Tests

- `event_requires_delivery_marks_transcript_and_terminal_events`: unit
test confirming the expanded classification covers `AgentMessageDelta`,
`ItemCompleted`, `TurnCompleted`, and excludes
`CommandExecutionOutputDelta` and `Lagged`.
-
`forward_in_process_event_preserves_transcript_notifications_under_backpressure`:
integration-style test that fills a capacity-1 channel, verifies a
best-effort event is dropped (skipped count increments), then sends
lossless transcript events and confirms they all arrive in order with
the correct lag marker preceding them.
- `remote_backpressure_preserves_transcript_notifications`: end-to-end
test over a real websocket that verifies the remote transport preserves
transcript events under the same backpressure scenario.
- `event_requires_delivery_marks_transcript_and_disconnect_events`
(remote): unit test confirming the remote-side classification covers
transcript events and `Disconnected`.

---------

Co-authored-by: Eric Traut <etraut@openai.com>
2026-03-25 13:50:39 -06:00

946 lines
38 KiB
Rust

/*
This module implements the websocket-backed app-server client transport.
It owns the remote connection lifecycle, including the initialize/initialized
handshake, JSON-RPC request/response routing, server-request resolution, and
notification streaming. The rest of the crate uses the same `AppServerEvent`
surface for both in-process and remote transports, so callers such as
`tui_app_server` can switch between them without changing their higher-level
session logic.
*/
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::time::Duration;
use crate::AppServerEvent;
use crate::RequestResult;
use crate::SHUTDOWN_TIMEOUT;
use crate::TypedRequestError;
use crate::request_method_name;
use crate::server_notification_requires_delivery;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use futures::SinkExt;
use futures::StreamExt;
use serde::de::DeserializeOwned;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use tracing::warn;
use url::Url;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Clone)]
pub struct RemoteAppServerConnectArgs {
pub websocket_url: String,
pub client_name: String,
pub client_version: String,
pub experimental_api: bool,
pub opt_out_notification_methods: Vec<String>,
pub channel_capacity: usize,
}
impl RemoteAppServerConnectArgs {
fn initialize_params(&self) -> InitializeParams {
let capabilities = InitializeCapabilities {
experimental_api: self.experimental_api,
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
None
} else {
Some(self.opt_out_notification_methods.clone())
},
};
InitializeParams {
client_info: ClientInfo {
name: self.client_name.clone(),
title: None,
version: self.client_version.clone(),
},
capabilities: Some(capabilities),
}
}
}
enum RemoteClientCommand {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<IoResult<RequestResult>>,
},
Notify {
notification: ClientNotification,
response_tx: oneshot::Sender<IoResult<()>>,
},
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
},
}
pub struct RemoteAppServerClient {
command_tx: mpsc::Sender<RemoteClientCommand>,
event_rx: mpsc::Receiver<AppServerEvent>,
pending_events: VecDeque<AppServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
}
#[derive(Clone)]
pub struct RemoteAppServerRequestHandle {
command_tx: mpsc::Sender<RemoteClientCommand>,
}
impl RemoteAppServerClient {
pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let websocket_url = args.websocket_url.clone();
let url = Url::parse(&websocket_url).map_err(|err| {
IoError::new(
ErrorKind::InvalidInput,
format!("invalid websocket URL `{websocket_url}`: {err}"),
)
})?;
let stream = timeout(CONNECT_TIMEOUT, connect_async(url.as_str()))
.await
.map_err(|_| {
IoError::new(
ErrorKind::TimedOut,
format!("timed out connecting to remote app server at `{websocket_url}`"),
)
})?
.map(|(stream, _response)| stream)
.map_err(|err| {
IoError::other(format!(
"failed to connect to remote app server at `{websocket_url}`: {err}"
))
})?;
let mut stream = stream;
let pending_events = initialize_remote_connection(
&mut stream,
&websocket_url,
args.initialize_params(),
INITIALIZE_TIMEOUT,
)
.await?;
let (command_tx, mut command_rx) = mpsc::channel::<RemoteClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<AppServerEvent>(channel_capacity);
let worker_handle = tokio::spawn(async move {
let mut pending_requests =
HashMap::<RequestId, oneshot::Sender<IoResult<RequestResult>>>::new();
let mut skipped_events = 0usize;
loop {
tokio::select! {
command = command_rx.recv() => {
let Some(command) = command else {
let _ = stream.close(None).await;
break;
};
match command {
RemoteClientCommand::Request { request, response_tx } => {
let request_id = request_id_from_client_request(&request);
if pending_requests.contains_key(&request_id) {
let _ = response_tx.send(Err(IoError::new(
ErrorKind::InvalidInput,
format!("duplicate remote app-server request id `{request_id}`"),
)));
continue;
}
pending_requests.insert(request_id.clone(), response_tx);
if let Err(err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
&websocket_url,
)
.await
{
let err_message = err.to_string();
if let Some(response_tx) = pending_requests.remove(&request_id) {
let _ = response_tx.send(Err(err));
}
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
break;
}
}
RemoteClientCommand::Notify { notification, response_tx } => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Notification(
jsonrpc_notification_from_client_notification(notification),
),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
} => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
} => {
let result = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Error(JSONRPCError {
error,
id: request_id,
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
}
RemoteClientCommand::Shutdown { response_tx } => {
let close_result = stream.close(None).await.map_err(|err| {
IoError::other(format!(
"failed to close websocket app server `{websocket_url}`: {err}"
))
});
let _ = response_tx.send(close_result);
break;
}
}
}
message = stream.next() => {
match message {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<JSONRPCMessage>(&text) {
Ok(JSONRPCMessage::Response(response)) => {
if let Some(response_tx) = pending_requests.remove(&response.id) {
let _ = response_tx.send(Ok(Ok(response.result)));
}
}
Ok(JSONRPCMessage::Error(error)) => {
if let Some(response_tx) = pending_requests.remove(&error.id) {
let _ = response_tx.send(Ok(Err(error.error)));
}
}
Ok(JSONRPCMessage::Notification(notification)) => {
if let Some(event) =
app_server_event_from_notification(notification)
&& let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
}
Ok(JSONRPCMessage::Request(request)) => {
let request_id = request.id.clone();
let method = request.method.clone();
match ServerRequest::try_from(request) {
Ok(request) => {
if let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::ServerRequest(request),
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server server request");
break;
}
}
Err(err) => {
warn!(%err, method, "rejecting unknown remote app-server request");
if let Err(reject_err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32601,
message: format!(
"unsupported remote app-server request `{method}`"
),
data: None,
},
id: request_id,
}),
&websocket_url,
)
.await
{
let err_message = reject_err.to_string();
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
break;
}
}
}
}
Err(err) => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
),
},
&mut stream,
)
.await;
break;
}
}
}
Some(Ok(Message::Close(frame))) => {
let reason = frame
.as_ref()
.map(|frame| frame.reason.to_string())
.filter(|reason| !reason.is_empty())
.unwrap_or_else(|| "connection closed".to_string());
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` disconnected: {reason}"
),
},
&mut stream,
)
.await;
break;
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Err(err)) => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` transport failed: {err}"
),
},
&mut stream,
)
.await;
break;
}
None => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` closed the connection"
),
},
&mut stream,
)
.await;
break;
}
}
}
}
}
let err = IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
);
for (_, response_tx) in pending_requests {
let _ = response_tx.send(Err(IoError::new(err.kind(), err.to_string())));
}
});
Ok(Self {
command_tx,
event_rx,
pending_events: pending_events.into(),
worker_handle,
})
}
pub fn request_handle(&self) -> RemoteAppServerRequestHandle {
RemoteAppServerRequestHandle {
command_tx: self.command_tx.clone(),
}
}
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server request channel is closed",
)
})?
}
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Notify {
notification,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server notify channel is closed",
)
})?
}
pub async fn resolve_server_request(
&self,
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server resolve channel is closed",
)
})?
}
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server reject channel is closed",
)
})?
}
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
if let Some(event) = self.pending_events.pop_front() {
return Some(event);
}
self.event_rx.recv().await
}
pub async fn shutdown(self) -> IoResult<()> {
let Self {
command_tx,
event_rx,
pending_events: _pending_events,
worker_handle,
} = self;
let mut worker_handle = worker_handle;
drop(event_rx);
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(RemoteClientCommand::Shutdown { response_tx })
.await
.is_ok()
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
{
command_result.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server shutdown channel is closed",
)
})??;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
worker_handle.abort();
let _ = worker_handle.await;
}
Ok(())
}
}
impl RemoteAppServerRequestHandle {
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server request channel is closed",
)
})?
}
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
}
async fn initialize_remote_connection(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
websocket_url: &str,
params: InitializeParams,
initialize_timeout: Duration,
) -> IoResult<Vec<AppServerEvent>> {
let initialize_request_id = RequestId::String("initialize".to_string());
let mut pending_events = Vec::new();
write_jsonrpc_message(
stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(
ClientRequest::Initialize {
request_id: initialize_request_id.clone(),
params,
},
)),
websocket_url,
)
.await?;
timeout(initialize_timeout, async {
loop {
match stream.next().await {
Some(Ok(Message::Text(text))) => {
let message = serde_json::from_str::<JSONRPCMessage>(&text).map_err(|err| {
IoError::other(format!(
"remote app server at `{websocket_url}` sent invalid initialize response: {err}"
))
})?;
match message {
JSONRPCMessage::Response(response) if response.id == initialize_request_id => {
break Ok(());
}
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
break Err(IoError::other(format!(
"remote app server at `{websocket_url}` rejected initialize: {}",
error.error.message
)));
}
JSONRPCMessage::Notification(notification) => {
if let Some(event) = app_server_event_from_notification(notification) {
pending_events.push(event);
}
}
JSONRPCMessage::Request(request) => {
let request_id = request.id.clone();
let method = request.method.clone();
match ServerRequest::try_from(request) {
Ok(request) => {
pending_events.push(AppServerEvent::ServerRequest(request));
}
Err(err) => {
warn!(%err, method, "rejecting unknown remote app-server request during initialize");
write_jsonrpc_message(
stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32601,
message: format!(
"unsupported remote app-server request `{method}`"
),
data: None,
},
id: request_id,
}),
websocket_url,
)
.await?;
}
}
}
JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => {}
}
}
Some(Ok(Message::Binary(_)))
| Some(Ok(Message::Ping(_)))
| Some(Ok(Message::Pong(_)))
| Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Close(frame))) => {
let reason = frame
.as_ref()
.map(|frame| frame.reason.to_string())
.filter(|reason| !reason.is_empty())
.unwrap_or_else(|| "connection closed during initialize".to_string());
break Err(IoError::new(
ErrorKind::ConnectionAborted,
format!(
"remote app server at `{websocket_url}` closed during initialize: {reason}"
),
));
}
Some(Err(err)) => {
break Err(IoError::other(format!(
"remote app server at `{websocket_url}` transport failed during initialize: {err}"
)));
}
None => {
break Err(IoError::new(
ErrorKind::UnexpectedEof,
format!("remote app server at `{websocket_url}` closed during initialize"),
));
}
}
}
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::TimedOut,
format!("timed out waiting for initialize response from `{websocket_url}`"),
)
})??;
write_jsonrpc_message(
stream,
JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification(
ClientNotification::Initialized,
)),
websocket_url,
)
.await?;
Ok(pending_events)
}
fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
match ServerNotification::try_from(notification) {
Ok(notification) => Some(AppServerEvent::ServerNotification(notification)),
Err(_) => None,
}
}
async fn deliver_event(
event_tx: &mpsc::Sender<AppServerEvent>,
skipped_events: &mut usize,
event: AppServerEvent,
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
) -> IoResult<()> {
if *skipped_events > 0 {
if event_requires_delivery(&event) {
if event_tx
.send(AppServerEvent::Lagged {
skipped: *skipped_events,
})
.await
.is_err()
{
return Err(IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
));
}
*skipped_events = 0;
} else {
match event_tx.try_send(AppServerEvent::Lagged {
skipped: *skipped_events,
}) {
Ok(()) => *skipped_events = 0,
Err(mpsc::error::TrySendError::Full(_)) => {
*skipped_events = (*skipped_events).saturating_add(1);
reject_if_server_request_dropped(stream, &event).await?;
return Ok(());
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Err(IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
));
}
}
}
}
if event_requires_delivery(&event) {
event_tx.send(event).await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
)
})?;
return Ok(());
}
match event_tx.try_send(event) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(event)) => {
*skipped_events = (*skipped_events).saturating_add(1);
reject_if_server_request_dropped(stream, &event).await
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
)),
}
}
async fn reject_if_server_request_dropped(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
event: &AppServerEvent,
) -> IoResult<()> {
let AppServerEvent::ServerRequest(request) = event else {
return Ok(());
};
write_jsonrpc_message(
stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32001,
message: "remote app-server event queue is full".to_string(),
data: None,
},
id: request.id().clone(),
}),
"<remote-app-server>",
)
.await
}
fn event_requires_delivery(event: &AppServerEvent) -> bool {
match event {
AppServerEvent::ServerNotification(notification) => {
server_notification_requires_delivery(notification)
}
AppServerEvent::Disconnected { .. } => true,
AppServerEvent::Lagged { .. } | AppServerEvent::ServerRequest(_) => false,
}
}
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
jsonrpc_request_from_client_request(request.clone()).id
}
fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest {
let value = match serde_json::to_value(request) {
Ok(value) => value,
Err(err) => panic!("client request should serialize: {err}"),
};
match serde_json::from_value(value) {
Ok(request) => request,
Err(err) => panic!("client request should encode as JSON-RPC request: {err}"),
}
}
fn jsonrpc_notification_from_client_notification(
notification: ClientNotification,
) -> JSONRPCNotification {
let value = match serde_json::to_value(notification) {
Ok(value) => value,
Err(err) => panic!("client notification should serialize: {err}"),
};
match serde_json::from_value(value) {
Ok(notification) => notification,
Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"),
}
}
async fn write_jsonrpc_message(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
message: JSONRPCMessage,
websocket_url: &str,
) -> IoResult<()> {
let payload = serde_json::to_string(&message).map_err(IoError::other)?;
stream
.send(Message::Text(payload.into()))
.await
.map_err(|err| {
IoError::other(format!(
"failed to write websocket message to `{websocket_url}`: {err}"
))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn event_requires_delivery_marks_transcript_and_disconnect_events() {
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
codex_app_server_protocol::AgentMessageDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: "hello".to_string(),
},
),)
));
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item: codex_app_server_protocol::ThreadItem::Plan {
id: "item".to_string(),
text: "step".to_string(),
},
}
),)
));
assert!(event_requires_delivery(&AppServerEvent::Disconnected {
message: "closed".to_string(),
}));
assert!(!event_requires_delivery(&AppServerEvent::Lagged {
skipped: 1
}));
}
}