codex: fix CI failure on PR #16805

Stop the realtime test WebRTC server from hanging after the client closes.

Teach the scripted request loop to exit when the data channel or peer connection closes so compact-remote tests can unwind instead of timing out at shutdown.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-05 10:26:58 -07:00
parent 236891f5d3
commit 5d5305c5d4

View File

@@ -24,6 +24,7 @@ use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::track::track_remote::TrackRemote;
@@ -201,14 +202,24 @@ async fn start_session(
let (tx_data_channel, rx_data_channel) = oneshot::channel::<Arc<RTCDataChannel>>();
let tx_data_channel = Mutex::new(Some(tx_data_channel));
let tx_data_channel_request = tx_request.clone();
let connection_closed = Arc::new(Notify::new());
let on_data_channel_closed = Arc::clone(&connection_closed);
let on_peer_connection_closed = Arc::clone(&connection_closed);
peer_connection.on_data_channel(Box::new(move |data_channel| {
let tx_request = tx_data_channel_request.clone();
let on_data_channel_closed = Arc::clone(&on_data_channel_closed);
if let Ok(mut tx_data_channel) = tx_data_channel.lock()
&& let Some(tx_data_channel) = tx_data_channel.take()
{
let _ = tx_data_channel.send(Arc::clone(&data_channel));
}
data_channel.on_close(Box::new(move || {
let on_data_channel_closed = Arc::clone(&on_data_channel_closed);
Box::pin(async move {
on_data_channel_closed.notify_waiters();
})
}));
data_channel.on_message(Box::new(move |message: DataChannelMessage| {
let tx_request = tx_request.clone();
Box::pin(async move {
@@ -226,6 +237,19 @@ async fn start_session(
}));
Box::pin(async {})
}));
peer_connection.on_peer_connection_state_change(Box::new(move |state| {
let on_peer_connection_closed = Arc::clone(&on_peer_connection_closed);
Box::pin(async move {
if matches!(
state,
RTCPeerConnectionState::Closed
| RTCPeerConnectionState::Disconnected
| RTCPeerConnectionState::Failed
) {
on_peer_connection_closed.notify_waiters();
}
})
}));
register_remote_audio_handler(&peer_connection, tx_request.clone());
@@ -246,6 +270,7 @@ async fn start_session(
request_log_updated,
rx_request,
rx_data_channel,
connection_closed,
)
.await;
let _ = done_tx.send(());
@@ -349,6 +374,7 @@ async fn serve_scripted_requests(
request_log_updated: Arc<Notify>,
mut rx_request: mpsc::UnboundedReceiver<Value>,
rx_data_channel: oneshot::Receiver<Arc<RTCDataChannel>>,
connection_closed: Arc<Notify>,
) {
let Ok(Ok(data_channel)) = timeout(REALTIME_DATA_CHANNEL_TIMEOUT, rx_data_channel).await else {
return;
@@ -356,7 +382,14 @@ async fn serve_scripted_requests(
let mut scripted_requests = VecDeque::from(connection.requests);
while let Some(request_events) = scripted_requests.pop_front() {
let Some(body) = rx_request.recv().await else {
// WebRTC compact-remote tests often close the session before consuming every scripted
// request slot. Treat transport closure as end-of-script instead of waiting forever for
// another request that can no longer arrive.
let body = tokio::select! {
body = rx_request.recv() => body,
_ = connection_closed.notified() => None,
};
let Some(body) = body else {
break;
};
log_request(connection_index, body, &requests, &request_log_updated);