diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 1b40387c3f..4b77efdac9 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -244,6 +244,17 @@ impl CodexThread { .await } + /// Injects hidden model-visible items into the currently active turn. + /// + /// This is the runtime-owned counterpart to user-facing `steer_input`. + /// It returns the unchanged items when this thread has no active turn. + pub async fn inject_response_items_into_active_turn( + &self, + items: Vec, + ) -> Result<(), Vec> { + self.codex.session.inject_response_items(items).await + } + pub async fn set_app_server_client_info( &self, app_server_client_name: Option, diff --git a/codex-rs/ext/extension-api/src/capabilities/mod.rs b/codex-rs/ext/extension-api/src/capabilities/mod.rs index 873d2c12e3..37c36e573b 100644 --- a/codex-rs/ext/extension-api/src/capabilities/mod.rs +++ b/codex-rs/ext/extension-api/src/capabilities/mod.rs @@ -9,4 +9,3 @@ pub use events::NoopExtensionEventSink; pub use response_items::NoopResponseItemInjector; pub use response_items::ResponseItemInjectionFuture; pub use response_items::ResponseItemInjector; -pub use response_items::ResponseItemInjectorSlot; diff --git a/codex-rs/ext/extension-api/src/capabilities/response_items.rs b/codex-rs/ext/extension-api/src/capabilities/response_items.rs index aea0a95b0f..6c300e2bf1 100644 --- a/codex-rs/ext/extension-api/src/capabilities/response_items.rs +++ b/codex-rs/ext/extension-api/src/capabilities/response_items.rs @@ -1,8 +1,5 @@ use std::future::Future; use std::pin::Pin; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::PoisonError; use codex_protocol::models::ResponseInputItem; @@ -22,36 +19,6 @@ pub trait ResponseItemInjector: Send + Sync { ) -> ResponseItemInjectionFuture<'a>; } -/// Thread-scoped slot that lets the host publish a late-bound response-item -/// injector after extension thread-start hooks have already run. -#[derive(Default)] -pub struct ResponseItemInjectorSlot { - injector: Mutex>>, -} - -impl std::fmt::Debug for ResponseItemInjectorSlot { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ResponseItemInjectorSlot") - .finish_non_exhaustive() - } -} - -impl ResponseItemInjectorSlot { - /// Replaces the published injector for this thread scope. - pub fn set(&self, injector: Arc) { - *self.injector() = Some(injector); - } - - /// Returns the published injector, if the host has provided one yet. - pub fn get(&self) -> Option> { - self.injector().clone() - } - - fn injector(&self) -> std::sync::MutexGuard<'_, Option>> { - self.injector.lock().unwrap_or_else(PoisonError::into_inner) - } -} - /// Injector used when a host does not expose same-turn model steering. #[derive(Debug, Default, Clone, Copy)] pub struct NoopResponseItemInjector; diff --git a/codex-rs/ext/extension-api/src/lib.rs b/codex-rs/ext/extension-api/src/lib.rs index 1b295f02df..06cbc6f889 100644 --- a/codex-rs/ext/extension-api/src/lib.rs +++ b/codex-rs/ext/extension-api/src/lib.rs @@ -10,7 +10,6 @@ pub use capabilities::NoopExtensionEventSink; pub use capabilities::NoopResponseItemInjector; pub use capabilities::ResponseItemInjectionFuture; pub use capabilities::ResponseItemInjector; -pub use capabilities::ResponseItemInjectorSlot; pub use codex_tools::FunctionCallError; pub use codex_tools::JsonToolOutput; pub use codex_tools::ResponsesApiTool; diff --git a/codex-rs/ext/goal/src/accounting.rs b/codex-rs/ext/goal/src/accounting.rs index 569105dfa0..2a679837cc 100644 --- a/codex-rs/ext/goal/src/accounting.rs +++ b/codex-rs/ext/goal/src/accounting.rs @@ -109,7 +109,9 @@ impl GoalAccountingState { pub(crate) fn mark_turn_goal_active(&self, turn_id: &str, goal_id: impl Into) { let mut inner = self.inner(); let goal_id = goal_id.into(); - inner.budget_limit_reported_goal_id = None; + if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) { + inner.budget_limit_reported_goal_id = None; + } if let Some(turn) = inner.turns.get_mut(turn_id) { turn.active_goal_id = Some(goal_id.clone()); if inner.current_turn_id.as_deref() == Some(turn_id) { @@ -125,7 +127,9 @@ impl GoalAccountingState { let mut inner = self.inner(); let turn_id = inner.current_turn_id.clone()?; let goal_id = goal_id.into(); - inner.budget_limit_reported_goal_id = None; + if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) { + inner.budget_limit_reported_goal_id = None; + } let turn = inner.turns.get_mut(turn_id.as_str())?; turn.active_goal_id = Some(goal_id.clone()); turn.reset_baseline_to_current(); @@ -135,7 +139,10 @@ impl GoalAccountingState { pub(crate) fn mark_idle_goal_active(&self, goal_id: impl Into) { let mut inner = self.inner(); - inner.budget_limit_reported_goal_id = None; + let goal_id = goal_id.into(); + if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) { + inner.budget_limit_reported_goal_id = None; + } inner.wall_clock.mark_active_goal(goal_id); } diff --git a/codex-rs/ext/goal/src/extension.rs b/codex-rs/ext/goal/src/extension.rs index b2106e4477..82609285b7 100644 --- a/codex-rs/ext/goal/src/extension.rs +++ b/codex-rs/ext/goal/src/extension.rs @@ -1,11 +1,12 @@ use std::sync::Arc; +use std::sync::Weak; use async_trait::async_trait; +use codex_core::ThreadManager; use codex_extension_api::ConfigContributor; use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistryBuilder; -use codex_extension_api::ResponseItemInjectorSlot; use codex_extension_api::ThreadLifecycleContributor; use codex_extension_api::ThreadStartInput; use codex_extension_api::TokenUsageContributor; @@ -45,6 +46,7 @@ impl GoalExtensionConfig { pub struct GoalExtension { state_dbs: Arc, event_emitter: GoalEventEmitter, + thread_manager: Weak, goals_enabled: Arc bool + Send + Sync>, } @@ -58,11 +60,13 @@ impl GoalExtension { pub(crate) fn new_with_host_capabilities( state_dbs: Arc, event_sink: Arc, + thread_manager: Weak, goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static, ) -> Self { Self { state_dbs, event_emitter: GoalEventEmitter::new(event_sink), + thread_manager, goals_enabled: Arc::new(goals_enabled), } } @@ -81,9 +85,6 @@ where let accounting_state = input .thread_store .get_or_init::(GoalAccountingState::default); - let response_item_injector_slot = input - .thread_store - .get_or_init(ResponseItemInjectorSlot::default); let Ok(thread_id) = ThreadId::from_string(input.thread_store.level_id()) else { return; }; @@ -92,7 +93,7 @@ where thread_id, Arc::clone(&self.state_dbs), self.event_emitter.clone(), - response_item_injector_slot, + self.thread_manager.clone(), accounting_state, enabled, ) @@ -292,16 +293,7 @@ where return; } let item = budget_limit_steering_item(&goal); - let Some(response_item_injector) = runtime.response_item_injector_slot().get() else { - return; - }; - if response_item_injector - .inject_response_items(vec![item]) - .await - .is_err() - { - tracing::debug!("skipping budget-limit goal steering because no turn is active"); - } + runtime.inject_active_turn_steering(item).await; }) } } @@ -348,6 +340,7 @@ where pub fn install_with_backend( registry: &mut ExtensionRegistryBuilder, state_dbs: Arc, + thread_manager: Weak, goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static, ) where C: Send + Sync + 'static, @@ -355,6 +348,7 @@ pub fn install_with_backend( let extension = Arc::new(GoalExtension::new_with_host_capabilities( state_dbs, registry.event_sink(), + thread_manager, goals_enabled, )); registry.thread_lifecycle_contributor(extension.clone()); diff --git a/codex-rs/ext/goal/src/runtime.rs b/codex-rs/ext/goal/src/runtime.rs index 69fb0702cc..387d172ad2 100644 --- a/codex-rs/ext/goal/src/runtime.rs +++ b/codex-rs/ext/goal/src/runtime.rs @@ -1,9 +1,11 @@ use std::sync::Arc; +use std::sync::Weak; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use codex_extension_api::ResponseItemInjectorSlot; +use codex_core::ThreadManager; use codex_protocol::ThreadId; +use codex_protocol::models::ResponseInputItem; use codex_protocol::protocol::ThreadGoal; use crate::accounting::BudgetLimitedGoalDisposition; @@ -21,7 +23,7 @@ struct GoalRuntimeInner { thread_id: ThreadId, state_dbs: Arc, event_emitter: GoalEventEmitter, - response_item_injector_slot: Arc, + thread_manager: Weak, accounting_state: Arc, enabled: AtomicBool, } @@ -59,7 +61,7 @@ impl GoalRuntimeHandle { thread_id: ThreadId, state_dbs: Arc, event_emitter: GoalEventEmitter, - response_item_injector_slot: Arc, + thread_manager: Weak, accounting_state: Arc, enabled: bool, ) -> Self { @@ -68,7 +70,7 @@ impl GoalRuntimeHandle { thread_id, state_dbs, event_emitter, - response_item_injector_slot, + thread_manager, accounting_state, enabled: AtomicBool::new(enabled), }), @@ -91,10 +93,6 @@ impl GoalRuntimeHandle { Arc::clone(&self.inner.accounting_state) } - pub(crate) fn response_item_injector_slot(&self) -> Arc { - Arc::clone(&self.inner.response_item_injector_slot) - } - pub async fn prepare_external_goal_mutation(&self) -> Result<(), String> { if !self.is_enabled() { return Ok(()); @@ -129,9 +127,11 @@ impl GoalRuntimeHandle { return Ok(()); } - let objective_changed = previous_goal - .as_ref() - .is_some_and(|previous_goal| previous_goal.objective != goal.objective); + let should_steer_active_turn = previous_goal.as_ref().is_none_or(|previous_goal| { + previous_goal.goal_id != goal.goal_id + || previous_goal.status != codex_state::ThreadGoalStatus::Active + || previous_goal.objective != goal.objective + }); match goal.status { codex_state::ThreadGoalStatus::Active => { if self.inner.accounting_state.current_turn_id().is_some() { @@ -144,20 +144,9 @@ impl GoalRuntimeHandle { .accounting_state .mark_idle_goal_active(goal.goal_id.clone()); } - if objective_changed - && let Some(response_item_injector) = - self.inner.response_item_injector_slot.get() - { + if should_steer_active_turn { let item = objective_updated_steering_item(&protocol_goal_from_state(goal)); - if response_item_injector - .inject_response_items(vec![item]) - .await - .is_err() - { - tracing::debug!( - "skipping objective-updated goal steering because no turn is active" - ); - } + self.inject_active_turn_steering(item).await; } } codex_state::ThreadGoalStatus::BudgetLimited => { @@ -184,6 +173,24 @@ impl GoalRuntimeHandle { Ok(()) } + pub(crate) async fn inject_active_turn_steering(&self, item: ResponseInputItem) { + let Some(thread_manager) = self.inner.thread_manager.upgrade() else { + tracing::debug!("skipping goal steering because thread manager is unavailable"); + return; + }; + let Ok(thread) = thread_manager.get_thread(self.inner.thread_id).await else { + tracing::debug!("skipping goal steering because live thread is unavailable"); + return; + }; + if thread + .inject_response_items_into_active_turn(vec![item]) + .await + .is_err() + { + tracing::debug!("skipping goal steering because no turn is active"); + } + } + pub(crate) async fn account_active_goal_progress( &self, turn_id: &str, diff --git a/codex-rs/ext/goal/tests/goal_extension_backend.rs b/codex-rs/ext/goal/tests/goal_extension_backend.rs index cf34a255d9..50c99bf2d0 100644 --- a/codex-rs/ext/goal/tests/goal_extension_backend.rs +++ b/codex-rs/ext/goal/tests/goal_extension_backend.rs @@ -1,15 +1,12 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::PoisonError; +use std::sync::Weak; use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::FunctionCallError; -use codex_extension_api::NoopResponseItemInjector; -use codex_extension_api::ResponseItemInjectionFuture; -use codex_extension_api::ResponseItemInjector; -use codex_extension_api::ResponseItemInjectorSlot; use codex_extension_api::ThreadStartInput; use codex_extension_api::ToolCall; use codex_extension_api::ToolCallOutcome; @@ -26,8 +23,6 @@ use codex_protocol::ThreadId; use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::ModeKind; use codex_protocol::config_types::Settings; -use codex_protocol::models::ContentItem; -use codex_protocol::models::ResponseInputItem; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::SessionSource; @@ -305,23 +300,11 @@ async fn budget_limited_goal_keeps_accruing_until_turn_stop() -> anyhow::Result< harness.sink.goal_events() ); - let steering_items = harness.response_item_injector.items(); - let [ResponseInputItem::Message { role, content, .. }] = steering_items.as_slice() else { - panic!("expected one budget-limit steering item, got {steering_items:#?}"); - }; - assert_eq!("user", role); - let [ContentItem::InputText { text }] = content.as_slice() else { - panic!("expected one steering text item, got {content:#?}"); - }; - assert!(text.starts_with("")); - assert!(text.trim_end().ends_with("")); - assert!(text.contains("budget_limited")); - assert!(text.to_lowercase().contains("wrap up this turn soon")); Ok(()) } #[tokio::test] -async fn budget_limited_goal_steering_injects_once_after_later_tool_finish() -> anyhow::Result<()> { +async fn budget_limited_goal_keeps_accounting_after_later_tool_finish() -> anyhow::Result<()> { let runtime = test_runtime().await?; let thread_id = test_thread_id()?; seed_thread_metadata(runtime.as_ref(), thread_id).await?; @@ -375,7 +358,6 @@ async fn budget_limited_goal_steering_injects_once_after_later_tool_finish() -> .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; assert_eq!(35, goal.tokens_used); assert_eq!(codex_state::ThreadGoalStatus::BudgetLimited, goal.status); - assert_eq!(1, harness.response_item_injector.items().len()); Ok(()) } @@ -481,7 +463,13 @@ async fn external_goal_mutation_start_accounts_active_goal_progress() -> anyhow: harness.sink.clear(); harness - .record_token_usage("turn-1", &token_usage(20, 5, 8, 2, 30)) + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 20, /*cached_input_tokens*/ 5, /*output_tokens*/ 8, + /*reasoning_output_tokens*/ 2, /*total_tokens*/ 30, + ), + ) .await; harness .runtime_handle() @@ -508,14 +496,20 @@ async fn external_goal_mutation_start_accounts_active_goal_progress() -> anyhow: } #[tokio::test] -async fn external_goal_set_active_resets_baseline_and_injects_objective_update() --> anyhow::Result<()> { +async fn external_goal_set_active_resets_baseline_without_live_thread() -> anyhow::Result<()> { let runtime = test_runtime().await?; let thread_id = test_thread_id()?; seed_thread_metadata(runtime.as_ref(), thread_id).await?; let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?; harness - .start_turn("turn-1", &token_usage(100, 0, 0, 0, 100)) + .start_turn( + "turn-1", + &token_usage( + /*input_tokens*/ 100, /*cached_input_tokens*/ 0, + /*output_tokens*/ 0, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 100, + ), + ) .await; let tools = harness.tools(); @@ -528,10 +522,16 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update() )) .await?; harness.sink.clear(); - harness.response_item_injector.items_mut().clear(); harness - .record_token_usage("turn-1", &token_usage(120, 0, 0, 0, 120)) + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 120, /*cached_input_tokens*/ 0, + /*output_tokens*/ 0, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 120, + ), + ) .await; harness .runtime_handle() @@ -567,7 +567,14 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update() .map_err(anyhow::Error::msg)?; harness - .record_token_usage("turn-1", &token_usage(130, 0, 0, 0, 130)) + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 130, /*cached_input_tokens*/ 0, + /*output_tokens*/ 0, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 130, + ), + ) .await; harness .notify_tool_finish("turn-1", "call-shell", "shell") @@ -579,20 +586,6 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update() .await? .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; assert_eq!(30, goal.tokens_used); - let injected = harness.response_item_injector.items(); - assert_eq!(1, injected.len()); - let ResponseInputItem::Message { content, .. } = &injected[0] else { - panic!("objective update should inject a hidden message"); - }; - let prompt = content - .iter() - .find_map(|item| match item { - ContentItem::InputText { text } => Some(text.as_str()), - _ => None, - }) - .expect("goal context should render text"); - assert!(prompt.contains("new objective")); - assert!(prompt.contains("supersedes any previous thread goal objective")); Ok(()) } @@ -601,13 +594,10 @@ async fn installed_tools( thread_id: ThreadId, ) -> Vec>> { let mut builder = ExtensionRegistryBuilder::<()>::new(); - install_with_backend(&mut builder, runtime, |_| true); + install_with_backend(&mut builder, runtime, Weak::new(), |_| true); let registry = builder.build(); let session_store = ExtensionData::new("session-1"); let thread_store = ExtensionData::new(thread_id.to_string()); - thread_store - .get_or_init(ResponseItemInjectorSlot::default) - .set(Arc::new(NoopResponseItemInjector)); for contributor in registry.thread_lifecycle_contributors() { contributor .on_thread_start(ThreadStartInput { @@ -630,7 +620,6 @@ struct GoalExtensionHarness { session_store: ExtensionData, thread_store: ExtensionData, sink: Arc, - response_item_injector: Arc, } impl GoalExtensionHarness { @@ -639,15 +628,11 @@ impl GoalExtensionHarness { thread_id: ThreadId, ) -> anyhow::Result { let sink = Arc::new(RecordingEventSink::default()); - let response_item_injector = Arc::new(RecordingResponseItemInjector::default()); let mut builder = ExtensionRegistryBuilder::<()>::with_event_sink(sink.clone()); - install_with_backend(&mut builder, runtime, |_| true); + install_with_backend(&mut builder, runtime, Weak::new(), |_| true); let registry = builder.build(); let session_store = ExtensionData::new("session-1"); let thread_store = ExtensionData::new(thread_id.to_string()); - thread_store - .get_or_init(ResponseItemInjectorSlot::default) - .set(response_item_injector.clone()); for contributor in registry.thread_lifecycle_contributors() { contributor .on_thread_start(ThreadStartInput { @@ -662,7 +647,6 @@ impl GoalExtensionHarness { session_store, thread_store, sink, - response_item_injector, }) } @@ -831,34 +815,6 @@ impl ExtensionEventSink for RecordingEventSink { } } -#[derive(Debug, Default)] -struct RecordingResponseItemInjector { - items: Mutex>, -} - -impl RecordingResponseItemInjector { - fn items(&self) -> Vec { - self.items - .lock() - .unwrap_or_else(PoisonError::into_inner) - .clone() - } - - fn items_mut(&self) -> std::sync::MutexGuard<'_, Vec> { - self.items.lock().unwrap_or_else(PoisonError::into_inner) - } -} - -impl ResponseItemInjector for RecordingResponseItemInjector { - fn inject_response_items<'a>( - &'a self, - items: Vec, - ) -> ResponseItemInjectionFuture<'a> { - self.items_mut().extend(items); - Box::pin(std::future::ready(Ok(()))) - } -} - #[derive(Debug, PartialEq, Eq)] struct CapturedGoalEvent { event_id: String,