This commit is contained in:
jif-oai
2025-11-19 20:40:35 +00:00
parent 2a24ae36c2
commit 7ceabac707
2 changed files with 22 additions and 46 deletions

View File

@@ -147,7 +147,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
@@ -2237,51 +2236,25 @@ impl CodexMessageProcessor {
.await
{
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
// Do not wait on conversation.next_event(); the listener task already consumes
// the stream. Request shutdown and ensure the rollout file is flushed before moving it.
if let Err(err) = conversation.submit(Op::Shutdown).await {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
}
// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
let flush_result =
tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await;
match flush_result {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!(
"conversation {conversation_id} rollout flush failed before archive: {err}"
);
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
}
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_waiters();
Err(_) => {
warn!(
"conversation {conversation_id} rollout flush timed out; proceeding with archive"
);
}
}
}

View File

@@ -135,8 +135,8 @@ use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use codex_utils_tokenizer::warm_model_cache;
use std::path::Path;
use reqwest::StatusCode;
use std::path::Path;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
@@ -1757,7 +1757,10 @@ mod handlers {
let message = format!("Failed to save session '{name}': {err}");
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent { message }),
msg: EventMsg::Error(ErrorEvent {
message,
http_status_code: None,
}),
};
sess.send_event_raw(event).await;
}