mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Plan outbound router ordering fixes
This commit is contained in:
@@ -366,6 +366,7 @@ pub async fn run_main_with_transport(
|
||||
}
|
||||
}
|
||||
|
||||
let transport_event_tx_for_outbound = transport_event_tx.clone();
|
||||
let outbound_handle = tokio::spawn(async move {
|
||||
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
||||
loop {
|
||||
@@ -374,7 +375,22 @@ pub async fn run_main_with_transport(
|
||||
let Some(envelope) = envelope else {
|
||||
break;
|
||||
};
|
||||
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
||||
let disconnected_connections =
|
||||
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
||||
let mut should_exit = false;
|
||||
for connection_id in disconnected_connections {
|
||||
if transport_event_tx_for_outbound
|
||||
.send(TransportEvent::ConnectionClosed { connection_id })
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
should_exit = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if should_exit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
event = outbound_control_rx.recv() => {
|
||||
let Some(event) = event else {
|
||||
|
||||
@@ -422,7 +422,8 @@ fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option<Strin
|
||||
pub(crate) async fn route_outgoing_envelope(
|
||||
connections: &mut HashMap<ConnectionId, OutboundConnectionState>,
|
||||
envelope: OutgoingEnvelope,
|
||||
) {
|
||||
) -> Vec<ConnectionId> {
|
||||
let mut disconnected = Vec::new();
|
||||
match envelope {
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
@@ -433,10 +434,11 @@ pub(crate) async fn route_outgoing_envelope(
|
||||
"dropping message for disconnected connection: {:?}",
|
||||
connection_id
|
||||
);
|
||||
return;
|
||||
return disconnected;
|
||||
};
|
||||
if connection_state.writer.send(message).await.is_err() {
|
||||
connections.remove(&connection_id);
|
||||
disconnected.push(connection_id);
|
||||
}
|
||||
}
|
||||
OutgoingEnvelope::Broadcast { message } => {
|
||||
@@ -457,10 +459,12 @@ pub(crate) async fn route_outgoing_envelope(
|
||||
};
|
||||
if connection_state.writer.send(message.clone()).await.is_err() {
|
||||
connections.remove(&connection_id);
|
||||
disconnected.push(connection_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
disconnected
|
||||
}
|
||||
|
||||
pub(crate) fn has_initialized_connections(
|
||||
|
||||
Reference in New Issue
Block a user