mirror of
https://github.com/openai/codex.git
synced 2026-05-28 15:00:16 +00:00
fix(app-server): speed up shutdown (#23578)
## Why

Pressing `Ctrl+C` or `Ctrl+D` in the TUI could make Codex pause during shutdown when app-server background work still held outbound sender clones.

Shutdown tracing against the current `~/.codex` path found three relevant holders:

- `SkillsWatcher` kept its event-loop task alive until the shutdown timeout path.
- `AppServerAttestationProvider` retained a strong `Arc<OutgoingMessageSender>`, which could keep outbound teardown waiting after the processor task had exited.
- A background `apps/list` task could still own an outbound sender when shutdown began, causing the in-process app-server runtime to wait for its outbound channel to close.

## What Changed

- Give `SkillsWatcher` an explicit shutdown `CancellationToken` and cancel it from app-server teardown so its event loop drops the outbound sender promptly.
- Change `AppServerAttestationProvider` to keep a `Weak<OutgoingMessageSender>` and return immediately when it can no longer be upgraded.
- Give `AppsRequestProcessor` a shutdown `CancellationToken` and cancel in-flight background `apps/list` work during teardown.

## How to Test

1. Start Codex TUI from a real home configuration.
2. Press `Ctrl+C`.
3. Confirm Codex exits promptly instead of pausing during shutdown.
4. Repeat with `Ctrl+D` and confirm the same prompt exit path.

Focused manual trace validation from the investigation:

- Before the full fix, reproduced shutdown traces showed outbound teardown waiting on lingering owners, including `attestation.provider=1` and later `apps.list.task=1`.
- After the fix, fresh real-home `Ctrl+D` traces showed `app_server.runtime.outbound_state_after_processor_join` with `owners=none`, `app_server.runtime.wait_outbound_handle = 0ms`, and total TUI app-server shutdown around `18ms`.

Targeted validation:

- `RUST_MIN_STACK=8388608 cargo test -p codex-app-server`
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
|
||||
use axum::http::HeaderValue;
|
||||
use codex_app_server_protocol::AttestationGenerateParams;
|
||||
@@ -22,13 +23,13 @@ pub(crate) fn app_server_attestation_provider(
|
||||
thread_state_manager: ThreadStateManager,
|
||||
) -> Arc<dyn AttestationProvider> {
|
||||
Arc::new(AppServerAttestationProvider {
|
||||
outgoing,
|
||||
outgoing: Arc::downgrade(&outgoing),
|
||||
thread_state_manager,
|
||||
})
|
||||
}
|
||||
|
||||
struct AppServerAttestationProvider {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
outgoing: Weak<OutgoingMessageSender>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
}
|
||||
|
||||
@@ -42,7 +43,9 @@ impl std::fmt::Debug for AppServerAttestationProvider {
|
||||
|
||||
impl AttestationProvider for AppServerAttestationProvider {
|
||||
fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> {
|
||||
let outgoing = self.outgoing.clone();
|
||||
let Some(outgoing) = self.outgoing.upgrade() else {
|
||||
return Box::pin(async { None });
|
||||
};
|
||||
let thread_state_manager = self.thread_state_manager.clone();
|
||||
Box::pin(async move {
|
||||
request_attestation_header_value_with_timeout(
|
||||
|
||||
@@ -85,6 +85,7 @@ use tokio::sync::broadcast;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
|
||||
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
@@ -159,6 +160,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge {
|
||||
|
||||
pub(crate) struct MessageProcessor {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
account_processor: AccountRequestProcessor,
|
||||
apps_processor: AppsRequestProcessor,
|
||||
catalog_processor: CatalogRequestProcessor,
|
||||
@@ -330,6 +332,7 @@ impl MessageProcessor {
|
||||
let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1));
|
||||
let workspace_settings_cache =
|
||||
Arc::new(workspace_settings::WorkspaceSettingsCache::default());
|
||||
let app_list_shutdown_token = CancellationToken::new();
|
||||
let account_processor = AccountRequestProcessor::new(
|
||||
auth_manager.clone(),
|
||||
Arc::clone(&thread_manager),
|
||||
@@ -343,6 +346,7 @@ impl MessageProcessor {
|
||||
outgoing.clone(),
|
||||
config_manager.clone(),
|
||||
Arc::clone(&workspace_settings_cache),
|
||||
app_list_shutdown_token,
|
||||
);
|
||||
let catalog_processor = CatalogRequestProcessor::new(
|
||||
auth_manager.clone(),
|
||||
@@ -477,6 +481,7 @@ impl MessageProcessor {
|
||||
|
||||
Self {
|
||||
outgoing,
|
||||
skills_watcher,
|
||||
account_processor,
|
||||
apps_processor,
|
||||
catalog_processor,
|
||||
@@ -504,6 +509,8 @@ impl MessageProcessor {
|
||||
|
||||
/// Releases runtime state held by the sub-processors: clears the account
/// processor's external auth, then asks the apps processor and the skills
/// watcher to shut down so their background work stops.
// NOTE(review): per the commit message this is called from app-server
// teardown; the call site is not visible in this hunk — confirm.
pub(crate) fn clear_runtime_references(&self) {
|
||||
self.account_processor.clear_external_auth();
|
||||
self.apps_processor.shutdown();
|
||||
self.skills_watcher.shutdown();
|
||||
}
|
||||
|
||||
pub(crate) async fn process_request(
|
||||
|
||||
@@ -429,6 +429,7 @@ use tokio::sync::broadcast;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::sync::DropGuard;
|
||||
use tokio_util::task::TaskTracker;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::Instrument;
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use super::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Request processor for app-related requests; owns the handles it needs to
/// serve them plus a cancellation token used to stop background work.
pub(crate) struct AppsRequestProcessor {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
// Cloned into each spawned `apps/list` background task.
outgoing: Arc<OutgoingMessageSender>,
|
||||
config_manager: ConfigManager,
|
||||
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
|
||||
// Cancelled by `shutdown()`; spawned `apps/list` tasks race a child of
// this token and stop when it fires.
shutdown_token: CancellationToken,
|
||||
// Built from a clone of `shutdown_token` via `drop_guard()`; per the
// tokio-util docs a `DropGuard` cancels its token when dropped, so
// dropping the processor also cancels outstanding work.
_shutdown_drop_guard: DropGuard,
|
||||
}
|
||||
|
||||
impl AppsRequestProcessor {
|
||||
@@ -16,13 +17,17 @@ impl AppsRequestProcessor {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
config_manager: ConfigManager,
|
||||
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
|
||||
shutdown_token: CancellationToken,
|
||||
) -> Self {
|
||||
let shutdown_drop_guard = shutdown_token.clone().drop_guard();
|
||||
Self {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
outgoing,
|
||||
config_manager,
|
||||
workspace_settings_cache,
|
||||
shutdown_token,
|
||||
_shutdown_drop_guard: shutdown_drop_guard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,12 +88,20 @@ impl AppsRequestProcessor {
|
||||
let request = request_id.clone();
|
||||
let outgoing = Arc::clone(&self.outgoing);
|
||||
let environment_manager = self.thread_manager.environment_manager();
|
||||
let shutdown_token = self.shutdown_token.child_token();
|
||||
tokio::spawn(async move {
|
||||
Self::apps_list_task(outgoing, request, params, config, environment_manager).await;
|
||||
tokio::select! {
|
||||
_ = shutdown_token.cancelled() => {}
|
||||
_ = Self::apps_list_task(outgoing, request, params, config, environment_manager) => {}
|
||||
}
|
||||
});
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Cancels `shutdown_token`. Background `apps/list` tasks select on a child
/// of this token, so cancelling it ends them without waiting for the list
/// work to finish.
pub(crate) fn shutdown(&self) {
|
||||
self.shutdown_token.cancel();
|
||||
}
|
||||
|
||||
async fn apps_list_task(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: ConnectionRequestId,
|
||||
|
||||
@@ -15,6 +15,8 @@ use codex_file_watcher::ThrottledWatchReceiver;
|
||||
use codex_file_watcher::WatchPath;
|
||||
use codex_file_watcher::WatchRegistration;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::sync::DropGuard;
|
||||
use tracing::warn;
|
||||
|
||||
#[cfg(not(test))]
|
||||
@@ -24,6 +26,8 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);
|
||||
|
||||
/// Watches skill-related paths through a file-watcher subscriber and runs a
/// background event loop that can be stopped through `shutdown_token`.
pub(crate) struct SkillsWatcher {
|
||||
// Subscriber handle returned by `file_watcher.add_subscriber()`.
subscriber: FileWatcherSubscriber,
|
||||
// Cancelled by `shutdown()`; the event loop is spawned with a child of
// this token and breaks out of its loop once it is cancelled.
shutdown_token: CancellationToken,
|
||||
// Built from a clone of `shutdown_token` via `drop_guard()`; per the
// tokio-util docs a `DropGuard` cancels its token when dropped, so
// dropping the watcher also stops the event loop.
_shutdown_drop_guard: DropGuard,
|
||||
}
|
||||
|
||||
impl SkillsWatcher {
|
||||
@@ -39,8 +43,18 @@ impl SkillsWatcher {
|
||||
}
|
||||
};
|
||||
let (subscriber, rx) = file_watcher.add_subscriber();
|
||||
Self::spawn_event_loop(rx, skills_manager, outgoing);
|
||||
Arc::new(Self { subscriber })
|
||||
let shutdown_token = CancellationToken::new();
|
||||
let shutdown_drop_guard = shutdown_token.clone().drop_guard();
|
||||
Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token());
|
||||
Arc::new(Self {
|
||||
subscriber,
|
||||
shutdown_token,
|
||||
_shutdown_drop_guard: shutdown_drop_guard,
|
||||
})
|
||||
}
|
||||
|
||||
/// Cancels `shutdown_token`. The event loop holds a child of this token and
/// exits when it is cancelled.
pub(crate) fn shutdown(&self) {
|
||||
self.shutdown_token.cancel();
|
||||
}
|
||||
|
||||
pub(crate) async fn register_thread_config(
|
||||
@@ -92,6 +106,7 @@ impl SkillsWatcher {
|
||||
rx: Receiver,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
shutdown_token: CancellationToken,
|
||||
) {
|
||||
let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL);
|
||||
let Ok(handle) = tokio::runtime::Handle::try_current() else {
|
||||
@@ -99,7 +114,14 @@ impl SkillsWatcher {
|
||||
return;
|
||||
};
|
||||
handle.spawn(async move {
|
||||
while rx.recv().await.is_some() {
|
||||
loop {
|
||||
let event = tokio::select! {
|
||||
_ = shutdown_token.cancelled() => break,
|
||||
event = rx.recv() => event,
|
||||
};
|
||||
if event.is_none() {
|
||||
break;
|
||||
}
|
||||
skills_manager.clear_cache();
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::SkillsChanged(
|
||||
|
||||
Reference in New Issue
Block a user