diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index ccbd85b8e9..1a9b40bfd3 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -255,6 +255,7 @@ use codex_core::CodexThreadTurnContextOverrides; use codex_core::ForkSnapshot; use codex_core::NewThread; use codex_core::RolloutRecorder; +use codex_core::SessionIdleReason; use codex_core::SessionMeta; use codex_core::SessionRuntimeEvent; use codex_core::SessionRuntimeExtension; @@ -4562,12 +4563,9 @@ impl CodexMessageProcessor { self.emit_goal_snapshot(thread_id).await; // App-server owns resume response and snapshot ordering, so wait // until those are sent before letting the goal runtime continue. - if let Err(err) = codex_thread - .apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle) - .await - { - tracing::warn!("failed to continue active goal after resume: {err}"); - } + codex_thread + .maybe_start_extension_background_turn(SessionIdleReason::ThreadResumed) + .await; } } Err(err) => { @@ -8584,12 +8582,10 @@ async fn handle_pending_thread_resume_request( .await; // App-server owns resume response and snapshot ordering, so wait until // replay completes before letting the goal runtime start continuation. - if pending.emit_goal_update - && let Err(err) = conversation - .apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle) - .await - { - tracing::warn!("failed to continue active goal after running-thread resume: {err}"); + if pending.emit_goal_update { + conversation + .maybe_start_extension_background_turn(SessionIdleReason::ThreadResumed) + .await; } } diff --git a/codex-rs/app-server/src/codex_message_processor/goal_handlers.rs b/codex-rs/app-server/src/codex_message_processor/goal_handlers.rs index 7082232664..9ad0d5dcd7 100644 --- a/codex-rs/app-server/src/codex_message_processor/goal_handlers.rs +++ b/codex-rs/app-server/src/codex_message_processor/goal_handlers.rs @@ -184,6 +184,11 @@ impl CodexMessageProcessor { runtime .apply_external_goal_set(thread.runtime_handle(), goal_status) .await; + if goal_status == codex_state::ThreadGoalStatus::Active { + thread + .maybe_start_extension_background_turn(SessionIdleReason::HostRequest) + .await; + } } } diff --git a/codex-rs/app-server/src/goal_runtime/accounting.rs b/codex-rs/app-server/src/goal_runtime/accounting.rs index 07dc69c76b..841ba2b483 100644 --- a/codex-rs/app-server/src/goal_runtime/accounting.rs +++ b/codex-rs/app-server/src/goal_runtime/accounting.rs @@ -9,6 +9,8 @@ use super::state::BudgetLimitSteering; use super::state::GoalContinuationCandidate; use super::state::GoalTurnAccountingSnapshot; use anyhow::Context; +use codex_core::SessionBackgroundTurn; +use codex_core::SessionIdleReason; use codex_core::SessionRuntimeEvent; use codex_core::SessionRuntimeHandle; use codex_protocol::ThreadId; @@ -65,10 +67,6 @@ impl GoalRuntime { .await; Ok(()) } - SessionRuntimeEvent::MaybeContinueIfIdle => { - self.maybe_continue_goal_if_idle_runtime(&handle).await; - Ok(()) - } SessionRuntimeEvent::TaskAborted { turn_id, reason } => { self.handle_goal_task_abort(&handle, turn_id, reason).await; Ok(()) @@ -117,7 +115,6 @@ impl GoalRuntime { } Ok(None) => {} } - self.maybe_continue_goal_if_idle_runtime(handle).await; } codex_state::ThreadGoalStatus::BudgetLimited => { if !handle.has_active_turn().await { @@ -590,45 +587,22 @@ impl GoalRuntime { Ok(true) } - async fn maybe_continue_goal_if_idle_runtime(&self, handle: &SessionRuntimeHandle) { - self.maybe_start_goal_continuation_turn(handle).await; - } - - async fn maybe_start_goal_continuation_turn(&self, handle: &SessionRuntimeHandle) { + pub(super) async fn provide_idle_background_turn( + &self, + handle: SessionRuntimeHandle, + _reason: SessionIdleReason, + ) -> anyhow::Result> { let state = self.state(handle.thread_id()).await; let Ok(_continuation_guard) = state.continuation_lock.acquire().await else { tracing::warn!("goal continuation semaphore closed"); - return; + return Ok(None); }; - let Some(candidate) = self.goal_continuation_candidate_if_active(handle).await else { - return; - }; - let started = handle - .try_start_idle_background_turn(candidate.items.clone()) - .await; - if !started { - return; - } - - match handle.state_db_for_persisted_thread().await { - Ok(Some(state_db)) => match state_db.get_thread_goal(handle.thread_id()).await { - Ok(Some(goal)) - if goal.goal_id == candidate.goal_id - && goal.status == codex_state::ThreadGoalStatus::Active => {} - Ok(Some(_)) | Ok(None) => { - tracing::debug!( - "active goal changed after continuation launch; next idle event will settle state" - ); - } - Err(err) => { - tracing::warn!("failed to re-read goal after continuation: {err}"); - } - }, - Ok(None) => {} - Err(err) => { - tracing::warn!("failed to open state db after goal continuation: {err}"); - } - } + Ok(self + .goal_continuation_candidate_if_active(&handle) + .await + .map(|candidate| SessionBackgroundTurn { + items: candidate.items, + })) } async fn goal_continuation_candidate_if_active( @@ -686,10 +660,8 @@ impl GoalRuntime { tracing::debug!("skipping active goal continuation because pending work appeared"); return None; } - let goal_id = goal.goal_id.clone(); let goal = protocol_goal_from_state(goal); Some(GoalContinuationCandidate { - goal_id, items: vec![ResponseInputItem::Message { role: "developer".to_string(), content: vec![ContentItem::InputText { diff --git a/codex-rs/app-server/src/goal_runtime/mod.rs b/codex-rs/app-server/src/goal_runtime/mod.rs index 735a75fb57..e6097bd381 100644 --- a/codex-rs/app-server/src/goal_runtime/mod.rs +++ b/codex-rs/app-server/src/goal_runtime/mod.rs @@ -7,6 +7,8 @@ mod state; mod tests; mod tools; +use codex_core::SessionBackgroundTurn; +use codex_core::SessionIdleReason; use codex_core::SessionRuntimeEvent; use codex_core::SessionRuntimeExtension; use codex_core::SessionRuntimeHandle; @@ -107,4 +109,12 @@ impl SessionRuntimeExtension for GoalRuntime { ) -> BoxFuture<'a, anyhow::Result<()>> { Box::pin(async move { self.apply_event(handle, event).await }) } + + fn next_idle_background_turn<'a>( + &'a self, + handle: SessionRuntimeHandle, + reason: SessionIdleReason, + ) -> BoxFuture<'a, anyhow::Result>> { + Box::pin(async move { self.provide_idle_background_turn(handle, reason).await }) + } } diff --git a/codex-rs/app-server/src/goal_runtime/state.rs b/codex-rs/app-server/src/goal_runtime/state.rs index a0e90fd4b7..a0a78b8748 100644 --- a/codex-rs/app-server/src/goal_runtime/state.rs +++ b/codex-rs/app-server/src/goal_runtime/state.rs @@ -43,7 +43,6 @@ pub(super) struct GoalWallClockAccountingSnapshot { } pub(super) struct GoalContinuationCandidate { - pub(super) goal_id: String, pub(super) items: Vec, } diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 2eb59f465a..d50cfb1e35 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -4,6 +4,7 @@ use crate::file_watcher::WatchRegistration; use crate::session::Codex; use crate::session::SessionSettingsUpdate; use crate::session::SteerInputError; +use crate::session_extension::SessionIdleReason; use crate::session_extension::SessionRuntimeEvent; use crate::session_extension::SessionRuntimeHandle; use codex_features::Feature; @@ -148,6 +149,13 @@ impl CodexThread { .await } + pub async fn maybe_start_extension_background_turn(&self, reason: SessionIdleReason) -> bool { + self.codex + .session + .maybe_start_extension_background_turn(reason) + .await + } + #[doc(hidden)] pub async fn ensure_rollout_materialized(&self) { self.codex.session.ensure_rollout_materialized().await; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 5e8a13cbe4..b371ab1757 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -197,6 +197,8 @@ pub use exec_policy::format_exec_policy_error_with_source; pub use exec_policy::load_exec_policy; pub use file_watcher::FileWatcherEvent; pub use installation_id::resolve_installation_id; +pub use session_extension::SessionBackgroundTurn; +pub use session_extension::SessionIdleReason; pub use session_extension::SessionRuntimeEvent; pub use session_extension::SessionRuntimeExtension; pub use session_extension::SessionRuntimeHandle; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index f230c08dc1..ff52f06449 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -509,6 +509,7 @@ struct RecordingRuntimeExtension { events: tokio::sync::Mutex>, notify: tokio::sync::Notify, tool_specs: Vec, + idle_background_turn: tokio::sync::Mutex>>, } impl RecordingRuntimeExtension { @@ -517,6 +518,16 @@ impl RecordingRuntimeExtension { events: tokio::sync::Mutex::new(Vec::new()), notify: tokio::sync::Notify::new(), tool_specs, + idle_background_turn: tokio::sync::Mutex::new(None), + } + } + + fn with_idle_background_turn(items: Vec) -> Self { + Self { + events: tokio::sync::Mutex::new(Vec::new()), + notify: tokio::sync::Notify::new(), + tool_specs: Vec::new(), + idle_background_turn: tokio::sync::Mutex::new(Some(items)), } } @@ -601,9 +612,6 @@ impl crate::session_extension::SessionRuntimeExtension for RecordingRuntimeExten } => { format!("turn_finished:{turn_id}:{turn_completed}") } - crate::session_extension::SessionRuntimeEvent::MaybeContinueIfIdle => { - "maybe_continue_if_idle".to_string() - } crate::session_extension::SessionRuntimeEvent::TaskAborted { turn_id, reason } => { format!("task_aborted:{turn_id:?}:{reason:?}") } @@ -615,6 +623,25 @@ impl crate::session_extension::SessionRuntimeExtension for RecordingRuntimeExten Ok(()) }) } + + fn next_idle_background_turn<'a>( + &'a self, + _handle: crate::session_extension::SessionRuntimeHandle, + reason: crate::session_extension::SessionIdleReason, + ) -> futures::future::BoxFuture< + 'a, + anyhow::Result>, + > { + Box::pin(async move { + self.record(format!("idle_provider:{reason:?}")).await; + Ok(self + .idle_background_turn + .lock() + .await + .take() + .map(|items| crate::session_extension::SessionBackgroundTurn { items })) + }) + } } fn make_connector(id: &str, name: &str) -> AppInfo { @@ -7122,13 +7149,12 @@ async fn runtime_extension_receives_turn_lifecycle_events_in_order() { ) .await; - let events = extension.wait_for_events(/*count*/ 3).await; + let events = extension.wait_for_events(/*count*/ 2).await; assert_eq!( - &events[..3], + &events[..2], &[ format!("turn_started:{}", turn_context.sub_id), format!("turn_finished:{}:true", turn_context.sub_id), - "maybe_continue_if_idle".to_string(), ] ); } @@ -7163,7 +7189,7 @@ async fn runtime_extension_hidden_item_injection_reaches_active_turn() { #[tokio::test] async fn idle_background_turn_refuses_to_race_pending_user_work() { - let (sess, _tc, _rx) = make_session_and_context_with_rx().await; + let (mut session, _turn_context) = make_session_and_context().await; let existing_item = ResponseInputItem::Message { role: "user".to_string(), content: vec![ContentItem::InputText { @@ -7178,11 +7204,20 @@ async fn idle_background_turn_refuses_to_race_pending_user_work() { }], phase: None, }; + let extension = Arc::new(RecordingRuntimeExtension::with_idle_background_turn(vec![ + background_item, + ])); + let runtime_extension: Arc = + extension.clone(); + session.services.runtime_extension = Some(runtime_extension); + let sess = Arc::new(session); sess.queue_response_items_for_next_turn(vec![existing_item.clone()]) .await; - let started = crate::session_extension::SessionRuntimeHandle::new(Arc::clone(&sess)) - .try_start_idle_background_turn(vec![background_item]) + let started = sess + .maybe_start_extension_background_turn( + crate::session_extension::SessionIdleReason::HostRequest, + ) .await; assert!(!started); diff --git a/codex-rs/core/src/session_extension.rs b/codex-rs/core/src/session_extension.rs index c588b62b77..5229732f88 100644 --- a/codex-rs/core/src/session_extension.rs +++ b/codex-rs/core/src/session_extension.rs @@ -48,7 +48,6 @@ pub enum SessionRuntimeEvent { mode: ModeKind, turn_completed: bool, }, - MaybeContinueIfIdle, TaskAborted { turn_id: Option, reason: TurnAbortReason, @@ -56,6 +55,14 @@ pub enum SessionRuntimeEvent { ThreadResumed, } +/// Reason core is asking the extension whether idle background work is ready. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SessionIdleReason { + TurnCompleted, + ThreadResumed, + HostRequest, +} + /// Tool invocation delivered to a host extension. #[derive(Clone)] pub struct SessionToolInvocation { @@ -87,6 +94,15 @@ pub enum SessionToolError { Fatal(String), } +/// Hidden input for an extension-provided background turn. +/// +/// Extensions should return this only when the thread is idle and host-owned +/// work is ready to continue. Core performs final pending-work and active-turn +/// checks before starting the turn. +pub struct SessionBackgroundTurn { + pub items: Vec, +} + /// Host extension installed into a core session. /// /// Implementations should keep their own state outside core, keyed by @@ -119,6 +135,21 @@ pub trait SessionRuntimeExtension: Send + Sync { ) -> BoxFuture<'a, anyhow::Result<()>> { Box::pin(async { Ok(()) }) } + + /// Offer hidden input for a background turn when core observes an idle + /// thread. + /// + /// Implementations should return `Ok(None)` unless the extension has + /// process-owned work that should continue without a user-visible request. + /// Core owns the final start decision, so a returned turn may still be + /// discarded if user work appears or another turn starts concurrently. + fn next_idle_background_turn<'a>( + &'a self, + _handle: SessionRuntimeHandle, + _reason: SessionIdleReason, + ) -> BoxFuture<'a, anyhow::Result>> { + Box::pin(async { Ok(None) }) + } } /// Safe operations exposed by core to a host-owned session extension. @@ -176,14 +207,6 @@ impl SessionRuntimeHandle { self.session.has_trigger_turn_mailbox_items().await } - pub async fn maybe_start_turn_for_pending_work(&self) { - self.session.maybe_start_turn_for_pending_work().await; - } - - pub async fn try_start_idle_background_turn(&self, items: Vec) -> bool { - self.session.try_start_idle_background_turn(items).await - } - /// Open the state DB for a persisted local thread, materializing and /// reconciling the rollout first when necessary. pub async fn state_db_for_persisted_thread(&self) -> anyhow::Result> { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index cf863c6ef1..addf6c45f3 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -27,7 +27,9 @@ use crate::hook_runtime::record_additional_contexts; use crate::hook_runtime::record_pending_input; use crate::session::session::Session; use crate::session::turn_context::TurnContext; +use crate::session_extension::SessionIdleReason; use crate::session_extension::SessionRuntimeEvent; +use crate::session_extension::SessionRuntimeHandle; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; @@ -491,6 +493,40 @@ impl Session { .await } + /// Asks the installed runtime extension for idle background work and starts + /// it only if user-visible pending work has not won the race. + pub(crate) async fn maybe_start_extension_background_turn( + self: &Arc, + reason: SessionIdleReason, + ) -> bool { + self.maybe_start_turn_for_pending_work().await; + + if self.active_turn.lock().await.is_some() + || self.has_queued_response_items_for_next_turn().await + || self.has_trigger_turn_mailbox_items().await + { + return false; + } + + let Some(extension) = self.runtime_extension() else { + return false; + }; + let handle = SessionRuntimeHandle::new(Arc::clone(self)); + let background_turn = match extension.next_idle_background_turn(handle, reason).await { + Ok(background_turn) => background_turn, + Err(err) => { + warn!("runtime extension idle background provider failed: {err}"); + return false; + } + }; + let Some(background_turn) = background_turn else { + return false; + }; + + self.try_start_idle_background_turn(background_turn.items) + .await + } + async fn submit_pending_work_wakeup(&self, sub_id: String) -> bool { self.tx_sub .send(Submission { @@ -816,13 +852,8 @@ impl Session { } let sess = Arc::clone(self); tokio::spawn(async move { - sess.maybe_start_turn_for_pending_work().await; - if let Err(err) = sess - .apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle) - .await - { - warn!("failed to apply runtime extension maybe-continue event: {err}"); - } + sess.maybe_start_extension_background_turn(SessionIdleReason::TurnCompleted) + .await; }); } }