feat: add turn lifecycle contributors (#22480)

## Why

Extensions can already contribute prompt, tool, turn-item, and
thread-lifecycle behavior, but there was no explicit host-owned hook for
per-turn setup and cleanup. That makes extension-private turn state
awkward: an extension either has to stash it outside the turn lifecycle
or depend on core runtime objects.

This adds a small turn lifecycle boundary. Extensions receive stable
identifiers plus the existing session, thread, and turn `ExtensionData`
stores, while core keeps owning task scheduling, cancellation, and turn
teardown.

## What Changed

- Added `TurnLifecycleContributor` with `on_turn_start`, `on_turn_stop`,
and `on_turn_abort` callbacks in `codex-rs/ext/extension-api`.
- Added typed `TurnStartInput`, `TurnStopInput`, and `TurnAbortInput`
payloads that expose `thread_id`, `turn_id`, `session_store`,
`thread_store`, and `turn_store`.
- Registered and re-exported turn lifecycle contributors through
`ExtensionRegistry` and `ExtensionRegistryBuilder`.
- Wired `Session` to emit turn start, stop, and abort callbacks from the
existing turn/task lifecycle paths.
- Carried the turn-scoped `ExtensionData` through `RunningTask` and
`RemovedTask` so stop/abort callbacks receive the same turn store
created at turn start.

## Verification

- Not run locally.
This commit is contained in:
jif-oai
2026-05-13 13:47:27 +02:00
committed by GitHub
parent e831db7a96
commit 27e67a8c2a
7 changed files with 180 additions and 2 deletions

View File

@@ -1,5 +1,6 @@
//! Turn-scoped state and active turn metadata scaffolding.
use codex_extension_api::ExtensionData;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use indexmap::IndexMap;
use std::collections::HashMap;
@@ -75,6 +76,7 @@ pub(crate) struct RunningTask {
pub(crate) cancellation_token: CancellationToken,
pub(crate) handle: AbortOnDropHandle<()>,
pub(crate) turn_context: Arc<TurnContext>,
pub(crate) turn_extension_data: Arc<ExtensionData>,
// Timer recorded when the task drops to capture the full turn duration.
pub(crate) _timer: Option<codex_otel::Timer>,
}
@@ -82,6 +84,7 @@ pub(crate) struct RunningTask {
pub(crate) struct RemovedTask {
pub(crate) records_turn_token_usage_on_span: bool,
pub(crate) active_turn_is_empty: bool,
pub(crate) turn_extension_data: Arc<ExtensionData>,
}
impl ActiveTurn {
@@ -97,6 +100,7 @@ impl ActiveTurn {
Some(RemovedTask {
records_turn_token_usage_on_span,
active_turn_is_empty: self.tasks.is_empty(),
turn_extension_data: task.turn_extension_data,
})
}
@@ -120,6 +124,7 @@ pub(crate) struct TurnState {
pub(crate) tool_calls: u64,
pub(crate) has_memory_citation: bool,
pub(crate) token_usage_at_turn_start: TokenUsage,
pub(crate) extension_data: Arc<ExtensionData>,
}
pub(crate) struct PendingRequestPermissions {

View File

@@ -0,0 +1,57 @@
use codex_extension_api::ExtensionData;
use codex_protocol::protocol::TurnAbortReason;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
impl Session {
pub(super) fn emit_turn_start_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_start(codex_extension_api::TurnStartInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}
pub(super) fn emit_turn_stop_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_stop(codex_extension_api::TurnStopInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}
pub(super) fn emit_turn_abort_lifecycle(
&self,
turn_context: &TurnContext,
reason: TurnAbortReason,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_abort(codex_extension_api::TurnAbortInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
reason: reason.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}
}

View File

@@ -1,4 +1,5 @@
mod compact;
mod lifecycle;
mod regular;
mod review;
mod user_shell;
@@ -345,7 +346,7 @@ impl Session {
debug_assert!(turn.tasks.is_empty());
Arc::clone(&turn.turn_state)
};
{
let turn_extension_data = {
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
@@ -354,7 +355,9 @@ impl Session {
for item in mailbox_items {
turn_state.push_pending_input(item);
}
}
Arc::clone(&turn_state.extension_data)
};
self.emit_turn_start_lifecycle(turn_context.as_ref(), turn_extension_data.as_ref());
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
@@ -425,6 +428,7 @@ impl Session {
task,
cancellation_token,
turn_context: Arc::clone(&turn_context),
turn_extension_data,
_timer: timer,
};
turn.add_task(running_task);
@@ -476,10 +480,14 @@ impl Session {
let mut aborted_turn = false;
let mut active_turn_to_clear = None;
let mut turn_context = None;
let mut turn_extension_data = None;
if let Some(mut active_turn) = self.take_active_turn().await {
let tasks = active_turn.drain_tasks();
aborted_turn = !tasks.is_empty();
turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
@@ -488,6 +496,11 @@ impl Session {
}
}
if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
}
if (aborted_turn || reason == TurnAbortReason::Interrupted)
&& let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
@@ -530,9 +543,17 @@ impl Session {
let tasks = active_turn.drain_tasks();
let turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
let turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
@@ -568,6 +589,7 @@ impl Session {
let mut turn_had_memory_citation = false;
let mut turn_tool_calls = 0_u64;
let mut records_turn_token_usage_on_span = false;
let mut turn_extension_data = None;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
@@ -576,6 +598,7 @@ impl Session {
records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span;
if removed_task.active_turn_is_empty {
should_clear_active_turn = true;
turn_extension_data = Some(removed_task.turn_extension_data);
let turn_state = Arc::clone(&at.turn_state);
Some(turn_state)
} else {
@@ -733,6 +756,11 @@ impl Session {
.turn_timing_state
.time_to_first_token_ms()
.await;
if should_clear_active_turn
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_stop_lifecycle(turn_context.as_ref(), turn_extension_data);
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnFinished {
turn_context: turn_context.as_ref(),

View File

@@ -9,6 +9,7 @@ use crate::ExtensionData;
mod prompt;
mod thread_lifecycle;
mod tools;
mod turn_lifecycle;
pub use prompt::PromptFragment;
pub use prompt::PromptSlot;
@@ -18,6 +19,9 @@ pub use thread_lifecycle::ThreadStopInput;
pub use tools::ExtensionToolExecutor;
pub use tools::ExtensionToolFuture;
pub use tools::ExtensionToolOutput;
pub use turn_lifecycle::TurnAbortInput;
pub use turn_lifecycle::TurnStartInput;
pub use turn_lifecycle::TurnStopInput;
/// Extension contribution that adds prompt fragments during prompt assembly.
pub trait ContextContributor: Send + Sync {
@@ -45,6 +49,23 @@ pub trait ThreadLifecycleContributor<C>: Send + Sync {
fn on_thread_stop(&self, _input: ThreadStopInput<'_>) {}
}
/// Contributor for host-owned turn lifecycle gates.
///
/// Implementations should use these callbacks to seed, observe, or clear
/// extension-private turn state. The host exposes stable identifiers and
/// extension stores instead of core runtime objects.
pub trait TurnLifecycleContributor: Send + Sync {
/// Called after turn-scoped extension stores are created, before the task
/// for the turn starts running.
fn on_turn_start(&self, _input: TurnStartInput<'_>) {}
/// Called before the host drops the completed turn runtime and turn store.
fn on_turn_stop(&self, _input: TurnStopInput<'_>) {}
/// Called after the host aborts a running turn.
fn on_turn_abort(&self, _input: TurnAbortInput<'_>) {}
}
/// Extension contribution that exposes native tools owned by a feature.
pub trait ToolContributor: Send + Sync {
/// Returns the native tools visible for the supplied extension stores.

View File

@@ -0,0 +1,48 @@
use codex_protocol::ThreadId;
use codex_protocol::protocol::TurnAbortReason;
use crate::ExtensionData;
/// Input supplied when the host starts a turn.
pub struct TurnStartInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is starting.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}
/// Input supplied when the host completes a turn.
pub struct TurnStopInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is stopping.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}
/// Input supplied when the host aborts a turn.
pub struct TurnAbortInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is aborting.
pub turn_id: &'a str,
/// Reason the host aborted the turn.
pub reason: TurnAbortReason,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}

View File

@@ -26,8 +26,12 @@ pub use contributors::ThreadResumeInput;
pub use contributors::ThreadStartInput;
pub use contributors::ThreadStopInput;
pub use contributors::ToolContributor;
pub use contributors::TurnAbortInput;
pub use contributors::TurnItemContributionFuture;
pub use contributors::TurnItemContributor;
pub use contributors::TurnLifecycleContributor;
pub use contributors::TurnStartInput;
pub use contributors::TurnStopInput;
pub use registry::ExtensionRegistry;
pub use registry::ExtensionRegistryBuilder;
pub use registry::empty_extension_registry;

View File

@@ -7,10 +7,12 @@ use crate::ExtensionData;
use crate::ThreadLifecycleContributor;
use crate::ToolContributor;
use crate::TurnItemContributor;
use crate::TurnLifecycleContributor;
/// Mutable registry used while hosts register typed runtime contributions.
pub struct ExtensionRegistryBuilder<C> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
context_contributors: Vec<Arc<dyn ContextContributor>>,
tool_contributors: Vec<Arc<dyn ToolContributor>>,
turn_item_contributors: Vec<Arc<dyn TurnItemContributor>>,
@@ -21,6 +23,7 @@ impl<C> Default for ExtensionRegistryBuilder<C> {
fn default() -> Self {
Self {
thread_lifecycle_contributors: Vec::new(),
turn_lifecycle_contributors: Vec::new(),
approval_review_contributors: Vec::new(),
context_contributors: Vec::new(),
tool_contributors: Vec::new(),
@@ -48,6 +51,11 @@ impl<C> ExtensionRegistryBuilder<C> {
self.thread_lifecycle_contributors.push(contributor);
}
/// Registers one turn-lifecycle contributor.
pub fn turn_lifecycle_contributor(&mut self, contributor: Arc<dyn TurnLifecycleContributor>) {
self.turn_lifecycle_contributors.push(contributor);
}
/// Registers one prompt contributor.
pub fn prompt_contributor(&mut self, contributor: Arc<dyn ContextContributor>) {
self.context_contributors.push(contributor);
@@ -67,6 +75,7 @@ impl<C> ExtensionRegistryBuilder<C> {
pub fn build(self) -> ExtensionRegistry<C> {
ExtensionRegistry {
thread_lifecycle_contributors: self.thread_lifecycle_contributors,
turn_lifecycle_contributors: self.turn_lifecycle_contributors,
approval_review_contributors: self.approval_review_contributors,
context_contributors: self.context_contributors,
tool_contributors: self.tool_contributors,
@@ -78,6 +87,7 @@ impl<C> ExtensionRegistryBuilder<C> {
/// Immutable typed registry produced after extensions are installed.
pub struct ExtensionRegistry<C> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
context_contributors: Vec<Arc<dyn ContextContributor>>,
tool_contributors: Vec<Arc<dyn ToolContributor>>,
turn_item_contributors: Vec<Arc<dyn TurnItemContributor>>,
@@ -90,6 +100,11 @@ impl<C> ExtensionRegistry<C> {
&self.thread_lifecycle_contributors
}
/// Returns the registered turn-lifecycle contributors.
pub fn turn_lifecycle_contributors(&self) -> &[Arc<dyn TurnLifecycleContributor>] {
&self.turn_lifecycle_contributors
}
/// Claims the first rendered approval-review prompt accepted by an
/// installed contributor.
pub fn approval_review<'a>(