From 80628db40413e1d304cdc745503497050ba0fe96 Mon Sep 17 00:00:00 2001 From: Edward Frazer Date: Thu, 7 May 2026 13:20:21 -0700 Subject: [PATCH] fix(app-server): isolate staged upload forwarding --- .../app-server-transport/src/transport/mod.rs | 1 + .../remote_control/client_tracker.rs | 1 + .../src/transport/stdio.rs | 1 + .../src/transport/unix_socket_tests.rs | 25 ++++++++----------- .../src/transport/websocket.rs | 13 ++++------ codex-rs/app-server/src/lib.rs | 23 +++++++++++++++++ .../src/request_processors/fs_processor.rs | 18 +++++++++++-- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/mod.rs b/codex-rs/app-server-transport/src/transport/mod.rs index 4ea252a443..903dd22eef 100644 --- a/codex-rs/app-server-transport/src/transport/mod.rs +++ b/codex-rs/app-server-transport/src/transport/mod.rs @@ -153,6 +153,7 @@ pub enum TransportEvent { origin: ConnectionOrigin, writer: mpsc::Sender, binary_writer: Option>>, + binary_reader: Option>>, disconnect_sender: Option, }, ConnectionClosed { diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index 4f6f6552ed..11978f7a44 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -164,6 +164,7 @@ impl ClientTracker { origin: ConnectionOrigin::RemoteControl, writer: writer_tx, binary_writer: None, + binary_reader: None, disconnect_sender: Some(disconnect_token.clone()), }) .await?; diff --git a/codex-rs/app-server-transport/src/transport/stdio.rs b/codex-rs/app-server-transport/src/transport/stdio.rs index 4425db7f89..3364ec7150 100644 --- a/codex-rs/app-server-transport/src/transport/stdio.rs +++ b/codex-rs/app-server-transport/src/transport/stdio.rs @@ -35,6 +35,7 @@ pub async fn start_stdio_connection( origin: ConnectionOrigin::Stdio, writer: writer_tx, binary_writer: None, + binary_reader: None, disconnect_sender: None, }) .await diff --git a/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs b/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs index 9f1df5d2eb..fc12cda562 100644 --- a/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs +++ b/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs @@ -79,8 +79,12 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_binary_mes .await .expect("connection opened event should arrive") .expect("connection opened event"); - let connection_id = match opened { - TransportEvent::ConnectionOpened { connection_id, .. } => connection_id, + let (connection_id, mut binary_reader) = match opened { + TransportEvent::ConnectionOpened { + connection_id, + binary_reader: Some(binary_reader), + .. + } => (connection_id, binary_reader), _ => panic!("expected connection opened event"), }; @@ -116,20 +120,11 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_binary_mes .send(WebSocketMessage::Binary(Bytes::from_static(b"sftp"))) .await .expect("binary payload should send"); - let incoming_binary = timeout(Duration::from_secs(1), transport_event_rx.recv()) + let incoming_binary = timeout(Duration::from_secs(1), binary_reader.recv()) .await - .expect("incoming binary event should arrive") - .expect("incoming binary event"); - assert_eq!( - match incoming_binary { - TransportEvent::IncomingBinary { - connection_id: incoming_connection_id, - bytes, - } => (incoming_connection_id, bytes), - _ => panic!("expected incoming binary event"), - }, - (connection_id, b"sftp".to_vec()) - ); + .expect("incoming binary payload should arrive") + .expect("incoming binary payload"); + assert_eq!(incoming_binary, b"sftp".to_vec()); websocket .send(WebSocketMessage::Ping(Bytes::from_static(b"check"))) diff --git a/codex-rs/app-server-transport/src/transport/websocket.rs b/codex-rs/app-server-transport/src/transport/websocket.rs index 39245e933d..283a79c644 100644 --- a/codex-rs/app-server-transport/src/transport/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/websocket.rs @@ -183,6 +183,7 @@ pub(crate) async fn run_websocket_connection( mpsc::channel::(WEBSOCKET_OUTBOUND_CHANNEL_CAPACITY); let writer_tx_for_reader = writer_tx.clone(); let (binary_writer_tx, binary_writer_rx) = mpsc::channel::>(CHANNEL_CAPACITY); + let (binary_reader_tx, binary_reader_rx) = mpsc::channel::>(CHANNEL_CAPACITY); let disconnect_token = CancellationToken::new(); if transport_event_tx .send(TransportEvent::ConnectionOpened { @@ -190,6 +191,7 @@ pub(crate) async fn run_websocket_connection( origin: ConnectionOrigin::WebSocket, writer: writer_tx, binary_writer: Some(binary_writer_tx.clone()), + binary_reader: Some(binary_reader_rx), disconnect_sender: Some(disconnect_token.clone()), }) .await @@ -211,6 +213,7 @@ pub(crate) async fn run_websocket_connection( transport_event_tx.clone(), writer_tx_for_reader, writer_control_tx, + binary_reader_tx, connection_id, disconnect_token.clone(), )); @@ -353,6 +356,7 @@ async fn run_websocket_inbound_loop( transport_event_tx: mpsc::Sender, writer_tx_for_reader: mpsc::Sender, writer_control_tx: mpsc::Sender, + binary_reader_tx: mpsc::Sender>, connection_id: ConnectionId, disconnect_token: CancellationToken, ) where @@ -393,14 +397,7 @@ async fn run_websocket_inbound_loop( Some(IncomingWebSocketMessage::Pong) => {} Some(IncomingWebSocketMessage::Close) => break, Some(IncomingWebSocketMessage::Binary(bytes)) => { - if transport_event_tx - .send(TransportEvent::IncomingBinary { - connection_id, - bytes, - }) - .await - .is_err() - { + if binary_reader_tx.send(bytes).await.is_err() { break; } } diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ae32585065..20d49a80c3 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -829,6 +829,7 @@ pub async fn run_main_with_transport_options( origin, writer, binary_writer, + binary_reader, disconnect_sender, } => { let outbound_initialized = Arc::new(AtomicBool::new(false)); @@ -854,6 +855,28 @@ pub async fn run_main_with_transport_options( { break; } + if let (Some(binary_writer), Some(mut binary_reader)) = + (binary_writer.clone(), binary_reader) + { + let processor = Arc::clone(&processor); + tokio::spawn(async move { + while let Some(bytes) = binary_reader.recv().await { + if let Err(err) = processor + .process_upload_binary( + connection_id, + binary_writer.clone(), + bytes, + ) + .await + { + warn!( + "failed to process upload binary payload: {}", + err.message + ); + } + } + }); + } connections.insert( connection_id, ConnectionState::new( diff --git a/codex-rs/app-server/src/request_processors/fs_processor.rs b/codex-rs/app-server/src/request_processors/fs_processor.rs index 70e740d91a..a22d51fecf 100644 --- a/codex-rs/app-server/src/request_processors/fs_processor.rs +++ b/codex-rs/app-server/src/request_processors/fs_processor.rs @@ -145,7 +145,7 @@ impl FsRequestProcessor { } else { let lane = UploadSftpLane::start( self.upload_paths_for_connection(connection_id).await, - binary_writer, + binary_writer.clone(), ) .await; self.upload_sftp_lanes @@ -155,7 +155,21 @@ impl FsRequestProcessor { .or_insert_with(|| lane.clone()) .clone() }; - lane.send(bytes).await + match lane.send(bytes.clone()).await { + Ok(()) => Ok(()), + Err(_) => { + let lane = UploadSftpLane::start( + self.upload_paths_for_connection(connection_id).await, + binary_writer, + ) + .await; + self.upload_sftp_lanes + .lock() + .await + .insert(connection_id, lane.clone()); + lane.send(bytes).await + } + } } async fn upload_paths_for_connection(&self, connection_id: ConnectionId) -> UploadPaths {