From 462deb0426bfea72f80ae7d56605958d7206309e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 28 May 2026 20:05:41 +0200 Subject: [PATCH] Wire task completion into thread-idle lifecycle (#24928) ## Why #24744 introduced the thread idle lifecycle hook so idle continuation can be owned by lifecycle contributors instead of hard-coded goal runtime plumbing. Task completion still called `goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)` directly, so the post-turn idle transition remained goal-specific and did not notify generic thread lifecycle contributors. ## What Changed - Add `Session::emit_thread_idle_lifecycle_if_idle()` to gate idle emission on both no active turn and no queued trigger-turn mailbox work. - Call that helper when a task clears the active turn, replacing the direct `GoalRuntimeEvent::MaybeContinueIfIdle` path. - Cover the behavior with `codex-core` session tests for emitting after task completion and suppressing idle emission while trigger-turn mailbox work is pending. ## Verification - New tests in `core/src/session/tests.rs` exercise the idle lifecycle emission and trigger-turn mailbox guard. --- codex-rs/core/src/session/tests.rs | 103 +++++++++++++++++++++++++++ codex-rs/core/src/tasks/lifecycle.rs | 17 +++++ codex-rs/core/src/tasks/mod.rs | 1 + 3 files changed, 121 insertions(+) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 7cca56dfb2..a596ff3e79 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -7894,6 +7894,29 @@ async fn realtime_conversation_list_voices_emits_builtin_list() { ); } +#[derive(Clone, Copy)] +struct CompletingTask; + +impl SessionTask for CompletingTask { + fn kind(&self) -> TaskKind { + TaskKind::Regular + } + + fn span_name(&self) -> &'static str { + "session_task.completing" + } + + async fn run( + self: Arc, + _session: Arc, + _ctx: Arc, + _input: Vec, + _cancellation_token: CancellationToken, + ) -> Option { + None + } +} + #[derive(Clone, Copy)] struct NeverEndingTask { kind: TaskKind, @@ -8238,6 +8261,86 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() )); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn task_finish_emits_thread_idle_lifecycle_after_active_turn_clears() { + struct ThreadIdleRecorder { + calls: Arc, + idle_tx: async_channel::Sender<()>, + expected_thread_id: ThreadId, + } + + #[async_trait::async_trait] + impl codex_extension_api::ThreadLifecycleContributor for ThreadIdleRecorder { + async fn on_thread_idle(&self, input: codex_extension_api::ThreadIdleInput<'_>) { + assert_eq!( + self.expected_thread_id.to_string(), + input.thread_store.level_id() + ); + self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.idle_tx.send(()).await.expect("idle receiver open"); + } + } + + let (mut session, turn_context) = make_session_and_context().await; + let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let (idle_tx, idle_rx) = async_channel::bounded(1); + let mut builder = codex_extension_api::ExtensionRegistryBuilder::::new(); + builder.thread_lifecycle_contributor(Arc::new(ThreadIdleRecorder { + calls: Arc::clone(&calls), + idle_tx, + expected_thread_id: session.conversation_id, + })); + session.services.extensions = Arc::new(builder.build()); + + let session = Arc::new(session); + session + .spawn_task(Arc::new(turn_context), Vec::new(), CompletingTask) + .await; + + timeout(StdDuration::from_secs(2), idle_rx.recv()) + .await + .expect("thread idle lifecycle") + .expect("idle receiver open"); + assert_eq!(1, calls.load(std::sync::atomic::Ordering::SeqCst)); + assert!(session.active_turn.lock().await.is_none()); +} + +#[tokio::test] +async fn thread_idle_lifecycle_waits_for_trigger_turn_mailbox_work() { + struct ThreadIdleRecorder { + calls: Arc, + } + + #[async_trait::async_trait] + impl codex_extension_api::ThreadLifecycleContributor for ThreadIdleRecorder { + async fn on_thread_idle(&self, _input: codex_extension_api::ThreadIdleInput<'_>) { + self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + } + + let (mut session, _turn_context) = make_session_and_context().await; + let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let mut builder = codex_extension_api::ExtensionRegistryBuilder::::new(); + builder.thread_lifecycle_contributor(Arc::new(ThreadIdleRecorder { + calls: Arc::clone(&calls), + })); + session.services.extensions = Arc::new(builder.build()); + session + .input_queue + .enqueue_mailbox_communication(InterAgentCommunication::new( + AgentPath::root(), + AgentPath::root(), + Vec::new(), + "pending trigger".to_string(), + /*trigger_turn*/ true, + )) + .await; + + session.emit_thread_idle_lifecycle_if_idle().await; + + assert_eq!(0, calls.load(std::sync::atomic::Ordering::SeqCst)); +} + #[tokio::test] async fn steer_input_requires_active_turn() { let (sess, _tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/tasks/lifecycle.rs b/codex-rs/core/src/tasks/lifecycle.rs index 882e787608..782cc8c7de 100644 --- a/codex-rs/core/src/tasks/lifecycle.rs +++ b/codex-rs/core/src/tasks/lifecycle.rs @@ -38,6 +38,23 @@ impl Session { } } + pub(crate) async fn emit_thread_idle_lifecycle_if_idle(&self) { + if self.active_turn.lock().await.is_some() + || self.input_queue.has_trigger_turn_mailbox_items().await + { + return; + } + + for contributor in self.services.extensions.thread_lifecycle_contributors() { + contributor + .on_thread_idle(codex_extension_api::ThreadIdleInput { + session_store: &self.services.session_extension_data, + thread_store: &self.services.thread_extension_data, + }) + .await; + } + } + pub(super) async fn emit_turn_abort_lifecycle( &self, reason: TurnAbortReason, diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index d8b5689061..38a8a96276 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -793,6 +793,7 @@ impl Session { { warn!("failed to apply goal runtime maybe-continue event: {err}"); } + self.emit_thread_idle_lifecycle_if_idle().await; } async fn take_active_turn(&self) -> Option {