fix(app-server): isolate staged upload forwarding

This commit is contained in:
Edward Frazer
2026-05-07 13:20:21 -07:00
parent 52d5ccb241
commit 80628db404
7 changed files with 57 additions and 25 deletions

View File

@@ -153,6 +153,7 @@ pub enum TransportEvent {
origin: ConnectionOrigin,
writer: mpsc::Sender<QueuedOutgoingMessage>,
binary_writer: Option<mpsc::Sender<Vec<u8>>>,
binary_reader: Option<mpsc::Receiver<Vec<u8>>>,
disconnect_sender: Option<CancellationToken>,
},
ConnectionClosed {

View File

@@ -164,6 +164,7 @@ impl ClientTracker {
origin: ConnectionOrigin::RemoteControl,
writer: writer_tx,
binary_writer: None,
binary_reader: None,
disconnect_sender: Some(disconnect_token.clone()),
})
.await?;

View File

@@ -35,6 +35,7 @@ pub async fn start_stdio_connection(
origin: ConnectionOrigin::Stdio,
writer: writer_tx,
binary_writer: None,
binary_reader: None,
disconnect_sender: None,
})
.await

View File

@@ -79,8 +79,12 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_binary_mes
.await
.expect("connection opened event should arrive")
.expect("connection opened event");
let connection_id = match opened {
TransportEvent::ConnectionOpened { connection_id, .. } => connection_id,
let (connection_id, mut binary_reader) = match opened {
TransportEvent::ConnectionOpened {
connection_id,
binary_reader: Some(binary_reader),
..
} => (connection_id, binary_reader),
_ => panic!("expected connection opened event"),
};
@@ -116,20 +120,11 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_binary_mes
.send(WebSocketMessage::Binary(Bytes::from_static(b"sftp")))
.await
.expect("binary payload should send");
let incoming_binary = timeout(Duration::from_secs(1), transport_event_rx.recv())
let incoming_binary = timeout(Duration::from_secs(1), binary_reader.recv())
.await
.expect("incoming binary event should arrive")
.expect("incoming binary event");
assert_eq!(
match incoming_binary {
TransportEvent::IncomingBinary {
connection_id: incoming_connection_id,
bytes,
} => (incoming_connection_id, bytes),
_ => panic!("expected incoming binary event"),
},
(connection_id, b"sftp".to_vec())
);
.expect("incoming binary payload should arrive")
.expect("incoming binary payload");
assert_eq!(incoming_binary, b"sftp".to_vec());
websocket
.send(WebSocketMessage::Ping(Bytes::from_static(b"check")))

View File

@@ -183,6 +183,7 @@ pub(crate) async fn run_websocket_connection<M, SinkError, StreamError>(
mpsc::channel::<QueuedOutgoingMessage>(WEBSOCKET_OUTBOUND_CHANNEL_CAPACITY);
let writer_tx_for_reader = writer_tx.clone();
let (binary_writer_tx, binary_writer_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
let (binary_reader_tx, binary_reader_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
let disconnect_token = CancellationToken::new();
if transport_event_tx
.send(TransportEvent::ConnectionOpened {
@@ -190,6 +191,7 @@ pub(crate) async fn run_websocket_connection<M, SinkError, StreamError>(
origin: ConnectionOrigin::WebSocket,
writer: writer_tx,
binary_writer: Some(binary_writer_tx.clone()),
binary_reader: Some(binary_reader_rx),
disconnect_sender: Some(disconnect_token.clone()),
})
.await
@@ -211,6 +213,7 @@ pub(crate) async fn run_websocket_connection<M, SinkError, StreamError>(
transport_event_tx.clone(),
writer_tx_for_reader,
writer_control_tx,
binary_reader_tx,
connection_id,
disconnect_token.clone(),
));
@@ -353,6 +356,7 @@ async fn run_websocket_inbound_loop<M, StreamError>(
transport_event_tx: mpsc::Sender<TransportEvent>,
writer_tx_for_reader: mpsc::Sender<QueuedOutgoingMessage>,
writer_control_tx: mpsc::Sender<M>,
binary_reader_tx: mpsc::Sender<Vec<u8>>,
connection_id: ConnectionId,
disconnect_token: CancellationToken,
) where
@@ -393,14 +397,7 @@ async fn run_websocket_inbound_loop<M, StreamError>(
Some(IncomingWebSocketMessage::Pong) => {}
Some(IncomingWebSocketMessage::Close) => break,
Some(IncomingWebSocketMessage::Binary(bytes)) => {
if transport_event_tx
.send(TransportEvent::IncomingBinary {
connection_id,
bytes,
})
.await
.is_err()
{
if binary_reader_tx.send(bytes).await.is_err() {
break;
}
}

View File

@@ -829,6 +829,7 @@ pub async fn run_main_with_transport_options(
origin,
writer,
binary_writer,
binary_reader,
disconnect_sender,
} => {
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -854,6 +855,28 @@ pub async fn run_main_with_transport_options(
{
break;
}
if let (Some(binary_writer), Some(mut binary_reader)) =
(binary_writer.clone(), binary_reader)
{
let processor = Arc::clone(&processor);
tokio::spawn(async move {
while let Some(bytes) = binary_reader.recv().await {
if let Err(err) = processor
.process_upload_binary(
connection_id,
binary_writer.clone(),
bytes,
)
.await
{
warn!(
"failed to process upload binary payload: {}",
err.message
);
}
}
});
}
connections.insert(
connection_id,
ConnectionState::new(

View File

@@ -145,7 +145,7 @@ impl FsRequestProcessor {
} else {
let lane = UploadSftpLane::start(
self.upload_paths_for_connection(connection_id).await,
binary_writer,
binary_writer.clone(),
)
.await;
self.upload_sftp_lanes
@@ -155,7 +155,21 @@ impl FsRequestProcessor {
.or_insert_with(|| lane.clone())
.clone()
};
lane.send(bytes).await
match lane.send(bytes.clone()).await {
Ok(()) => Ok(()),
Err(_) => {
let lane = UploadSftpLane::start(
self.upload_paths_for_connection(connection_id).await,
binary_writer,
)
.await;
self.upload_sftp_lanes
.lock()
.await
.insert(connection_id, lane.clone());
lane.send(bytes).await
}
}
}
async fn upload_paths_for_connection(&self, connection_id: ConnectionId) -> UploadPaths {