Wire task completion into thread-idle lifecycle (#24928)

## Why

#24744 introduced the thread idle lifecycle hook so idle continuation
can be owned by lifecycle contributors instead of hard-coded goal
runtime plumbing. Task completion still called
`goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)` directly, so
the post-turn idle transition remained goal-specific and did not notify
generic thread lifecycle contributors.

## What Changed

- Add `Session::emit_thread_idle_lifecycle_if_idle()` to gate idle
emission on both no active turn and no queued trigger-turn mailbox work.
- Call that helper when a task clears the active turn, replacing the
direct `GoalRuntimeEvent::MaybeContinueIfIdle` path.
- Cover the behavior with `codex-core` session tests for emitting after
task completion and suppressing idle emission while trigger-turn mailbox
work is pending.

## Verification

- New tests in `core/src/session/tests.rs` exercise the idle lifecycle
emission and trigger-turn mailbox guard.
This commit is contained in:
jif-oai
2026-05-28 20:05:41 +02:00
committed by GitHub
parent c2508db60d
commit 462deb0426
3 changed files with 121 additions and 0 deletions

View File

@@ -7894,6 +7894,29 @@ async fn realtime_conversation_list_voices_emits_builtin_list() {
);
}
#[derive(Clone, Copy)]
struct CompletingTask;
impl SessionTask for CompletingTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
fn span_name(&self) -> &'static str {
"session_task.completing"
}
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
None
}
}
#[derive(Clone, Copy)]
struct NeverEndingTask {
kind: TaskKind,
@@ -8238,6 +8261,86 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn task_finish_emits_thread_idle_lifecycle_after_active_turn_clears() {
struct ThreadIdleRecorder {
calls: Arc<std::sync::atomic::AtomicUsize>,
idle_tx: async_channel::Sender<()>,
expected_thread_id: ThreadId,
}
#[async_trait::async_trait]
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for ThreadIdleRecorder {
async fn on_thread_idle(&self, input: codex_extension_api::ThreadIdleInput<'_>) {
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
);
self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.idle_tx.send(()).await.expect("idle receiver open");
}
}
let (mut session, turn_context) = make_session_and_context().await;
let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let (idle_tx, idle_rx) = async_channel::bounded(1);
let mut builder = codex_extension_api::ExtensionRegistryBuilder::<crate::config::Config>::new();
builder.thread_lifecycle_contributor(Arc::new(ThreadIdleRecorder {
calls: Arc::clone(&calls),
idle_tx,
expected_thread_id: session.conversation_id,
}));
session.services.extensions = Arc::new(builder.build());
let session = Arc::new(session);
session
.spawn_task(Arc::new(turn_context), Vec::new(), CompletingTask)
.await;
timeout(StdDuration::from_secs(2), idle_rx.recv())
.await
.expect("thread idle lifecycle")
.expect("idle receiver open");
assert_eq!(1, calls.load(std::sync::atomic::Ordering::SeqCst));
assert!(session.active_turn.lock().await.is_none());
}
#[tokio::test]
async fn thread_idle_lifecycle_waits_for_trigger_turn_mailbox_work() {
struct ThreadIdleRecorder {
calls: Arc<std::sync::atomic::AtomicUsize>,
}
#[async_trait::async_trait]
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for ThreadIdleRecorder {
async fn on_thread_idle(&self, _input: codex_extension_api::ThreadIdleInput<'_>) {
self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
let (mut session, _turn_context) = make_session_and_context().await;
let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut builder = codex_extension_api::ExtensionRegistryBuilder::<crate::config::Config>::new();
builder.thread_lifecycle_contributor(Arc::new(ThreadIdleRecorder {
calls: Arc::clone(&calls),
}));
session.services.extensions = Arc::new(builder.build());
session
.input_queue
.enqueue_mailbox_communication(InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root(),
Vec::new(),
"pending trigger".to_string(),
/*trigger_turn*/ true,
))
.await;
session.emit_thread_idle_lifecycle_if_idle().await;
assert_eq!(0, calls.load(std::sync::atomic::Ordering::SeqCst));
}
#[tokio::test]
async fn steer_input_requires_active_turn() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;

View File

@@ -38,6 +38,23 @@ impl Session {
}
}
pub(crate) async fn emit_thread_idle_lifecycle_if_idle(&self) {
if self.active_turn.lock().await.is_some()
|| self.input_queue.has_trigger_turn_mailbox_items().await
{
return;
}
for contributor in self.services.extensions.thread_lifecycle_contributors() {
contributor
.on_thread_idle(codex_extension_api::ThreadIdleInput {
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
})
.await;
}
}
pub(super) async fn emit_turn_abort_lifecycle(
&self,
reason: TurnAbortReason,

View File

@@ -793,6 +793,7 @@ impl Session {
{
warn!("failed to apply goal runtime maybe-continue event: {err}");
}
self.emit_thread_idle_lifecycle_if_idle().await;
}
async fn take_active_turn(&self) -> Option<ActiveTurn> {