mirror of
https://github.com/openai/codex.git
synced 2026-05-16 09:12:54 +00:00
fix(app-server-client): skip canceled binary waiters
This commit is contained in:
@@ -338,7 +338,7 @@ impl RemoteAppServerClient {
|
||||
RemoteClientCommand::NextBinary { response_tx } => {
|
||||
if let Some(bytes) = pending_binary_packets.pop_front() {
|
||||
let _ = response_tx.send(Ok(bytes));
|
||||
} else {
|
||||
} else if !response_tx.is_closed() {
|
||||
pending_binary_waiters.push_back(response_tx);
|
||||
}
|
||||
}
|
||||
@@ -466,12 +466,11 @@ impl RemoteAppServerClient {
|
||||
break;
|
||||
}
|
||||
Some(Ok(Message::Binary(bytes))) => {
|
||||
let bytes = bytes.to_vec();
|
||||
if let Some(response_tx) = pending_binary_waiters.pop_front() {
|
||||
let _ = response_tx.send(Ok(bytes));
|
||||
} else {
|
||||
pending_binary_packets.push_back(bytes);
|
||||
}
|
||||
deliver_binary_packet(
|
||||
&mut pending_binary_waiters,
|
||||
&mut pending_binary_packets,
|
||||
bytes.to_vec(),
|
||||
);
|
||||
}
|
||||
Some(Ok(Message::Ping(_)))
|
||||
| Some(Ok(Message::Pong(_)))
|
||||
@@ -925,6 +924,21 @@ fn deliver_event(
|
||||
})
|
||||
}
|
||||
|
||||
fn deliver_binary_packet(
|
||||
pending_binary_waiters: &mut VecDeque<oneshot::Sender<IoResult<Vec<u8>>>>,
|
||||
pending_binary_packets: &mut VecDeque<Vec<u8>>,
|
||||
bytes: Vec<u8>,
|
||||
) {
|
||||
while let Some(response_tx) = pending_binary_waiters.pop_front() {
|
||||
if response_tx.is_closed() {
|
||||
continue;
|
||||
}
|
||||
let _ = response_tx.send(Ok(bytes));
|
||||
return;
|
||||
}
|
||||
pending_binary_packets.push_back(bytes);
|
||||
}
|
||||
|
||||
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
|
||||
jsonrpc_request_from_client_request(request.clone()).id
|
||||
}
|
||||
@@ -1007,4 +1021,28 @@ mod tests {
|
||||
.await
|
||||
.expect("shutdown should complete when worker exits first");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_packets_skip_canceled_waiters() {
|
||||
let (canceled_tx, canceled_rx) = oneshot::channel();
|
||||
drop(canceled_rx);
|
||||
let (live_tx, live_rx) = oneshot::channel();
|
||||
let mut pending_binary_waiters = VecDeque::from([canceled_tx, live_tx]);
|
||||
let mut pending_binary_packets = VecDeque::new();
|
||||
|
||||
deliver_binary_packet(
|
||||
&mut pending_binary_waiters,
|
||||
&mut pending_binary_packets,
|
||||
vec![1, 2, 3],
|
||||
);
|
||||
|
||||
assert!(pending_binary_packets.is_empty());
|
||||
assert_eq!(
|
||||
live_rx
|
||||
.await
|
||||
.expect("live waiter should receive packet")
|
||||
.expect("binary packet should be delivered"),
|
||||
vec![1, 2, 3]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user