fix(tui_app_server): preserve transcript events under backpressure (#15759)

## TL;DR

When running codex with `-c features.tui_app_server=true`, the rendered
transcript can be corrupted when large amounts of data are streamed. This
PR marks the transcript event types (streamed deltas and `ItemCompleted`)
as _must-deliver_, alongside the existing `TurnCompleted`.

## Problem

When the TUI consumer falls behind the app-server event stream, the
bounded `mpsc` channel fills up and the forwarding layer drops events
via `try_send`. Previously only `TurnCompleted` was marked as
must-deliver. Streamed assistant text (`AgentMessageDelta`) and the
authoritative final item (`ItemCompleted`) were treated as droppable —
the same as ephemeral command output deltas. Because the TUI renders
markdown incrementally from these deltas, dropping any of them produces
permanently corrupted or incomplete paragraphs that persist for the rest
of the session.

## Mental model

The app-server event stream has two tiers of importance:

1. **Lossless (transcript + terminal):** Events that form the
authoritative record of what the assistant said or that signal turn
lifecycle transitions. Losing any of these corrupts the visible output
or leaves surfaces waiting forever. These are: `AgentMessageDelta`,
`PlanDelta`, `ReasoningSummaryTextDelta`, `ReasoningTextDelta`,
`ItemCompleted`, and `TurnCompleted`.

2. **Best-effort (everything else):** Ephemeral status events like
`CommandExecutionOutputDelta` and progress notifications. Dropping these
under load causes cosmetic gaps but no permanent corruption.

The forwarding layer uses `try_send` for best-effort events (dropping on
backpressure) and an awaited `send().await` for lossless events (applying
backpressure to the producer until the consumer catches up).
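
The two-tier policy above can be sketched in a few lines. This is a minimal, illustrative sketch and not the code in this PR: it assumes a simplified, hypothetical `Notification` enum, and it swaps the async tokio channel for the standard library's `sync_channel`, so the blocking `send` stands in for `send().await`. The `forward` and `two_tier_demo` names are made up for the example.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

/// Hypothetical, simplified stand-in for the real notification enum.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Notification {
    AgentMessageDelta(String),
    ItemCompleted(String),
    TurnCompleted,
    CommandExecutionOutputDelta(String),
}

/// Lossless tier: transcript and terminal events.
fn requires_delivery(n: &Notification) -> bool {
    matches!(
        n,
        Notification::AgentMessageDelta(_)
            | Notification::ItemCompleted(_)
            | Notification::TurnCompleted
    )
}

/// Forwards one event. Returns `false` once the consumer has gone away.
/// `dropped` counts best-effort events discarded because the queue was full.
fn forward(tx: &SyncSender<Notification>, n: Notification, dropped: &mut usize) -> bool {
    if requires_delivery(&n) {
        // Blocks until the consumer frees a slot, so the event is never lost.
        return tx.send(n).is_ok();
    }
    match tx.try_send(n) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => {
            *dropped = dropped.saturating_add(1);
            true
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}

/// Fills a capacity-1 queue, then forwards one best-effort and three lossless events.
fn two_tier_demo() -> (Vec<Notification>, usize) {
    let (tx, rx) = sync_channel(1);
    let mut dropped = 0;
    // Queue is empty: accepted.
    forward(&tx, Notification::CommandExecutionOutputDelta("stdout-1".into()), &mut dropped);
    // Queue is full and nobody is reading yet: dropped.
    forward(&tx, Notification::CommandExecutionOutputDelta("stdout-2".into()), &mut dropped);
    // Start the consumer only now so the drop above is deterministic.
    let consumer = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());
    forward(&tx, Notification::AgentMessageDelta("hello".into()), &mut dropped);
    forward(&tx, Notification::ItemCompleted("hello".into()), &mut dropped);
    forward(&tx, Notification::TurnCompleted, &mut dropped);
    drop(tx); // closing the channel ends the consumer's iteration
    (consumer.join().expect("consumer thread panicked"), dropped)
}
```

Calling `two_tier_demo()` yields the first command-output delta followed by the three lossless events in order, with one best-effort event counted as dropped. The sketch omits the lag marker; it only shows how the classification selects between the dropping and the waiting send.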

## Non-goals

- Eliminating backpressure entirely. The bounded queue is intentional;
this change only widens the set of events that survive it.
- Changing the event protocol or adding new notification types.
- Addressing root causes of consumer slowness (e.g. TUI render cost).

## Tradeoffs

Blocking on transcript events means a slow consumer can now stall the
producer for the duration of those events. This is acceptable because:
(a) the alternative is permanently broken output, which is worse; (b)
the consumer already had to keep up with `TurnCompleted` blocking sends;
and (c) transcript events arrive at model-output speed, not burst speed,
so sustained saturation is unlikely in practice.

## Architecture

Two parallel changes, one per transport:

- **In-process path** (`lib.rs`): The inline forwarding logic was
extracted into `forward_in_process_event`, a standalone async function
that encapsulates the lag-marker / must-deliver / try-send decision
tree. The worker loop now delegates to it. A new `pub(crate)` function,
`server_notification_requires_delivery`, centralizes the notification
classification and is shared by both transports.

- **Remote path** (`remote.rs`): The local `event_requires_delivery` now
delegates to the same shared `server_notification_requires_delivery`,
keeping both transports in sync.
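
The lag-marker / must-deliver / try-send decision tree mentioned above can be illustrated with a reduced model. This is a sketch under stated assumptions, not the function from this PR: the `Event` and `Outcome` types are hypothetical simplifications, the standard library's `sync_channel` replaces the tokio channel (blocking `send` instead of `send().await`), and the server-request rejection callback is left out.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical, reduced event model: one lag marker plus the two tiers.
#[derive(Debug, PartialEq, Eq)]
enum Event {
    Lagged { skipped: usize },
    Lossless(&'static str),
    BestEffort(&'static str),
}

#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Continue,
    DisableStream,
}

fn forward_event(tx: &SyncSender<Event>, skipped: &mut usize, event: Event) -> Outcome {
    let lossless = matches!(event, Event::Lossless(_));
    if *skipped > 0 {
        let marker = Event::Lagged { skipped: *skipped };
        if lossless {
            // The marker must precede the lossless event, so wait for room.
            if tx.send(marker).is_err() {
                return Outcome::DisableStream;
            }
            *skipped = 0;
        } else {
            match tx.try_send(marker) {
                Ok(()) => *skipped = 0,
                Err(TrySendError::Full(_)) => {
                    // No room even for the marker: the current event is dropped too.
                    *skipped = skipped.saturating_add(1);
                    return Outcome::Continue;
                }
                Err(TrySendError::Disconnected(_)) => return Outcome::DisableStream,
            }
        }
    }
    if lossless {
        return match tx.send(event) {
            Ok(()) => Outcome::Continue,
            Err(_) => Outcome::DisableStream,
        };
    }
    match tx.try_send(event) {
        Ok(()) => Outcome::Continue,
        Err(TrySendError::Full(_)) => {
            *skipped = skipped.saturating_add(1);
            Outcome::Continue
        }
        Err(TrySendError::Disconnected(_)) => Outcome::DisableStream,
    }
}

/// Single-threaded walk through the tree using a capacity-2 queue.
fn lag_marker_demo() -> (Vec<Event>, Outcome) {
    let (tx, rx) = sync_channel(2);
    let mut skipped = 0;
    for name in ["a", "b", "c", "d"] {
        // "a" and "b" fit; "c" is dropped; "d" is dropped because the marker does not fit.
        forward_event(&tx, &mut skipped, Event::BestEffort(name));
    }
    let mut seen = vec![rx.recv().unwrap(), rx.recv().unwrap()];
    // Two free slots: the marker and the lossless event both fit without blocking.
    forward_event(&tx, &mut skipped, Event::Lossless("turn-completed"));
    seen.push(rx.recv().unwrap());
    seen.push(rx.recv().unwrap());
    drop(rx);
    // With the receiver gone, the caller is told to stop producing events.
    let after_close = forward_event(&tx, &mut skipped, Event::BestEffort("e"));
    (seen, after_close)
}
```

In this walk-through the consumer sees `a`, `b`, a `Lagged { skipped: 2 }` marker, and then the lossless event, which mirrors the ordering the in-process test in this PR asserts for its capacity-1 channel.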

## Observability

No new metrics or log lines. The existing `warn!` on event drops
continues to fire for best-effort events. Lossless events that block
will not produce a log line (they simply wait).

## Tests

- `event_requires_delivery_marks_transcript_and_terminal_events`: unit
test confirming the expanded classification covers `AgentMessageDelta`,
`ItemCompleted`, `TurnCompleted`, and excludes
`CommandExecutionOutputDelta` and `Lagged`.
- `forward_in_process_event_preserves_transcript_notifications_under_backpressure`:
integration-style test that fills a capacity-1 channel, verifies a
best-effort event is dropped (skipped count increments), then sends
lossless transcript events and confirms they all arrive in order with
the correct lag marker preceding them.
- `remote_backpressure_preserves_transcript_notifications`: end-to-end
test over a real websocket that verifies the remote transport preserves
transcript events under the same backpressure scenario.
- `event_requires_delivery_marks_transcript_and_disconnect_events`
(remote): unit test confirming the remote-side classification covers
transcript events and `Disconnected`.

---------

Co-authored-by: Eric Traut <etraut@openai.com>
Felipe Coury
2026-03-25 16:50:39 -03:00
committed by GitHub
parent 6124564297
commit e9996ec62a
2 changed files with 457 additions and 89 deletions


@@ -85,17 +85,128 @@ impl From<InProcessServerEvent> for AppServerEvent {
  }
  fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
- // These terminal events drive surface shutdown/completion state. Dropping
- // them under backpressure can leave exec/TUI waiting forever even though
- // the underlying turn has already ended.
+ // These transcript and terminal events must remain lossless. Dropping
+ // streamed assistant text or the authoritative completed item can leave
+ // the TUI with permanently corrupted markdown, while dropping completion
+ // notifications can leave surfaces waiting forever.
+ match event {
+ InProcessServerEvent::ServerNotification(notification) => {
+ server_notification_requires_delivery(notification)
+ }
+ _ => false,
+ }
+ }
+ /// Returns `true` for notifications that must survive backpressure.
+ ///
+ /// Transcript events (`AgentMessageDelta`, `PlanDelta`, reasoning deltas) and
+ /// the authoritative `ItemCompleted` / `TurnCompleted` form the lossless tier
+ /// of the event stream. Dropping any of these corrupts the visible assistant
+ /// output or leaves surfaces waiting for a completion signal that already
+ /// fired. Everything else (`CommandExecutionOutputDelta`, progress, etc.) is
+ /// best-effort and may be dropped with only cosmetic impact.
+ ///
+ /// Both the in-process and remote transports delegate to this function so the
+ /// classification stays in sync.
+ pub(crate) fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
  matches!(
- event,
- InProcessServerEvent::ServerNotification(
- codex_app_server_protocol::ServerNotification::TurnCompleted(_),
- )
+ notification,
+ ServerNotification::TurnCompleted(_)
+ | ServerNotification::ItemCompleted(_)
+ | ServerNotification::AgentMessageDelta(_)
+ | ServerNotification::PlanDelta(_)
+ | ServerNotification::ReasoningSummaryTextDelta(_)
+ | ServerNotification::ReasoningTextDelta(_)
  )
  }
+ /// Outcome of attempting to forward a single event to the consumer channel.
+ #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+ enum ForwardEventResult {
+ /// The event was delivered (or intentionally dropped); the stream is healthy.
+ Continue,
+ /// The consumer channel is closed; the caller should stop producing events.
+ DisableStream,
+ }
+ /// Forwards a single in-process event to the consumer, respecting the
+ /// lossless/best-effort split.
+ ///
+ /// Lossless events (transcript deltas, item/turn completions) block until the
+ /// consumer drains capacity. Best-effort events use `try_send` and increment
+ /// `skipped_events` on failure. When a lag marker needs to be flushed before a
+ /// lossless event, the flush itself blocks so the marker is never lost.
+ ///
+ /// If a dropped event is a `ServerRequest`, `reject_server_request` is called
+ /// so the server does not wait for a response that will never come.
+ async fn forward_in_process_event<F>(
+ event_tx: &mpsc::Sender<InProcessServerEvent>,
+ skipped_events: &mut usize,
+ event: InProcessServerEvent,
+ mut reject_server_request: F,
+ ) -> ForwardEventResult
+ where
+ F: FnMut(ServerRequest),
+ {
+ if *skipped_events > 0 {
+ if event_requires_delivery(&event) {
+ // Surface lag before the lossless event, but do not let the lag marker itself cause
+ // us to drop the transcript/completion notification the caller is blocked on.
+ if event_tx
+ .send(InProcessServerEvent::Lagged {
+ skipped: *skipped_events,
+ })
+ .await
+ .is_err()
+ {
+ return ForwardEventResult::DisableStream;
+ }
+ *skipped_events = 0;
+ } else {
+ match event_tx.try_send(InProcessServerEvent::Lagged {
+ skipped: *skipped_events,
+ }) {
+ Ok(()) => {
+ *skipped_events = 0;
+ }
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ *skipped_events = skipped_events.saturating_add(1);
+ warn!("dropping in-process app-server event because consumer queue is full");
+ if let InProcessServerEvent::ServerRequest(request) = event {
+ reject_server_request(request);
+ }
+ return ForwardEventResult::Continue;
+ }
+ Err(mpsc::error::TrySendError::Closed(_)) => {
+ return ForwardEventResult::DisableStream;
+ }
+ }
+ }
+ }
+ if event_requires_delivery(&event) {
+ // Block until the consumer catches up for transcript/completion notifications; this
+ // preserves the visible assistant output even when the queue is otherwise saturated.
+ if event_tx.send(event).await.is_err() {
+ return ForwardEventResult::DisableStream;
+ }
+ return ForwardEventResult::Continue;
+ }
+ match event_tx.try_send(event) {
+ Ok(()) => ForwardEventResult::Continue,
+ Err(mpsc::error::TrySendError::Full(event)) => {
+ *skipped_events = skipped_events.saturating_add(1);
+ warn!("dropping in-process app-server event because consumer queue is full");
+ if let InProcessServerEvent::ServerRequest(request) = event {
+ reject_server_request(request);
+ }
+ ForwardEventResult::Continue
+ }
+ Err(mpsc::error::TrySendError::Closed(_)) => ForwardEventResult::DisableStream,
+ }
+ }
  /// Layered error for [`InProcessAppServerClient::request_typed`].
  ///
  /// This keeps transport failures, server-side JSON-RPC failures, and response
@@ -366,83 +477,26 @@ impl InProcessAppServerClient {
  continue;
  }
- if skipped_events > 0 {
- if event_requires_delivery(&event) {
- // Surface lag before the terminal event, but
- // do not let the lag marker itself cause us to
- // drop the completion/abort notification that
- // the caller is blocked on.
- if event_tx
- .send(InProcessServerEvent::Lagged {
- skipped: skipped_events,
- })
- .await
- .is_err()
- {
- event_stream_enabled = false;
- continue;
- }
- skipped_events = 0;
- } else {
- match event_tx.try_send(InProcessServerEvent::Lagged {
- skipped: skipped_events,
- }) {
- Ok(()) => {
- skipped_events = 0;
- }
- Err(mpsc::error::TrySendError::Full(_)) => {
- skipped_events = skipped_events.saturating_add(1);
- warn!(
- "dropping in-process app-server event because consumer queue is full"
- );
- if let InProcessServerEvent::ServerRequest(request) = event {
- let _ = request_sender.fail_server_request(
- request.id().clone(),
- JSONRPCErrorError {
- code: -32001,
- message: "in-process app-server event queue is full".to_string(),
- data: None,
- },
- );
- }
- continue;
- }
- Err(mpsc::error::TrySendError::Closed(_)) => {
- event_stream_enabled = false;
- continue;
- }
- }
- }
- }
- if event_requires_delivery(&event) {
- // Block until the consumer catches up for
- // terminal notifications; this preserves the
- // completion signal even when the queue is
- // otherwise saturated.
- if event_tx.send(event).await.is_err() {
- event_stream_enabled = false;
- }
- continue;
- }
- match event_tx.try_send(event) {
- Ok(()) => {}
- Err(mpsc::error::TrySendError::Full(event)) => {
- skipped_events = skipped_events.saturating_add(1);
- warn!("dropping in-process app-server event because consumer queue is full");
- if let InProcessServerEvent::ServerRequest(request) = event {
- let _ = request_sender.fail_server_request(
- request.id().clone(),
- JSONRPCErrorError {
- code: -32001,
- message: "in-process app-server event queue is full".to_string(),
- data: None,
- },
- );
- }
- }
- Err(mpsc::error::TrySendError::Closed(_)) => {
+ match forward_in_process_event(
+ &event_tx,
+ &mut skipped_events,
+ event,
+ |request| {
+ let _ = request_sender.fail_server_request(
+ request.id().clone(),
+ JSONRPCErrorError {
+ code: -32001,
+ message: "in-process app-server event queue is full"
+ .to_string(),
+ data: None,
+ },
+ );
+ },
+ )
+ .await
+ {
+ ForwardEventResult::Continue => {}
+ ForwardEventResult::DisableStream => {
  event_stream_enabled = false;
  }
  }
@@ -933,6 +987,53 @@ mod tests {
.expect("message should send");
}
fn command_execution_output_delta_notification(delta: &str) -> ServerNotification {
ServerNotification::CommandExecutionOutputDelta(
codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: delta.to_string(),
},
)
}
fn agent_message_delta_notification(delta: &str) -> ServerNotification {
ServerNotification::AgentMessageDelta(
codex_app_server_protocol::AgentMessageDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: delta.to_string(),
},
)
}
fn item_completed_notification(text: &str) -> ServerNotification {
ServerNotification::ItemCompleted(codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item: codex_app_server_protocol::ThreadItem::AgentMessage {
id: "item".to_string(),
text: text.to_string(),
phase: None,
memory_citation: None,
},
})
}
fn turn_completed_notification() -> ServerNotification {
ServerNotification::TurnCompleted(codex_app_server_protocol::TurnCompletedNotification {
thread_id: "thread".to_string(),
turn: codex_app_server_protocol::Turn {
id: "turn".to_string(),
items: Vec::new(),
status: codex_app_server_protocol::TurnStatus::Completed,
error: None,
},
})
}
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
RemoteAppServerConnectArgs {
websocket_url,
@@ -1043,6 +1144,94 @@ mod tests {
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn forward_in_process_event_preserves_transcript_notifications_under_backpressure() {
let (event_tx, mut event_rx) = mpsc::channel(1);
event_tx
.send(InProcessServerEvent::ServerNotification(
command_execution_output_delta_notification("stdout-1"),
))
.await
.expect("initial event should enqueue");
let mut skipped_events = 0usize;
let result = forward_in_process_event(
&event_tx,
&mut skipped_events,
InProcessServerEvent::ServerNotification(command_execution_output_delta_notification(
"stdout-2",
)),
|_| {},
)
.await;
assert_eq!(result, ForwardEventResult::Continue);
assert_eq!(skipped_events, 1);
let receive_task = tokio::spawn(async move {
let mut events = Vec::new();
for _ in 0..5 {
events.push(
timeout(Duration::from_secs(2), event_rx.recv())
.await
.expect("event should arrive before timeout")
.expect("event stream should stay open"),
);
}
events
});
for notification in [
agent_message_delta_notification("hello"),
item_completed_notification("hello"),
turn_completed_notification(),
] {
let result = forward_in_process_event(
&event_tx,
&mut skipped_events,
InProcessServerEvent::ServerNotification(notification),
|_| {},
)
.await;
assert_eq!(result, ForwardEventResult::Continue);
}
assert_eq!(skipped_events, 0);
let events = receive_task
.await
.expect("receiver task should join successfully");
assert!(matches!(
&events[0],
InProcessServerEvent::ServerNotification(
ServerNotification::CommandExecutionOutputDelta(notification)
) if notification.delta == "stdout-1"
));
assert!(matches!(
&events[1],
InProcessServerEvent::Lagged { skipped: 1 }
));
assert!(matches!(
&events[2],
InProcessServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
notification
)) if notification.delta == "hello"
));
assert!(matches!(
&events[3],
InProcessServerEvent::ServerNotification(ServerNotification::ItemCompleted(
notification
)) if matches!(
&notification.item,
codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
)
));
assert!(matches!(
&events[4],
InProcessServerEvent::ServerNotification(ServerNotification::TurnCompleted(
notification
)) if notification.turn.status == codex_app_server_protocol::TurnStatus::Completed
));
}
#[tokio::test]
async fn remote_typed_request_roundtrip_works() {
let websocket_url = start_test_remote_server(|mut websocket| async move {
@@ -1207,6 +1396,107 @@ mod tests {
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn remote_backpressure_preserves_transcript_notifications() {
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
let websocket_url = start_test_remote_server(|mut websocket| async move {
expect_remote_initialize(&mut websocket).await;
for notification in [
command_execution_output_delta_notification("stdout-1"),
command_execution_output_delta_notification("stdout-2"),
agent_message_delta_notification("hello"),
item_completed_notification("hello"),
turn_completed_notification(),
] {
write_websocket_message(
&mut websocket,
JSONRPCMessage::Notification(
serde_json::from_value(
serde_json::to_value(notification)
.expect("notification should serialize"),
)
.expect("notification should convert to JSON-RPC"),
),
)
.await;
}
let _ = done_rx.await;
})
.await;
let mut client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
websocket_url,
client_name: "codex-app-server-client-test".to_string(),
client_version: "0.0.0-test".to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: 1,
})
.await
.expect("remote client should connect");
let first_event = timeout(Duration::from_secs(2), client.next_event())
.await
.expect("first event should arrive before timeout")
.expect("event stream should stay open");
assert!(matches!(
first_event,
AppServerEvent::ServerNotification(ServerNotification::CommandExecutionOutputDelta(
notification
)) if notification.delta == "stdout-1"
));
let mut remaining_events = Vec::new();
for _ in 0..4 {
remaining_events.push(
timeout(Duration::from_secs(2), client.next_event())
.await
.expect("event should arrive before timeout")
.expect("event stream should stay open"),
);
}
let mut transcript_event_names = Vec::new();
for event in &remaining_events {
match event {
AppServerEvent::Lagged { skipped: 1 } => {}
AppServerEvent::ServerNotification(
ServerNotification::CommandExecutionOutputDelta(notification),
) if notification.delta == "stdout-2" => {}
AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
notification,
)) if notification.delta == "hello" => {
transcript_event_names.push("agent_message_delta");
}
AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
notification,
)) if matches!(
&notification.item,
codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
) =>
{
transcript_event_names.push("item_completed");
}
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(
notification,
)) if notification.turn.status
== codex_app_server_protocol::TurnStatus::Completed =>
{
transcript_event_names.push("turn_completed");
}
_ => panic!("unexpected remaining event: {event:?}"),
}
}
assert_eq!(
transcript_event_names,
vec!["agent_message_delta", "item_completed", "turn_completed"]
);
done_tx
.send(())
.expect("server completion signal should send");
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn remote_server_request_resolution_roundtrip_works() {
let websocket_url = start_test_remote_server(|mut websocket| async move {
@@ -1446,7 +1736,7 @@ mod tests {
  }
  #[test]
- fn event_requires_delivery_marks_terminal_events() {
+ fn event_requires_delivery_marks_transcript_and_terminal_events() {
  assert!(event_requires_delivery(
  &InProcessServerEvent::ServerNotification(
  codex_app_server_protocol::ServerNotification::TurnCompleted(
@@ -1462,9 +1752,49 @@ mod tests {
)
)
));
assert!(event_requires_delivery(
&InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::AgentMessageDelta(
codex_app_server_protocol::AgentMessageDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: "hello".to_string(),
}
)
)
));
assert!(event_requires_delivery(
&InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::ItemCompleted(
codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item: codex_app_server_protocol::ThreadItem::AgentMessage {
id: "item".to_string(),
text: "hello".to_string(),
phase: None,
memory_citation: None,
},
}
)
)
));
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
skipped: 1
}));
assert!(!event_requires_delivery(
&InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::CommandExecutionOutputDelta(
codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: "stdout".to_string(),
}
)
)
));
}
#[tokio::test]


@@ -21,6 +21,7 @@ use crate::RequestResult;
  use crate::SHUTDOWN_TIMEOUT;
  use crate::TypedRequestError;
  use crate::request_method_name;
+ use crate::server_notification_requires_delivery;
  use codex_app_server_protocol::ClientInfo;
  use codex_app_server_protocol::ClientNotification;
  use codex_app_server_protocol::ClientRequest;
@@ -854,11 +855,11 @@ async fn reject_if_server_request_dropped(
  fn event_requires_delivery(event: &AppServerEvent) -> bool {
  match event {
- AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
+ AppServerEvent::ServerNotification(notification) => {
+ server_notification_requires_delivery(notification)
+ }
  AppServerEvent::Disconnected { .. } => true,
- AppServerEvent::Lagged { .. }
- | AppServerEvent::ServerNotification(_)
- | AppServerEvent::ServerRequest(_) => false,
+ AppServerEvent::Lagged { .. } | AppServerEvent::ServerRequest(_) => false,
  }
  }
@@ -905,3 +906,40 @@ async fn write_jsonrpc_message(
))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn event_requires_delivery_marks_transcript_and_disconnect_events() {
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
codex_app_server_protocol::AgentMessageDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: "hello".to_string(),
},
),)
));
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item: codex_app_server_protocol::ThreadItem::Plan {
id: "item".to_string(),
text: "step".to_string(),
},
}
),)
));
assert!(event_requires_delivery(&AppServerEvent::Disconnected {
message: "closed".to_string(),
}));
assert!(!event_requires_delivery(&AppServerEvent::Lagged {
skipped: 1
}));
}
}