Plan outbound routing fixes

This commit is contained in:
jif-oai
2026-02-10 14:38:45 +00:00
parent bd89f60348
commit 9a64991e69
3 changed files with 44 additions and 50 deletions

View File

@@ -13,6 +13,7 @@ use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
@@ -70,23 +71,18 @@ pub use crate::transport::AppServerTransport;
/// - outbound loop: performs potentially slow writes to per-connection writers
///
/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable
/// connection state directly. In particular, the outbound loop needs to know:
/// - when a connection opens/closes so it can route messages correctly
/// - when a connection becomes initialized so broadcast semantics remain unchanged
/// connection state directly. In particular, the outbound loop needs to know
/// when a connection opens/closes so it can route messages correctly.
enum OutboundControlEvent {
/// Register a new writer for an opened connection.
Opened {
connection_id: ConnectionId,
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
initialized: Arc<AtomicBool>,
ready: oneshot::Sender<()>,
},
/// Remove state for a closed/disconnected connection.
Closed { connection_id: ConnectionId },
/// Mark the connection as initialized, enabling broadcast delivery.
Initialized {
connection_id: ConnectionId,
ready: oneshot::Sender<()>,
},
}
fn config_warning_from_error(
@@ -400,23 +396,18 @@ pub async fn run_main_with_transport(
OutboundControlEvent::Opened {
connection_id,
writer,
initialized,
ready,
} => {
outbound_connections.insert(connection_id, OutboundConnectionState::new(writer));
outbound_connections.insert(
connection_id,
OutboundConnectionState::new(writer, initialized),
);
let _ = ready.send(());
}
OutboundControlEvent::Closed { connection_id } => {
outbound_connections.remove(&connection_id);
}
OutboundControlEvent::Initialized {
connection_id,
ready,
} => {
if let Some(connection_state) = outbound_connections.get_mut(&connection_id) {
connection_state.initialized = true;
}
let _ = ready.send(());
}
}
}
}
@@ -451,11 +442,13 @@ pub async fn run_main_with_transport(
};
match event {
TransportEvent::ConnectionOpened { connection_id, writer } => {
let outbound_initialized = Arc::new(AtomicBool::new(false));
let (ready_tx, ready_rx) = oneshot::channel();
if outbound_control_tx
.send(OutboundControlEvent::Opened {
connection_id,
writer: writer.clone(),
initialized: Arc::clone(&outbound_initialized),
ready: ready_tx,
})
.await
@@ -466,7 +459,7 @@ pub async fn run_main_with_transport(
if ready_rx.await.is_err() {
break;
}
connections.insert(connection_id, ConnectionState::new());
connections.insert(connection_id, ConnectionState::new(outbound_initialized));
}
TransportEvent::ConnectionClosed { connection_id } => {
if outbound_control_tx
@@ -494,22 +487,10 @@ pub async fn run_main_with_transport(
connection_id,
request,
&mut connection_state.session,
&connection_state.outbound_initialized,
)
.await;
if !was_initialized && connection_state.session.initialized {
let (ready_tx, ready_rx) = oneshot::channel();
let send_result = outbound_control_tx
.send(OutboundControlEvent::Initialized {
connection_id,
ready: ready_tx,
})
.await;
if send_result.is_err() {
break;
}
if ready_rx.await.is_err() {
break;
}
processor.send_initialize_notifications().await;
}
}

View File

@@ -1,6 +1,8 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
@@ -191,6 +193,7 @@ impl MessageProcessor {
connection_id: ConnectionId,
request: JSONRPCRequest,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_id = ConnectionRequestId {
connection_id,
@@ -286,6 +289,7 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;
session.initialized = true;
outbound_initialized.store(true, Ordering::Release);
return;
}
}

View File

@@ -17,6 +17,7 @@ use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use tokio::io::AsyncBufReadExt;
@@ -143,27 +144,29 @@ pub(crate) enum TransportEvent {
}
pub(crate) struct ConnectionState {
pub(crate) outbound_initialized: Arc<AtomicBool>,
pub(crate) session: ConnectionSessionState,
}
impl ConnectionState {
pub(crate) fn new() -> Self {
pub(crate) fn new(outbound_initialized: Arc<AtomicBool>) -> Self {
Self {
outbound_initialized,
session: ConnectionSessionState::default(),
}
}
}
pub(crate) struct OutboundConnectionState {
pub(crate) initialized: Arc<AtomicBool>,
pub(crate) writer: mpsc::Sender<OutgoingMessage>,
pub(crate) initialized: bool,
}
impl OutboundConnectionState {
pub(crate) fn new(writer: mpsc::Sender<OutgoingMessage>) -> Self {
pub(crate) fn new(writer: mpsc::Sender<OutgoingMessage>, initialized: Arc<AtomicBool>) -> Self {
Self {
initialized,
writer,
initialized: false,
}
}
}
@@ -383,20 +386,26 @@ async fn enqueue_incoming_message(
connection_id,
message: JSONRPCMessage::Request(request),
})) => {
if writer
.try_send(OutgoingMessage::Error(OutgoingError {
id: request.id,
error: JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "Server overloaded; retry later.".to_string(),
data: None,
},
}))
.is_err()
{
warn!("failed to enqueue overload response for connection: {connection_id:?}");
let overload_error = OutgoingMessage::Error(OutgoingError {
id: request.id,
error: JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "Server overloaded; retry later.".to_string(),
data: None,
},
});
match writer.try_send(overload_error) {
Ok(()) => true,
Err(mpsc::error::TrySendError::Closed(_)) => false,
Err(mpsc::error::TrySendError::Full(overload_error)) => {
if writer.send(overload_error).await.is_err() {
warn!("failed to send overload response for connection: {connection_id:?}");
false
} else {
true
}
}
}
true
}
Err(mpsc::error::TrySendError::Full(event)) => transport_event_tx.send(event).await.is_ok(),
}
@@ -445,7 +454,7 @@ pub(crate) async fn route_outgoing_envelope(
let target_connections: Vec<ConnectionId> = connections
.iter()
.filter_map(|(connection_id, connection_state)| {
if connection_state.initialized {
if connection_state.initialized.load(Ordering::Acquire) {
Some(*connection_id)
} else {
None