From 2ef43576a847442944df2130dfcda66f1bf94d3e Mon Sep 17 00:00:00 2001 From: Edward Frazer Date: Thu, 7 May 2026 13:52:10 -0700 Subject: [PATCH] fix(app-server-client): bound queued binary frames --- codex-rs/app-server-client/src/remote.rs | 41 +++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/codex-rs/app-server-client/src/remote.rs b/codex-rs/app-server-client/src/remote.rs index 9bc0061197..1552300f93 100644 --- a/codex-rs/app-server-client/src/remote.rs +++ b/codex-rs/app-server-client/src/remote.rs @@ -466,11 +466,24 @@ impl RemoteAppServerClient { break; } Some(Ok(Message::Binary(bytes))) => { - deliver_binary_packet( + if !deliver_binary_packet( &mut pending_binary_waiters, &mut pending_binary_packets, bytes.to_vec(), - ); + channel_capacity, + ) { + let message = format!( + "remote app server at `{websocket_url}` sent too many pending binary frames" + ); + let _ = deliver_event( + &event_tx, + AppServerEvent::Disconnected { + message: message.clone(), + }, + ); + worker_exit_error = Some((ErrorKind::InvalidData, message)); + break; + } } Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) @@ -928,15 +941,20 @@ fn deliver_binary_packet( pending_binary_waiters: &mut VecDeque>>>, pending_binary_packets: &mut VecDeque>, bytes: Vec, -) { + max_pending_binary_packets: usize, +) -> bool { while let Some(response_tx) = pending_binary_waiters.pop_front() { if response_tx.is_closed() { continue; } let _ = response_tx.send(Ok(bytes)); - return; + return true; + } + if pending_binary_packets.len() >= max_pending_binary_packets { + return false; } pending_binary_packets.push_back(bytes); + true } fn request_id_from_client_request(request: &ClientRequest) -> RequestId { @@ -1034,6 +1052,7 @@ mod tests { &mut pending_binary_waiters, &mut pending_binary_packets, vec![1, 2, 3], + 1, ); assert!(pending_binary_packets.is_empty()); @@ -1045,4 +1064,18 @@ mod tests { vec![1, 2, 3] ); } + + #[test] + fn binary_packets_reject_queue_overflow() { + let mut pending_binary_waiters = VecDeque::new(); + let mut pending_binary_packets = VecDeque::from([vec![1, 2, 3]]); + + assert!(!deliver_binary_packet( + &mut pending_binary_waiters, + &mut pending_binary_packets, + vec![4, 5, 6], + 1, + )); + assert_eq!(pending_binary_packets, VecDeque::from([vec![1, 2, 3]])); + } }