mirror of
https://github.com/openai/codex.git
synced 2026-05-28 06:55:01 +00:00
## Why Pressing `Ctrl+C` or `Ctrl+D` in the TUI could make Codex pause during shutdown when app-server background work still held outbound sender clones. Shutdown tracing against the current `~/.codex` path found three relevant holders: - `SkillsWatcher` kept its event-loop task alive until the shutdown timeout path. - `AppServerAttestationProvider` retained a strong `Arc<OutgoingMessageSender>`, which could keep outbound teardown waiting after the processor task had exited. - A background `apps/list` task could still own an outbound sender when shutdown began, causing the in-process app-server runtime to wait for its outbound channel to close. ## What Changed - Give `SkillsWatcher` an explicit shutdown `CancellationToken` and cancel it from app-server teardown so its event loop drops the outbound sender promptly. - Change `AppServerAttestationProvider` to keep a `Weak<OutgoingMessageSender>` and return immediately when it can no longer be upgraded. - Give `AppsRequestProcessor` a shutdown `CancellationToken` and cancel in-flight background `apps/list` work during teardown. ## How to Test 1. Start Codex TUI from a real home configuration. 2. Press `Ctrl+C`. 3. Confirm Codex exits promptly instead of pausing during shutdown. 4. Repeat with `Ctrl+D` and confirm the same prompt exit path. Focused manual trace validation from the investigation: - Before the full fix, reproduced shutdown traces showed outbound teardown waiting on lingering owners, including `attestation.provider=1` and later `apps.list.task=1`. - After the fix, fresh real-home `Ctrl+D` traces showed `app_server.runtime.outbound_state_after_processor_join` with `owners=none`, `app_server.runtime.wait_outbound_handle = 0ms`, and total TUI app-server shutdown around `18ms`. Targeted validation: - `RUST_MIN_STACK=8388608 cargo test -p codex-app-server`
221 lines
6.7 KiB
Rust
221 lines
6.7 KiB
Rust
use std::sync::Arc;
|
|
use std::sync::Weak;
|
|
|
|
use axum::http::HeaderValue;
|
|
use codex_app_server_protocol::AttestationGenerateParams;
|
|
use codex_app_server_protocol::AttestationGenerateResponse;
|
|
use codex_app_server_protocol::ServerRequestPayload;
|
|
use codex_core::AttestationContext;
|
|
use codex_core::AttestationProvider;
|
|
use codex_core::GenerateAttestationFuture;
|
|
use serde::Serialize;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
use tracing::warn;
|
|
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use crate::thread_state::ThreadStateManager;
|
|
|
|
const ATTESTATION_GENERATE_TIMEOUT: Duration = Duration::from_millis(100);
|
|
|
|
pub(crate) fn app_server_attestation_provider(
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
thread_state_manager: ThreadStateManager,
|
|
) -> Arc<dyn AttestationProvider> {
|
|
Arc::new(AppServerAttestationProvider {
|
|
outgoing: Arc::downgrade(&outgoing),
|
|
thread_state_manager,
|
|
})
|
|
}
|
|
|
|
struct AppServerAttestationProvider {
|
|
outgoing: Weak<OutgoingMessageSender>,
|
|
thread_state_manager: ThreadStateManager,
|
|
}
|
|
|
|
impl std::fmt::Debug for AppServerAttestationProvider {
|
|
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
formatter
|
|
.debug_struct("AppServerAttestationProvider")
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl AttestationProvider for AppServerAttestationProvider {
|
|
fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> {
|
|
let Some(outgoing) = self.outgoing.upgrade() else {
|
|
return Box::pin(async { None });
|
|
};
|
|
let thread_state_manager = self.thread_state_manager.clone();
|
|
Box::pin(async move {
|
|
request_attestation_header_value_with_timeout(
|
|
outgoing,
|
|
thread_state_manager,
|
|
context.thread_id,
|
|
ATTESTATION_GENERATE_TIMEOUT,
|
|
)
|
|
.await
|
|
.and_then(|value| HeaderValue::from_bytes(value.as_bytes()).ok())
|
|
})
|
|
}
|
|
}
|
|
|
|
async fn request_attestation_header_value_with_timeout(
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
thread_state_manager: ThreadStateManager,
|
|
thread_id: codex_protocol::ThreadId,
|
|
timeout_duration: Duration,
|
|
) -> Option<String> {
|
|
let connection_id = thread_state_manager
|
|
.first_attestation_capable_connection_for_thread(thread_id)
|
|
.await?;
|
|
|
|
let connection_ids = [connection_id];
|
|
let (request_id, rx) = outgoing
|
|
.send_request_to_connections(
|
|
Some(&connection_ids),
|
|
ServerRequestPayload::AttestationGenerate(AttestationGenerateParams {}),
|
|
/*thread_id*/ None,
|
|
)
|
|
.await;
|
|
|
|
let result = match timeout(timeout_duration, rx).await {
|
|
Ok(Ok(Ok(result))) => result,
|
|
Ok(Ok(Err(err))) => {
|
|
warn!(
|
|
code = err.code,
|
|
message = %err.message,
|
|
"attestation generation request failed"
|
|
);
|
|
return app_server_attestation_header_value(
|
|
AppServerAttestationStatus::RequestFailed,
|
|
/*token*/ None,
|
|
);
|
|
}
|
|
Ok(Err(err)) => {
|
|
warn!("attestation generation request canceled: {err}");
|
|
return app_server_attestation_header_value(
|
|
AppServerAttestationStatus::RequestCanceled,
|
|
/*token*/ None,
|
|
);
|
|
}
|
|
Err(_) => {
|
|
let _canceled = outgoing.cancel_request(&request_id).await;
|
|
warn!(
|
|
timeout_seconds = timeout_duration.as_secs(),
|
|
"attestation generation request timed out"
|
|
);
|
|
return app_server_attestation_header_value(
|
|
AppServerAttestationStatus::Timeout,
|
|
/*token*/ None,
|
|
);
|
|
}
|
|
};
|
|
|
|
match serde_json::from_value::<AttestationGenerateResponse>(result) {
|
|
Ok(response) => app_server_attestation_header_value(
|
|
AppServerAttestationStatus::Ok,
|
|
Some(&response.token),
|
|
),
|
|
Err(err) => {
|
|
warn!("failed to deserialize attestation generation response: {err}");
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::MalformedResponse,
|
|
/*token*/ None,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy)]
|
|
enum AppServerAttestationStatus {
|
|
Ok,
|
|
Timeout,
|
|
RequestFailed,
|
|
RequestCanceled,
|
|
MalformedResponse,
|
|
}
|
|
|
|
impl AppServerAttestationStatus {
|
|
const fn code(self) -> u8 {
|
|
match self {
|
|
Self::Ok => 0,
|
|
Self::Timeout => 1,
|
|
Self::RequestFailed => 2,
|
|
Self::RequestCanceled => 3,
|
|
Self::MalformedResponse => 4,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct AppServerAttestationEnvelope<'a> {
|
|
v: u8,
|
|
s: u8,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
t: Option<&'a str>,
|
|
}
|
|
|
|
fn app_server_attestation_header_value(
|
|
status: AppServerAttestationStatus,
|
|
token: Option<&str>,
|
|
) -> Option<String> {
|
|
serde_json::to_string(&AppServerAttestationEnvelope {
|
|
v: 1,
|
|
s: status.code(),
|
|
t: token,
|
|
})
|
|
.map_err(|err| warn!("failed to serialize app-server attestation envelope: {err}"))
|
|
.ok()
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::AppServerAttestationStatus;
|
|
use super::app_server_attestation_header_value;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn app_server_attestation_header_value_wraps_opaque_client_payloads() {
|
|
assert_eq!(
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::Ok,
|
|
Some("v1.opaque-client-payload"),
|
|
),
|
|
Some(r#"{"v":1,"s":0,"t":"v1.opaque-client-payload"}"#.to_string())
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn app_server_attestation_header_value_reports_app_server_failures() {
|
|
assert_eq!(
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::Timeout,
|
|
/*token*/ None,
|
|
),
|
|
Some(r#"{"v":1,"s":1}"#.to_string())
|
|
);
|
|
assert_eq!(
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::RequestFailed,
|
|
/*token*/ None,
|
|
),
|
|
Some(r#"{"v":1,"s":2}"#.to_string())
|
|
);
|
|
assert_eq!(
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::RequestCanceled,
|
|
/*token*/ None,
|
|
),
|
|
Some(r#"{"v":1,"s":3}"#.to_string())
|
|
);
|
|
assert_eq!(
|
|
app_server_attestation_header_value(
|
|
AppServerAttestationStatus::MalformedResponse,
|
|
/*token*/ None
|
|
),
|
|
Some(r#"{"v":1,"s":4}"#.to_string())
|
|
);
|
|
}
|
|
}
|