fix thread/archive

This commit is contained in:
Owen Lin
2025-11-03 07:44:22 -08:00
parent 52d6a68ce9
commit 35f4fd978d

View File

@@ -1202,11 +1202,20 @@ impl CodexMessageProcessor {
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = notify_clone.notified() => { break; }
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) { break; }
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
}
@@ -1217,13 +1226,14 @@ impl CodexMessageProcessor {
_ = is_shutdown => {}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
notify.notify_one();
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
}
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_one();
notify.notify_waiters();
}
}
}