feat: move extension scope ids into ExtensionData (#22490)

## Summary
- add a scoped level_id to ExtensionData and expose it through
level_id()
- remove thread_id/turn_id parameters from extension contributor inputs
where the scoped ExtensionData already carries that identity
- move turn-scoped extension data onto TurnContext so token usage and
lifecycle contributors can share the same turn store

## Testing
- cargo check -p codex-extension-api -p codex-core --tests
- cargo test -p codex-extension-api
- cargo test -p codex-guardian
- cargo test -p codex-core --lib
record_token_usage_info_notifies_extension_contributors
- cargo test -p codex-core --lib
submission_loop_channel_close_emits_thread_stop_lifecycle
- cargo test -p codex-core --lib
submission_loop_channel_close_aborts_active_turn_before_thread_stop_lifecycle
- just fix -p codex-extension-api
- just fix -p codex-guardian
- just fix -p codex-core
- just fmt

## Note
- Attempted cargo test -p codex-core; it aborted in
agent::control::tests::spawn_agent_fork_last_n_turns_keeps_only_recent_turns
with the existing stack overflow before the full suite completed.
This commit is contained in:
jif-oai
2026-05-13 16:13:16 +02:00
committed by GitHub
parent 99157f3797
commit 1dcc89f1d4
16 changed files with 77 additions and 107 deletions

View File

@@ -149,7 +149,6 @@ impl CodexThread {
.thread_lifecycle_contributors()
{
contributor.on_thread_resume(codex_extension_api::ThreadResumeInput {
thread_id: self.codex.session.conversation_id,
session_store: &self.codex.session.services.session_extension_data,
thread_store: &self.codex.session.services.thread_extension_data,
});

View File

@@ -635,7 +635,6 @@ async fn shutdown_session_runtime(sess: &Arc<Session>) {
fn emit_thread_stop_lifecycle(sess: &Session) {
for contributor in sess.services.extensions.thread_lifecycle_contributors() {
contributor.on_thread_stop(codex_extension_api::ThreadStopInput {
thread_id: sess.conversation_id,
session_store: &sess.services.session_extension_data,
thread_store: &sess.services.thread_extension_data,
});

View File

@@ -2869,8 +2869,7 @@ impl Session {
contributor.on_token_usage(
&self.services.session_extension_data,
&self.services.thread_extension_data,
self.conversation_id,
&turn_context.sub_id,
turn_context.extension_data.as_ref(),
token_info,
);
}

View File

@@ -113,7 +113,7 @@ pub(super) async fn spawn_review_thread(
));
let review_turn_context = TurnContext {
sub_id: review_turn_id,
sub_id: review_turn_id.clone(),
trace_id: current_span_trace_id(),
realtime_active: parent_turn_context.realtime_active,
config: per_turn_config,
@@ -149,6 +149,7 @@ pub(super) async fn spawn_review_thread(
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
truncation_policy: model_info.truncation_policy.into(),
turn_metadata_state,
extension_data: Arc::new(codex_extension_api::ExtensionData::new(review_turn_id)),
turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()),
turn_timing_state: Arc::new(TurnTimingState::default()),
server_model_warning_emitted: AtomicBool::new(false),

View File

@@ -811,11 +811,12 @@ impl Session {
SessionId::from(thread_id)
};
let agent_control = agent_control.with_session_id(session_id);
let session_extension_data = codex_extension_api::ExtensionData::new();
let thread_extension_data = codex_extension_api::ExtensionData::new();
let session_extension_data =
codex_extension_api::ExtensionData::new(session_id.to_string());
let thread_extension_data =
codex_extension_api::ExtensionData::new(thread_id.to_string());
for contributor in extensions.thread_lifecycle_contributors() {
contributor.on_thread_start(codex_extension_api::ThreadStartInput {
thread_id,
config: config.as_ref(),
session_store: &session_extension_data,
thread_store: &thread_extension_data,

View File

@@ -1807,8 +1807,9 @@ async fn record_token_usage_info_notifies_extension_contributors() {
#[derive(Debug, PartialEq, Eq)]
struct RecordedTokenUsage {
thread_id: ThreadId,
turn_id: String,
session_level_id: String,
thread_level_id: String,
turn_level_id: String,
token_usage: TokenUsageInfo,
saw_session_store: bool,
saw_thread_store: bool,
@@ -1823,16 +1824,16 @@ async fn record_token_usage_info_notifies_extension_contributors() {
&self,
session_store: &codex_extension_api::ExtensionData,
thread_store: &codex_extension_api::ExtensionData,
thread_id: ThreadId,
turn_id: &str,
turn_store: &codex_extension_api::ExtensionData,
token_usage: &TokenUsageInfo,
) {
self.records
.lock()
.expect("token usage records lock")
.push(RecordedTokenUsage {
thread_id,
turn_id: turn_id.to_string(),
session_level_id: session_store.level_id().to_string(),
thread_level_id: thread_store.level_id().to_string(),
turn_level_id: turn_store.level_id().to_string(),
token_usage: token_usage.clone(),
saw_session_store: session_store.get::<SessionTokenUsageMarker>().is_some(),
saw_thread_store: thread_store.get::<ThreadTokenUsageMarker>().is_some(),
@@ -1882,8 +1883,9 @@ async fn record_token_usage_info_notifies_extension_contributors() {
expected_total_usage.add_assign(&second_usage);
let expected = vec![
RecordedTokenUsage {
thread_id: session.conversation_id,
turn_id: turn_context.sub_id.clone(),
session_level_id: session.session_id().to_string(),
thread_level_id: session.conversation_id.to_string(),
turn_level_id: turn_context.sub_id.clone(),
token_usage: TokenUsageInfo {
total_token_usage: first_usage.clone(),
last_token_usage: first_usage,
@@ -1893,8 +1895,9 @@ async fn record_token_usage_info_notifies_extension_contributors() {
saw_thread_store: true,
},
RecordedTokenUsage {
thread_id: session.conversation_id,
turn_id: turn_context.sub_id.clone(),
session_level_id: session.session_id().to_string(),
thread_level_id: session.conversation_id.to_string(),
turn_level_id: turn_context.sub_id.clone(),
token_usage: TokenUsageInfo {
total_token_usage: expected_total_usage,
last_token_usage: second_usage,
@@ -3981,8 +3984,10 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
plugins_manager,
mcp_manager,
extensions: Arc::new(codex_extension_api::ExtensionRegistryBuilder::new().build()),
session_extension_data: codex_extension_api::ExtensionData::new(),
thread_extension_data: codex_extension_api::ExtensionData::new(),
session_extension_data: codex_extension_api::ExtensionData::new(
agent_control.session_id().to_string(),
),
thread_extension_data: codex_extension_api::ExtensionData::new(thread_id.to_string()),
agent_control,
network_proxy: None,
network_approval: Arc::clone(&network_approval),
@@ -5320,7 +5325,10 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() {
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for ThreadStopRecorder {
fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
assert_eq!(self.expected_thread_id, input.thread_id);
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
);
assert!(input.session_store.get::<SessionStopMarker>().is_some());
assert!(input.thread_store.get::<ThreadStopMarker>().is_some());
self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
@@ -5362,7 +5370,10 @@ async fn submission_loop_channel_close_aborts_active_turn_before_thread_stop_lif
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for LifecycleRecorder {
fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
assert_eq!(self.expected_thread_id, input.thread_id);
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
);
self.calls
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
@@ -5372,8 +5383,11 @@ async fn submission_loop_channel_close_aborts_active_turn_before_thread_stop_lif
impl codex_extension_api::TurnLifecycleContributor for LifecycleRecorder {
fn on_turn_abort(&self, input: codex_extension_api::TurnAbortInput<'_>) {
assert_eq!(self.expected_thread_id, input.thread_id);
assert_eq!(self.expected_turn_id, input.turn_id);
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
);
assert_eq!(self.expected_turn_id, input.turn_store.level_id());
assert_eq!(TurnAbortReason::Interrupted, input.reason);
self.calls
.lock()
@@ -5811,8 +5825,10 @@ where
plugins_manager,
mcp_manager,
extensions: Arc::new(codex_extension_api::ExtensionRegistryBuilder::new().build()),
session_extension_data: codex_extension_api::ExtensionData::new(),
thread_extension_data: codex_extension_api::ExtensionData::new(),
session_extension_data: codex_extension_api::ExtensionData::new(
agent_control.session_id().to_string(),
),
thread_extension_data: codex_extension_api::ExtensionData::new(thread_id.to_string()),
agent_control,
network_proxy: None,
network_approval: Arc::clone(&network_approval),

View File

@@ -92,6 +92,7 @@ pub struct TurnContext {
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
pub(crate) turn_metadata_state: Arc<TurnMetadataState>,
pub(crate) extension_data: Arc<codex_extension_api::ExtensionData>,
pub(crate) turn_skills: TurnSkillsContext,
pub(crate) turn_timing_state: Arc<TurnTimingState>,
pub(crate) server_model_warning_emitted: AtomicBool,
@@ -274,6 +275,7 @@ impl TurnContext {
truncation_policy,
dynamic_tools: self.dynamic_tools.clone(),
turn_metadata_state: self.turn_metadata_state.clone(),
extension_data: Arc::clone(&self.extension_data),
turn_skills: self.turn_skills.clone(),
turn_timing_state: Arc::clone(&self.turn_timing_state),
server_model_warning_emitted: AtomicBool::new(
@@ -535,6 +537,7 @@ impl Session {
network.is_some(),
));
let (current_date, timezone) = local_time_context();
let extension_data = Arc::new(codex_extension_api::ExtensionData::new(sub_id.clone()));
TurnContext {
sub_id,
trace_id: current_span_trace_id(),
@@ -572,6 +575,7 @@ impl Session {
truncation_policy: model_info.truncation_policy.into(),
dynamic_tools: session_configuration.dynamic_tools.clone(),
turn_metadata_state,
extension_data,
turn_skills: TurnSkillsContext::new(skills_outcome),
turn_timing_state: Arc::new(TurnTimingState::default()),
server_model_warning_emitted: AtomicBool::new(false),

View File

@@ -1,6 +1,5 @@
//! Turn-scoped state and active turn metadata scaffolding.
use codex_extension_api::ExtensionData;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use indexmap::IndexMap;
use std::collections::HashMap;
@@ -76,7 +75,6 @@ pub(crate) struct RunningTask {
pub(crate) cancellation_token: CancellationToken,
pub(crate) handle: AbortOnDropHandle<()>,
pub(crate) turn_context: Arc<TurnContext>,
pub(crate) turn_extension_data: Arc<ExtensionData>,
// Timer recorded when the task drops to capture the full turn duration.
pub(crate) _timer: Option<codex_otel::Timer>,
}
@@ -84,7 +82,6 @@ pub(crate) struct RunningTask {
pub(crate) struct RemovedTask {
pub(crate) records_turn_token_usage_on_span: bool,
pub(crate) active_turn_is_empty: bool,
pub(crate) turn_extension_data: Arc<ExtensionData>,
}
impl ActiveTurn {
@@ -100,7 +97,6 @@ impl ActiveTurn {
Some(RemovedTask {
records_turn_token_usage_on_span,
active_turn_is_empty: self.tasks.is_empty(),
turn_extension_data: task.turn_extension_data,
})
}
@@ -124,7 +120,6 @@ pub(crate) struct TurnState {
pub(crate) tool_calls: u64,
pub(crate) has_memory_citation: bool,
pub(crate) token_usage_at_turn_start: TokenUsage,
pub(crate) extension_data: Arc<ExtensionData>,
}
pub(crate) struct PendingRequestPermissions {

View File

@@ -2,18 +2,11 @@ use codex_extension_api::ExtensionData;
use codex_protocol::protocol::TurnAbortReason;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
impl Session {
pub(super) fn emit_turn_start_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
pub(super) fn emit_turn_start_lifecycle(&self, turn_store: &ExtensionData) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_start(codex_extension_api::TurnStartInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
@@ -21,15 +14,9 @@ impl Session {
}
}
pub(super) fn emit_turn_stop_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
pub(super) fn emit_turn_stop_lifecycle(&self, turn_store: &ExtensionData) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_stop(codex_extension_api::TurnStopInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
@@ -39,14 +26,11 @@ impl Session {
pub(super) fn emit_turn_abort_lifecycle(
&self,
turn_context: &TurnContext,
reason: TurnAbortReason,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_abort(codex_extension_api::TurnAbortInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
reason: reason.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,

View File

@@ -346,7 +346,7 @@ impl Session {
debug_assert!(turn.tasks.is_empty());
Arc::clone(&turn.turn_state)
};
let turn_extension_data = {
{
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
@@ -355,9 +355,8 @@ impl Session {
for item in mailbox_items {
turn_state.push_pending_input(item);
}
Arc::clone(&turn_state.extension_data)
};
self.emit_turn_start_lifecycle(turn_context.as_ref(), turn_extension_data.as_ref());
}
self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref());
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
@@ -428,7 +427,6 @@ impl Session {
task,
cancellation_token,
turn_context: Arc::clone(&turn_context),
turn_extension_data,
_timer: timer,
};
turn.add_task(running_task);
@@ -480,14 +478,10 @@ impl Session {
let mut aborted_turn = false;
let mut active_turn_to_clear = None;
let mut turn_context = None;
let mut turn_extension_data = None;
if let Some(mut active_turn) = self.take_active_turn().await {
let tasks = active_turn.drain_tasks();
aborted_turn = !tasks.is_empty();
turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
@@ -496,10 +490,8 @@ impl Session {
}
}
if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
if let Some(turn_context) = turn_context.as_deref() {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref());
}
if (aborted_turn || reason == TurnAbortReason::Interrupted)
&& let Err(err) = self
@@ -543,16 +535,11 @@ impl Session {
let tasks = active_turn.drain_tasks();
let turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
let turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
if let Some(turn_context) = turn_context.as_deref() {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref());
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
@@ -589,7 +576,6 @@ impl Session {
let mut turn_had_memory_citation = false;
let mut turn_tool_calls = 0_u64;
let mut records_turn_token_usage_on_span = false;
let mut turn_extension_data = None;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
@@ -598,7 +584,6 @@ impl Session {
records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span;
if removed_task.active_turn_is_empty {
should_clear_active_turn = true;
turn_extension_data = Some(removed_task.turn_extension_data);
let turn_state = Arc::clone(&at.turn_state);
Some(turn_state)
} else {
@@ -756,10 +741,8 @@ impl Session {
.turn_timing_state
.time_to_first_token_ms()
.await;
if should_clear_active_turn
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_stop_lifecycle(turn_context.as_ref(), turn_extension_data);
if should_clear_active_turn {
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref());
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnFinished {

View File

@@ -13,9 +13,9 @@ fn main() {
let registry = builder.build();
// 2. The host decides which stores are shared.
let session_store = ExtensionData::new();
let first_thread_store = ExtensionData::new();
let second_thread_store = ExtensionData::new();
let session_store = ExtensionData::new("session");
let first_thread_store = ExtensionData::new("thread-1");
let second_thread_store = ExtensionData::new("thread-2");
// 3. Reusing the same session store shares session state across threads.
let first_thread_fragments = contribute_prompt(&registry, &session_store, &first_thread_store);

View File

@@ -1,7 +1,6 @@
use std::future::Future;
use std::sync::Arc;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::TokenUsageInfo;
@@ -79,8 +78,7 @@ pub trait TokenUsageContributor: Send + Sync {
&self,
_session_store: &ExtensionData,
_thread_store: &ExtensionData,
_thread_id: ThreadId,
_turn_id: &str,
_turn_store: &ExtensionData,
_token_usage: &TokenUsageInfo,
) {
}

View File

@@ -1,11 +1,7 @@
use codex_protocol::ThreadId;
use crate::ExtensionData;
/// Input supplied when the host starts a runtime for a thread.
pub struct ThreadStartInput<'a, C> {
/// Identifier for the thread whose runtime is starting.
pub thread_id: ThreadId,
/// Host configuration visible at thread start.
pub config: &'a C,
/// Store scoped to the host session runtime.
@@ -16,8 +12,6 @@ pub struct ThreadStartInput<'a, C> {
/// Input supplied when the host resumes an existing thread.
pub struct ThreadResumeInput<'a> {
/// Identifier for the thread being resumed.
pub thread_id: ThreadId,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
@@ -26,8 +20,6 @@ pub struct ThreadResumeInput<'a> {
/// Input supplied when the host stops a thread runtime.
pub struct ThreadStopInput<'a> {
/// Identifier for the thread whose runtime is stopping.
pub thread_id: ThreadId,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.

View File

@@ -1,14 +1,9 @@
use codex_protocol::ThreadId;
use codex_protocol::protocol::TurnAbortReason;
use crate::ExtensionData;
/// Input supplied when the host starts a turn.
pub struct TurnStartInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is starting.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
@@ -19,10 +14,6 @@ pub struct TurnStartInput<'a> {
/// Input supplied when the host completes a turn.
pub struct TurnStopInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is stopping.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
@@ -33,10 +24,6 @@ pub struct TurnStopInput<'a> {
/// Input supplied when the host aborts a turn.
pub struct TurnAbortInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is aborting.
pub turn_id: &'a str,
/// Reason the host aborted the turn.
pub reason: TurnAbortReason,
/// Store scoped to the host session runtime.

View File

@@ -8,15 +8,24 @@ use std::sync::PoisonError;
type ErasedData = Arc<dyn Any + Send + Sync>;
/// Typed extension-owned data attached to one host object.
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct ExtensionData {
level_id: String,
entries: Mutex<HashMap<TypeId, ErasedData>>,
}
impl ExtensionData {
/// Creates an empty attachment map.
pub fn new() -> Self {
Self::default()
/// Creates an empty attachment map for one host-owned scope.
pub fn new(level_id: impl Into<String>) -> Self {
Self {
level_id: level_id.into(),
entries: Mutex::new(HashMap::new()),
}
}
/// Returns the host identity for the scope this data is attached to.
pub fn level_id(&self) -> &str {
&self.level_id
}
/// Returns the attached value of type `T`, if one exists.

View File

@@ -52,8 +52,11 @@ where
S: Send + Sync,
{
fn on_thread_start(&self, input: ThreadStartInput<'_, Config>) {
let Ok(forked_from_thread_id) = ThreadId::from_string(input.thread_store.level_id()) else {
return;
};
input.thread_store.insert(GuardianThreadContext {
forked_from_thread_id: input.thread_id,
forked_from_thread_id,
});
}
}