Discuss outbound routing fixes

This commit is contained in:
jif-oai
2026-02-10 14:15:50 +00:00
parent aee456ef18
commit f742019bec
3 changed files with 120 additions and 40 deletions

View File

@@ -83,7 +83,10 @@ enum OutboundControlEvent {
/// Remove state for a closed/disconnected connection.
Closed { connection_id: ConnectionId },
/// Mark the connection as initialized, enabling broadcast delivery.
Initialized { connection_id: ConnectionId },
Initialized {
connection_id: ConnectionId,
ready: oneshot::Sender<()>,
},
}
fn config_warning_from_error(
@@ -389,10 +392,14 @@ pub async fn run_main_with_transport(
OutboundControlEvent::Closed { connection_id } => {
outbound_connections.remove(&connection_id);
}
OutboundControlEvent::Initialized { connection_id } => {
OutboundControlEvent::Initialized {
connection_id,
ready,
} => {
if let Some(connection_state) = outbound_connections.get_mut(&connection_id) {
connection_state.initialized = true;
}
let _ = ready.send(());
}
}
}
@@ -474,12 +481,20 @@ pub async fn run_main_with_transport(
)
.await;
if !was_initialized && connection_state.session.initialized {
let (ready_tx, ready_rx) = oneshot::channel();
let send_result = outbound_control_tx
.send(OutboundControlEvent::Initialized { connection_id })
.send(OutboundControlEvent::Initialized {
connection_id,
ready: ready_tx,
})
.await;
if send_result.is_err() {
break;
}
if ready_rx.await.is_err() {
break;
}
processor.send_initialize_notifications().await;
}
}
JSONRPCMessage::Response(response) => {

View File

@@ -286,14 +286,6 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;
session.initialized = true;
for notification in self.config_warnings.iter().cloned() {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(
notification,
))
.await;
}
return;
}
}
@@ -381,6 +373,14 @@ impl MessageProcessor {
self.codex_message_processor.thread_created_receiver()
}
pub(crate) async fn send_initialize_notifications(&self) {
for notification in self.config_warnings.iter().cloned() {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(notification))
.await;
}
}
pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
self.codex_message_processor
.try_attach_thread_listener(thread_id)

View File

@@ -6,7 +6,6 @@ use crate::outgoing_message::OutgoingError;
use crate::outgoing_message::OutgoingMessage;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use futures::SinkExt;
use futures::StreamExt;
use owo_colors::OwoColorize;
@@ -198,7 +197,9 @@ pub(crate) async fn start_stdio_connection(
&writer_tx_for_reader,
connection_id,
&line,
) {
)
.await
{
break;
}
}
@@ -317,7 +318,9 @@ async fn run_websocket_connection(
&writer_tx_for_reader,
connection_id,
&text,
) {
)
.await
{
break;
}
}
@@ -346,14 +349,16 @@ async fn run_websocket_connection(
.await;
}
fn forward_incoming_message(
async fn forward_incoming_message(
transport_event_tx: &mpsc::Sender<TransportEvent>,
writer: &mpsc::Sender<OutgoingMessage>,
connection_id: ConnectionId,
payload: &str,
) -> bool {
match serde_json::from_str::<JSONRPCMessage>(payload) {
Ok(message) => enqueue_incoming_message(transport_event_tx, writer, connection_id, message),
Ok(message) => {
enqueue_incoming_message(transport_event_tx, writer, connection_id, message).await
}
Err(err) => {
error!("Failed to deserialize JSONRPCMessage: {err}");
true
@@ -361,7 +366,7 @@ fn forward_incoming_message(
}
}
fn enqueue_incoming_message(
async fn enqueue_incoming_message(
transport_event_tx: &mpsc::Sender<TransportEvent>,
writer: &mpsc::Sender<OutgoingMessage>,
connection_id: ConnectionId,
@@ -379,32 +384,24 @@ fn enqueue_incoming_message(
message: JSONRPCMessage::Request(request),
})) => {
if writer
.try_send(overloaded_error_for_request(request))
.try_send(OutgoingMessage::Error(OutgoingError {
id: request.id,
error: JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "Server overloaded; retry later.".to_string(),
data: None,
},
}))
.is_err()
{
warn!("failed to enqueue overload response for connection: {connection_id:?}");
}
true
}
Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { .. })) => {
warn!("dropping non-request incoming message because processor queue is full");
true
}
Err(mpsc::error::TrySendError::Full(_)) => true,
Err(mpsc::error::TrySendError::Full(event)) => transport_event_tx.send(event).await.is_ok(),
}
}
fn overloaded_error_for_request(request: JSONRPCRequest) -> OutgoingMessage {
OutgoingMessage::Error(OutgoingError {
id: request.id,
error: JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "Server overloaded; retry later.".to_string(),
data: None,
},
})
}
fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option<String> {
let value = match serde_json::to_value(outgoing_message) {
Ok(value) => value,
@@ -544,12 +541,9 @@ mod tests {
method: "config/read".to_string(),
params: Some(json!({ "includeLayers": false })),
});
assert!(enqueue_incoming_message(
&transport_event_tx,
&writer_tx,
connection_id,
request
));
assert!(
enqueue_incoming_message(&transport_event_tx, &writer_tx, connection_id, request).await
);
let queued_event = transport_event_rx
.recv()
@@ -582,4 +576,75 @@ mod tests {
})
);
}
#[tokio::test]
async fn enqueue_incoming_response_waits_instead_of_dropping_when_queue_is_full() {
let connection_id = ConnectionId(42);
let (transport_event_tx, mut transport_event_rx) = mpsc::channel(1);
let (writer_tx, _writer_rx) = mpsc::channel(1);
let first_message =
JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification {
method: "initialized".to_string(),
params: None,
});
transport_event_tx
.send(TransportEvent::IncomingMessage {
connection_id,
message: first_message.clone(),
})
.await
.expect("queue should accept first message");
let response = JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse {
id: codex_app_server_protocol::RequestId::Integer(7),
result: json!({"ok": true}),
});
let transport_event_tx_for_enqueue = transport_event_tx.clone();
let writer_tx_for_enqueue = writer_tx.clone();
let enqueue_handle = tokio::spawn(async move {
enqueue_incoming_message(
&transport_event_tx_for_enqueue,
&writer_tx_for_enqueue,
connection_id,
response,
)
.await
});
let queued_event = transport_event_rx
.recv()
.await
.expect("first event should be dequeued");
match queued_event {
TransportEvent::IncomingMessage {
connection_id: queued_connection_id,
message,
} => {
assert_eq!(queued_connection_id, connection_id);
assert_eq!(message, first_message);
}
_ => panic!("expected queued incoming message"),
}
let enqueue_result = enqueue_handle.await.expect("enqueue task should not panic");
assert!(enqueue_result);
let forwarded_event = transport_event_rx
.recv()
.await
.expect("response should be forwarded instead of dropped");
match forwarded_event {
TransportEvent::IncomingMessage {
connection_id: queued_connection_id,
message:
JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse { id, result }),
} => {
assert_eq!(queued_connection_id, connection_id);
assert_eq!(id, codex_app_server_protocol::RequestId::Integer(7));
assert_eq!(result, json!({"ok": true}));
}
_ => panic!("expected forwarded response message"),
}
}
}