Refine goal continuation extension hook

This commit is contained in:
Eric Traut
2026-05-02 12:26:06 -07:00
parent 483f3787c2
commit 987edd053c
10 changed files with 161 additions and 80 deletions

View File

@@ -255,6 +255,7 @@ use codex_core::CodexThreadTurnContextOverrides;
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionIdleReason;
use codex_core::SessionMeta;
use codex_core::SessionRuntimeEvent;
use codex_core::SessionRuntimeExtension;
@@ -4562,12 +4563,9 @@ impl CodexMessageProcessor {
self.emit_goal_snapshot(thread_id).await;
// App-server owns resume response and snapshot ordering, so wait
// until those are sent before letting the goal runtime continue.
if let Err(err) = codex_thread
.apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle)
.await
{
tracing::warn!("failed to continue active goal after resume: {err}");
}
codex_thread
.maybe_start_extension_background_turn(SessionIdleReason::ThreadResumed)
.await;
}
}
Err(err) => {
@@ -8584,12 +8582,10 @@ async fn handle_pending_thread_resume_request(
.await;
// App-server owns resume response and snapshot ordering, so wait until
// replay completes before letting the goal runtime start continuation.
if pending.emit_goal_update
&& let Err(err) = conversation
.apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle)
.await
{
tracing::warn!("failed to continue active goal after running-thread resume: {err}");
if pending.emit_goal_update {
conversation
.maybe_start_extension_background_turn(SessionIdleReason::ThreadResumed)
.await;
}
}

View File

@@ -184,6 +184,11 @@ impl CodexMessageProcessor {
runtime
.apply_external_goal_set(thread.runtime_handle(), goal_status)
.await;
if goal_status == codex_state::ThreadGoalStatus::Active {
thread
.maybe_start_extension_background_turn(SessionIdleReason::HostRequest)
.await;
}
}
}

View File

@@ -9,6 +9,8 @@ use super::state::BudgetLimitSteering;
use super::state::GoalContinuationCandidate;
use super::state::GoalTurnAccountingSnapshot;
use anyhow::Context;
use codex_core::SessionBackgroundTurn;
use codex_core::SessionIdleReason;
use codex_core::SessionRuntimeEvent;
use codex_core::SessionRuntimeHandle;
use codex_protocol::ThreadId;
@@ -65,10 +67,6 @@ impl GoalRuntime {
.await;
Ok(())
}
SessionRuntimeEvent::MaybeContinueIfIdle => {
self.maybe_continue_goal_if_idle_runtime(&handle).await;
Ok(())
}
SessionRuntimeEvent::TaskAborted { turn_id, reason } => {
self.handle_goal_task_abort(&handle, turn_id, reason).await;
Ok(())
@@ -117,7 +115,6 @@ impl GoalRuntime {
}
Ok(None) => {}
}
self.maybe_continue_goal_if_idle_runtime(handle).await;
}
codex_state::ThreadGoalStatus::BudgetLimited => {
if !handle.has_active_turn().await {
@@ -590,45 +587,22 @@ impl GoalRuntime {
Ok(true)
}
async fn maybe_continue_goal_if_idle_runtime(&self, handle: &SessionRuntimeHandle) {
self.maybe_start_goal_continuation_turn(handle).await;
}
async fn maybe_start_goal_continuation_turn(&self, handle: &SessionRuntimeHandle) {
pub(super) async fn provide_idle_background_turn(
&self,
handle: SessionRuntimeHandle,
_reason: SessionIdleReason,
) -> anyhow::Result<Option<SessionBackgroundTurn>> {
let state = self.state(handle.thread_id()).await;
let Ok(_continuation_guard) = state.continuation_lock.acquire().await else {
tracing::warn!("goal continuation semaphore closed");
return;
return Ok(None);
};
let Some(candidate) = self.goal_continuation_candidate_if_active(handle).await else {
return;
};
let started = handle
.try_start_idle_background_turn(candidate.items.clone())
.await;
if !started {
return;
}
match handle.state_db_for_persisted_thread().await {
Ok(Some(state_db)) => match state_db.get_thread_goal(handle.thread_id()).await {
Ok(Some(goal))
if goal.goal_id == candidate.goal_id
&& goal.status == codex_state::ThreadGoalStatus::Active => {}
Ok(Some(_)) | Ok(None) => {
tracing::debug!(
"active goal changed after continuation launch; next idle event will settle state"
);
}
Err(err) => {
tracing::warn!("failed to re-read goal after continuation: {err}");
}
},
Ok(None) => {}
Err(err) => {
tracing::warn!("failed to open state db after goal continuation: {err}");
}
}
Ok(self
.goal_continuation_candidate_if_active(&handle)
.await
.map(|candidate| SessionBackgroundTurn {
items: candidate.items,
}))
}
async fn goal_continuation_candidate_if_active(
@@ -686,10 +660,8 @@ impl GoalRuntime {
tracing::debug!("skipping active goal continuation because pending work appeared");
return None;
}
let goal_id = goal.goal_id.clone();
let goal = protocol_goal_from_state(goal);
Some(GoalContinuationCandidate {
goal_id,
items: vec![ResponseInputItem::Message {
role: "developer".to_string(),
content: vec![ContentItem::InputText {

View File

@@ -7,6 +7,8 @@ mod state;
mod tests;
mod tools;
use codex_core::SessionBackgroundTurn;
use codex_core::SessionIdleReason;
use codex_core::SessionRuntimeEvent;
use codex_core::SessionRuntimeExtension;
use codex_core::SessionRuntimeHandle;
@@ -107,4 +109,12 @@ impl SessionRuntimeExtension for GoalRuntime {
) -> BoxFuture<'a, anyhow::Result<()>> {
Box::pin(async move { self.apply_event(handle, event).await })
}
fn next_idle_background_turn<'a>(
&'a self,
handle: SessionRuntimeHandle,
reason: SessionIdleReason,
) -> BoxFuture<'a, anyhow::Result<Option<SessionBackgroundTurn>>> {
Box::pin(async move { self.provide_idle_background_turn(handle, reason).await })
}
}

View File

@@ -43,7 +43,6 @@ pub(super) struct GoalWallClockAccountingSnapshot {
}
pub(super) struct GoalContinuationCandidate {
pub(super) goal_id: String,
pub(super) items: Vec<ResponseInputItem>,
}

View File

@@ -4,6 +4,7 @@ use crate::file_watcher::WatchRegistration;
use crate::session::Codex;
use crate::session::SessionSettingsUpdate;
use crate::session::SteerInputError;
use crate::session_extension::SessionIdleReason;
use crate::session_extension::SessionRuntimeEvent;
use crate::session_extension::SessionRuntimeHandle;
use codex_features::Feature;
@@ -148,6 +149,13 @@ impl CodexThread {
.await
}
pub async fn maybe_start_extension_background_turn(&self, reason: SessionIdleReason) -> bool {
self.codex
.session
.maybe_start_extension_background_turn(reason)
.await
}
#[doc(hidden)]
pub async fn ensure_rollout_materialized(&self) {
self.codex.session.ensure_rollout_materialized().await;

View File

@@ -197,6 +197,8 @@ pub use exec_policy::format_exec_policy_error_with_source;
pub use exec_policy::load_exec_policy;
pub use file_watcher::FileWatcherEvent;
pub use installation_id::resolve_installation_id;
pub use session_extension::SessionBackgroundTurn;
pub use session_extension::SessionIdleReason;
pub use session_extension::SessionRuntimeEvent;
pub use session_extension::SessionRuntimeExtension;
pub use session_extension::SessionRuntimeHandle;

View File

@@ -509,6 +509,7 @@ struct RecordingRuntimeExtension {
events: tokio::sync::Mutex<Vec<String>>,
notify: tokio::sync::Notify,
tool_specs: Vec<codex_tools::ToolSpec>,
idle_background_turn: tokio::sync::Mutex<Option<Vec<ResponseInputItem>>>,
}
impl RecordingRuntimeExtension {
@@ -517,6 +518,16 @@ impl RecordingRuntimeExtension {
events: tokio::sync::Mutex::new(Vec::new()),
notify: tokio::sync::Notify::new(),
tool_specs,
idle_background_turn: tokio::sync::Mutex::new(None),
}
}
fn with_idle_background_turn(items: Vec<ResponseInputItem>) -> Self {
Self {
events: tokio::sync::Mutex::new(Vec::new()),
notify: tokio::sync::Notify::new(),
tool_specs: Vec::new(),
idle_background_turn: tokio::sync::Mutex::new(Some(items)),
}
}
@@ -601,9 +612,6 @@ impl crate::session_extension::SessionRuntimeExtension for RecordingRuntimeExten
} => {
format!("turn_finished:{turn_id}:{turn_completed}")
}
crate::session_extension::SessionRuntimeEvent::MaybeContinueIfIdle => {
"maybe_continue_if_idle".to_string()
}
crate::session_extension::SessionRuntimeEvent::TaskAborted { turn_id, reason } => {
format!("task_aborted:{turn_id:?}:{reason:?}")
}
@@ -615,6 +623,25 @@ impl crate::session_extension::SessionRuntimeExtension for RecordingRuntimeExten
Ok(())
})
}
fn next_idle_background_turn<'a>(
&'a self,
_handle: crate::session_extension::SessionRuntimeHandle,
reason: crate::session_extension::SessionIdleReason,
) -> futures::future::BoxFuture<
'a,
anyhow::Result<Option<crate::session_extension::SessionBackgroundTurn>>,
> {
Box::pin(async move {
self.record(format!("idle_provider:{reason:?}")).await;
Ok(self
.idle_background_turn
.lock()
.await
.take()
.map(|items| crate::session_extension::SessionBackgroundTurn { items }))
})
}
}
fn make_connector(id: &str, name: &str) -> AppInfo {
@@ -7122,13 +7149,12 @@ async fn runtime_extension_receives_turn_lifecycle_events_in_order() {
)
.await;
let events = extension.wait_for_events(/*count*/ 3).await;
let events = extension.wait_for_events(/*count*/ 2).await;
assert_eq!(
&events[..3],
&events[..2],
&[
format!("turn_started:{}", turn_context.sub_id),
format!("turn_finished:{}:true", turn_context.sub_id),
"maybe_continue_if_idle".to_string(),
]
);
}
@@ -7163,7 +7189,7 @@ async fn runtime_extension_hidden_item_injection_reaches_active_turn() {
#[tokio::test]
async fn idle_background_turn_refuses_to_race_pending_user_work() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
let (mut session, _turn_context) = make_session_and_context().await;
let existing_item = ResponseInputItem::Message {
role: "user".to_string(),
content: vec![ContentItem::InputText {
@@ -7178,11 +7204,20 @@ async fn idle_background_turn_refuses_to_race_pending_user_work() {
}],
phase: None,
};
let extension = Arc::new(RecordingRuntimeExtension::with_idle_background_turn(vec![
background_item,
]));
let runtime_extension: Arc<dyn crate::session_extension::SessionRuntimeExtension> =
extension.clone();
session.services.runtime_extension = Some(runtime_extension);
let sess = Arc::new(session);
sess.queue_response_items_for_next_turn(vec![existing_item.clone()])
.await;
let started = crate::session_extension::SessionRuntimeHandle::new(Arc::clone(&sess))
.try_start_idle_background_turn(vec![background_item])
let started = sess
.maybe_start_extension_background_turn(
crate::session_extension::SessionIdleReason::HostRequest,
)
.await;
assert!(!started);

View File

@@ -48,7 +48,6 @@ pub enum SessionRuntimeEvent {
mode: ModeKind,
turn_completed: bool,
},
MaybeContinueIfIdle,
TaskAborted {
turn_id: Option<String>,
reason: TurnAbortReason,
@@ -56,6 +55,14 @@ pub enum SessionRuntimeEvent {
ThreadResumed,
}
/// Reason core is asking the extension whether idle background work is ready.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionIdleReason {
TurnCompleted,
ThreadResumed,
HostRequest,
}
/// Tool invocation delivered to a host extension.
#[derive(Clone)]
pub struct SessionToolInvocation {
@@ -87,6 +94,15 @@ pub enum SessionToolError {
Fatal(String),
}
/// Hidden input for an extension-provided background turn.
///
/// Extensions should return this only when the thread is idle and host-owned
/// work is ready to continue. Core performs final pending-work and active-turn
/// checks before starting the turn.
pub struct SessionBackgroundTurn {
pub items: Vec<ResponseInputItem>,
}
/// Host extension installed into a core session.
///
/// Implementations should keep their own state outside core, keyed by
@@ -119,6 +135,21 @@ pub trait SessionRuntimeExtension: Send + Sync {
) -> BoxFuture<'a, anyhow::Result<()>> {
Box::pin(async { Ok(()) })
}
/// Offer hidden input for a background turn when core observes an idle
/// thread.
///
/// Implementations should return `Ok(None)` unless the extension has
/// process-owned work that should continue without a user-visible request.
/// Core owns the final start decision, so a returned turn may still be
/// discarded if user work appears or another turn starts concurrently.
fn next_idle_background_turn<'a>(
&'a self,
_handle: SessionRuntimeHandle,
_reason: SessionIdleReason,
) -> BoxFuture<'a, anyhow::Result<Option<SessionBackgroundTurn>>> {
Box::pin(async { Ok(None) })
}
}
/// Safe operations exposed by core to a host-owned session extension.
@@ -176,14 +207,6 @@ impl SessionRuntimeHandle {
self.session.has_trigger_turn_mailbox_items().await
}
pub async fn maybe_start_turn_for_pending_work(&self) {
self.session.maybe_start_turn_for_pending_work().await;
}
pub async fn try_start_idle_background_turn(&self, items: Vec<ResponseInputItem>) -> bool {
self.session.try_start_idle_background_turn(items).await
}
/// Open the state DB for a persisted local thread, materializing and
/// reconciling the rollout first when necessary.
pub async fn state_db_for_persisted_thread(&self) -> anyhow::Result<Option<StateDbHandle>> {

View File

@@ -27,7 +27,9 @@ use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::session_extension::SessionIdleReason;
use crate::session_extension::SessionRuntimeEvent;
use crate::session_extension::SessionRuntimeHandle;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
@@ -491,6 +493,40 @@ impl Session {
.await
}
/// Asks the installed runtime extension for idle background work and starts
/// it only if user-visible pending work has not won the race.
pub(crate) async fn maybe_start_extension_background_turn(
self: &Arc<Self>,
reason: SessionIdleReason,
) -> bool {
self.maybe_start_turn_for_pending_work().await;
if self.active_turn.lock().await.is_some()
|| self.has_queued_response_items_for_next_turn().await
|| self.has_trigger_turn_mailbox_items().await
{
return false;
}
let Some(extension) = self.runtime_extension() else {
return false;
};
let handle = SessionRuntimeHandle::new(Arc::clone(self));
let background_turn = match extension.next_idle_background_turn(handle, reason).await {
Ok(background_turn) => background_turn,
Err(err) => {
warn!("runtime extension idle background provider failed: {err}");
return false;
}
};
let Some(background_turn) = background_turn else {
return false;
};
self.try_start_idle_background_turn(background_turn.items)
.await
}
async fn submit_pending_work_wakeup(&self, sub_id: String) -> bool {
self.tx_sub
.send(Submission {
@@ -816,13 +852,8 @@ impl Session {
}
let sess = Arc::clone(self);
tokio::spawn(async move {
sess.maybe_start_turn_for_pending_work().await;
if let Err(err) = sess
.apply_runtime_extension_event(SessionRuntimeEvent::MaybeContinueIfIdle)
.await
{
warn!("failed to apply runtime extension maybe-continue event: {err}");
}
sess.maybe_start_extension_background_turn(SessionIdleReason::TurnCompleted)
.await;
});
}
}