diff --git a/codex-rs/app-server-client/src/remote.rs b/codex-rs/app-server-client/src/remote.rs index c43988ad8e..9bc0061197 100644 --- a/codex-rs/app-server-client/src/remote.rs +++ b/codex-rs/app-server-client/src/remote.rs @@ -338,7 +338,7 @@ impl RemoteAppServerClient { RemoteClientCommand::NextBinary { response_tx } => { if let Some(bytes) = pending_binary_packets.pop_front() { let _ = response_tx.send(Ok(bytes)); - } else { + } else if !response_tx.is_closed() { pending_binary_waiters.push_back(response_tx); } } @@ -466,12 +466,11 @@ impl RemoteAppServerClient { break; } Some(Ok(Message::Binary(bytes))) => { - let bytes = bytes.to_vec(); - if let Some(response_tx) = pending_binary_waiters.pop_front() { - let _ = response_tx.send(Ok(bytes)); - } else { - pending_binary_packets.push_back(bytes); - } + deliver_binary_packet( + &mut pending_binary_waiters, + &mut pending_binary_packets, + bytes.to_vec(), + ); } Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) @@ -925,6 +924,21 @@ fn deliver_event( }) } +fn deliver_binary_packet( + pending_binary_waiters: &mut VecDeque>>>, + pending_binary_packets: &mut VecDeque>, + bytes: Vec, +) { + while let Some(response_tx) = pending_binary_waiters.pop_front() { + if response_tx.is_closed() { + continue; + } + let _ = response_tx.send(Ok(bytes)); + return; + } + pending_binary_packets.push_back(bytes); +} + fn request_id_from_client_request(request: &ClientRequest) -> RequestId { jsonrpc_request_from_client_request(request.clone()).id } @@ -1007,4 +1021,28 @@ mod tests { .await .expect("shutdown should complete when worker exits first"); } + + #[tokio::test] + async fn binary_packets_skip_canceled_waiters() { + let (canceled_tx, canceled_rx) = oneshot::channel(); + drop(canceled_rx); + let (live_tx, live_rx) = oneshot::channel(); + let mut pending_binary_waiters = VecDeque::from([canceled_tx, live_tx]); + let mut pending_binary_packets = VecDeque::new(); + + deliver_binary_packet( + &mut pending_binary_waiters, + &mut pending_binary_packets, + vec![1, 2, 3], + ); + + assert!(pending_binary_packets.is_empty()); + assert_eq!( + live_rx + .await + .expect("live waiter should receive packet") + .expect("binary packet should be delivered"), + vec![1, 2, 3] + ); + } }