mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
app-server: refactor ctrl-c restart loop
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -102,6 +102,48 @@ enum OutboundControlEvent {
|
||||
DisconnectAll,
|
||||
}
|
||||
|
||||
async fn maybe_finish_ctrl_c_restart(
|
||||
restart_requested: bool,
|
||||
restart_forced: bool,
|
||||
running_turn_count: usize,
|
||||
last_logged_running_turn_count: &mut Option<usize>,
|
||||
connection_count: usize,
|
||||
websocket_accept_shutdown: Option<&CancellationToken>,
|
||||
outbound_control_tx: &mpsc::Sender<OutboundControlEvent>,
|
||||
) -> bool {
|
||||
if !restart_requested {
|
||||
return false;
|
||||
}
|
||||
|
||||
if restart_forced || running_turn_count == 0 {
|
||||
if restart_forced {
|
||||
info!(
|
||||
"received second Ctrl-C; forcing restart with {running_turn_count} running assistant turn(s) and {connection_count} connection(s)"
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"Ctrl-C restart: no assistant turns running; stopping acceptor and disconnecting {connection_count} connection(s)"
|
||||
);
|
||||
}
|
||||
if let Some(shutdown_token) = websocket_accept_shutdown {
|
||||
shutdown_token.cancel();
|
||||
}
|
||||
let _ = outbound_control_tx
|
||||
.send(OutboundControlEvent::DisconnectAll)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
|
||||
if *last_logged_running_turn_count != Some(running_turn_count) {
|
||||
info!(
|
||||
"Ctrl-C restart: waiting for {running_turn_count} running assistant turn(s) to finish"
|
||||
);
|
||||
*last_logged_running_turn_count = Some(running_turn_count);
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn config_warning_from_error(
|
||||
summary: impl Into<String>,
|
||||
err: &std::io::Error,
|
||||
@@ -255,27 +297,35 @@ pub async fn run_main_with_transport(
|
||||
let (outbound_control_tx, mut outbound_control_rx) =
|
||||
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);
|
||||
|
||||
enum TransportRuntime {
|
||||
Stdio,
|
||||
WebSocket {
|
||||
accept_handle: JoinHandle<()>,
|
||||
shutdown_token: CancellationToken,
|
||||
},
|
||||
}
|
||||
|
||||
let mut stdio_handles = Vec::<JoinHandle<()>>::new();
|
||||
let mut websocket_accept_handle = None;
|
||||
let mut websocket_accept_shutdown = None;
|
||||
match transport {
|
||||
let transport_runtime = match transport {
|
||||
AppServerTransport::Stdio => {
|
||||
start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?;
|
||||
TransportRuntime::Stdio
|
||||
}
|
||||
AppServerTransport::WebSocket { bind_address } => {
|
||||
let shutdown_token = CancellationToken::new();
|
||||
websocket_accept_handle = Some(
|
||||
start_websocket_acceptor(
|
||||
bind_address,
|
||||
transport_event_tx.clone(),
|
||||
shutdown_token.clone(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
websocket_accept_shutdown = Some(shutdown_token);
|
||||
let accept_handle = start_websocket_acceptor(
|
||||
bind_address,
|
||||
transport_event_tx.clone(),
|
||||
shutdown_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
TransportRuntime::WebSocket {
|
||||
accept_handle,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
}
|
||||
let single_client_mode = matches!(transport, AppServerTransport::Stdio);
|
||||
};
|
||||
let single_client_mode = matches!(&transport_runtime, TransportRuntime::Stdio);
|
||||
let shutdown_when_no_connections = single_client_mode;
|
||||
let graceful_ctrl_c_restart_enabled = !single_client_mode;
|
||||
|
||||
@@ -488,42 +538,32 @@ pub async fn run_main_with_transport(
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
||||
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
|
||||
let websocket_accept_shutdown = websocket_accept_shutdown.clone();
|
||||
let websocket_accept_shutdown = match &transport_runtime {
|
||||
TransportRuntime::WebSocket { shutdown_token, .. } => Some(shutdown_token.clone()),
|
||||
TransportRuntime::Stdio => None,
|
||||
};
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
let mut restart_requested = false;
|
||||
let mut restart_forced = false;
|
||||
let mut last_logged_running_turn_count = None;
|
||||
loop {
|
||||
if restart_requested {
|
||||
let running_turn_count = *running_turn_count_rx.borrow();
|
||||
if restart_forced || running_turn_count == 0 {
|
||||
if restart_forced {
|
||||
info!(
|
||||
"received second Ctrl-C; forcing restart with {} running assistant turn(s) and {} connection(s)",
|
||||
running_turn_count,
|
||||
connections.len()
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"Ctrl-C restart: no assistant turns running; stopping acceptor and disconnecting {} connection(s)",
|
||||
connections.len()
|
||||
);
|
||||
}
|
||||
if let Some(shutdown_token) = &websocket_accept_shutdown {
|
||||
shutdown_token.cancel();
|
||||
}
|
||||
let _ = outbound_control_tx
|
||||
.send(OutboundControlEvent::DisconnectAll)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
if last_logged_running_turn_count != Some(running_turn_count) {
|
||||
info!(
|
||||
"Ctrl-C restart: waiting for {running_turn_count} running assistant turn(s) to finish"
|
||||
);
|
||||
last_logged_running_turn_count = Some(running_turn_count);
|
||||
}
|
||||
let running_turn_count = {
|
||||
let running_turn_count = running_turn_count_rx.borrow();
|
||||
*running_turn_count
|
||||
};
|
||||
if maybe_finish_ctrl_c_restart(
|
||||
restart_requested,
|
||||
restart_forced,
|
||||
running_turn_count,
|
||||
&mut last_logged_running_turn_count,
|
||||
connections.len(),
|
||||
websocket_accept_shutdown.as_ref(),
|
||||
&outbound_control_tx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
@@ -699,11 +739,13 @@ pub async fn run_main_with_transport(
|
||||
let _ = processor_handle.await;
|
||||
let _ = outbound_handle.await;
|
||||
|
||||
if let Some(shutdown_token) = websocket_accept_shutdown {
|
||||
if let TransportRuntime::WebSocket {
|
||||
accept_handle,
|
||||
shutdown_token,
|
||||
} = transport_runtime
|
||||
{
|
||||
shutdown_token.cancel();
|
||||
}
|
||||
if let Some(handle) = websocket_accept_handle {
|
||||
let _ = handle.await;
|
||||
let _ = accept_handle.await;
|
||||
}
|
||||
|
||||
for handle in stdio_handles {
|
||||
|
||||
Reference in New Issue
Block a user