app-server: Organize app-server to allow more transports (#15810)

Make `run_main_with_transport` slightly more flexible by consolidating
logic spread across stdio and websocket transports.
This commit is contained in:
Ruslan Nigmatullin
2026-03-25 17:11:22 -07:00
committed by GitHub
parent 2794e27849
commit d7e35e56cf

View File

@@ -363,38 +363,6 @@ pub async fn run_main_with_transport(
let (outbound_control_tx, mut outbound_control_rx) =
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);
enum TransportRuntime {
Stdio,
WebSocket {
accept_handle: JoinHandle<()>,
shutdown_token: CancellationToken,
},
}
let mut stdio_handles = Vec::<JoinHandle<()>>::new();
let transport_runtime = match transport {
AppServerTransport::Stdio => {
start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?;
TransportRuntime::Stdio
}
AppServerTransport::WebSocket { bind_address } => {
let shutdown_token = CancellationToken::new();
let accept_handle = start_websocket_acceptor(
bind_address,
transport_event_tx.clone(),
shutdown_token.clone(),
policy_from_settings(&auth)?,
)
.await?;
TransportRuntime::WebSocket {
accept_handle,
shutdown_token,
}
}
};
let single_client_mode = matches!(&transport_runtime, TransportRuntime::Stdio);
let shutdown_when_no_connections = single_client_mode;
let graceful_signal_restart_enabled = !single_client_mode;
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
@@ -558,6 +526,30 @@ pub async fn run_main_with_transport(
}
}
let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
let single_client_mode = matches!(&transport, AppServerTransport::Stdio);
let shutdown_when_no_connections = single_client_mode;
let graceful_signal_restart_enabled = !single_client_mode;
match transport {
AppServerTransport::Stdio => {
start_stdio_connection(transport_event_tx.clone(), &mut transport_accept_handles)
.await?;
}
AppServerTransport::WebSocket { bind_address } => {
let accept_handle = start_websocket_acceptor(
bind_address,
transport_event_tx.clone(),
transport_shutdown_token.clone(),
policy_from_settings(&auth)?,
)
.await?;
transport_accept_handles.push(accept_handle);
}
}
let outbound_handle = tokio::spawn(async move {
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
loop {
@@ -635,10 +627,7 @@ pub async fn run_main_with_transport(
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
let websocket_accept_shutdown = match &transport_runtime {
TransportRuntime::WebSocket { shutdown_token, .. } => Some(shutdown_token.clone()),
TransportRuntime::Stdio => None,
};
let transport_shutdown_token = transport_shutdown_token.clone();
async move {
let mut listen_for_threads = true;
let mut shutdown_state = ShutdownState::default();
@@ -651,9 +640,7 @@ pub async fn run_main_with_transport(
shutdown_state.update(running_turn_count, connections.len()),
ShutdownAction::Finish
) {
if let Some(shutdown_token) = &websocket_accept_shutdown {
shutdown_token.cancel();
}
transport_shutdown_token.cancel();
let _ = outbound_control_tx
.send(OutboundControlEvent::DisconnectAll)
.await;
@@ -847,16 +834,8 @@ pub async fn run_main_with_transport(
let _ = processor_handle.await;
let _ = outbound_handle.await;
if let TransportRuntime::WebSocket {
accept_handle,
shutdown_token,
} = transport_runtime
{
shutdown_token.cancel();
let _ = accept_handle.await;
}
for handle in stdio_handles {
transport_shutdown_token.cancel();
for handle in transport_accept_handles {
let _ = handle.await;
}