more wiirng

This commit is contained in:
jif-oai
2026-05-22 19:39:35 +01:00
parent 61d766901c
commit 7aecca96c8
8 changed files with 97 additions and 157 deletions

View File

@@ -244,6 +244,17 @@ impl CodexThread {
.await
}
/// Injects hidden model-visible items into the currently active turn.
///
/// This is the runtime-owned counterpart to user-facing `steer_input`.
/// It returns the unchanged items when this thread has no active turn.
pub async fn inject_response_items_into_active_turn(
&self,
items: Vec<ResponseInputItem>,
) -> Result<(), Vec<ResponseInputItem>> {
self.codex.session.inject_response_items(items).await
}
pub async fn set_app_server_client_info(
&self,
app_server_client_name: Option<String>,

View File

@@ -9,4 +9,3 @@ pub use events::NoopExtensionEventSink;
pub use response_items::NoopResponseItemInjector;
pub use response_items::ResponseItemInjectionFuture;
pub use response_items::ResponseItemInjector;
pub use response_items::ResponseItemInjectorSlot;

View File

@@ -1,8 +1,5 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::PoisonError;
use codex_protocol::models::ResponseInputItem;
@@ -22,36 +19,6 @@ pub trait ResponseItemInjector: Send + Sync {
) -> ResponseItemInjectionFuture<'a>;
}
/// Thread-scoped slot that lets the host publish a late-bound response-item
/// injector after extension thread-start hooks have already run.
#[derive(Default)]
pub struct ResponseItemInjectorSlot {
injector: Mutex<Option<Arc<dyn ResponseItemInjector>>>,
}
impl std::fmt::Debug for ResponseItemInjectorSlot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ResponseItemInjectorSlot")
.finish_non_exhaustive()
}
}
impl ResponseItemInjectorSlot {
/// Replaces the published injector for this thread scope.
pub fn set(&self, injector: Arc<dyn ResponseItemInjector>) {
*self.injector() = Some(injector);
}
/// Returns the published injector, if the host has provided one yet.
pub fn get(&self) -> Option<Arc<dyn ResponseItemInjector>> {
self.injector().clone()
}
fn injector(&self) -> std::sync::MutexGuard<'_, Option<Arc<dyn ResponseItemInjector>>> {
self.injector.lock().unwrap_or_else(PoisonError::into_inner)
}
}
/// Injector used when a host does not expose same-turn model steering.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopResponseItemInjector;

View File

@@ -10,7 +10,6 @@ pub use capabilities::NoopExtensionEventSink;
pub use capabilities::NoopResponseItemInjector;
pub use capabilities::ResponseItemInjectionFuture;
pub use capabilities::ResponseItemInjector;
pub use capabilities::ResponseItemInjectorSlot;
pub use codex_tools::FunctionCallError;
pub use codex_tools::JsonToolOutput;
pub use codex_tools::ResponsesApiTool;

View File

@@ -109,7 +109,9 @@ impl GoalAccountingState {
pub(crate) fn mark_turn_goal_active(&self, turn_id: &str, goal_id: impl Into<String>) {
let mut inner = self.inner();
let goal_id = goal_id.into();
inner.budget_limit_reported_goal_id = None;
if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) {
inner.budget_limit_reported_goal_id = None;
}
if let Some(turn) = inner.turns.get_mut(turn_id) {
turn.active_goal_id = Some(goal_id.clone());
if inner.current_turn_id.as_deref() == Some(turn_id) {
@@ -125,7 +127,9 @@ impl GoalAccountingState {
let mut inner = self.inner();
let turn_id = inner.current_turn_id.clone()?;
let goal_id = goal_id.into();
inner.budget_limit_reported_goal_id = None;
if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) {
inner.budget_limit_reported_goal_id = None;
}
let turn = inner.turns.get_mut(turn_id.as_str())?;
turn.active_goal_id = Some(goal_id.clone());
turn.reset_baseline_to_current();
@@ -135,7 +139,10 @@ impl GoalAccountingState {
pub(crate) fn mark_idle_goal_active(&self, goal_id: impl Into<String>) {
let mut inner = self.inner();
inner.budget_limit_reported_goal_id = None;
let goal_id = goal_id.into();
if inner.budget_limit_reported_goal_id.as_deref() != Some(goal_id.as_str()) {
inner.budget_limit_reported_goal_id = None;
}
inner.wall_clock.mark_active_goal(goal_id);
}

View File

@@ -1,11 +1,12 @@
use std::sync::Arc;
use std::sync::Weak;
use async_trait::async_trait;
use codex_core::ThreadManager;
use codex_extension_api::ConfigContributor;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::ResponseItemInjectorSlot;
use codex_extension_api::ThreadLifecycleContributor;
use codex_extension_api::ThreadStartInput;
use codex_extension_api::TokenUsageContributor;
@@ -45,6 +46,7 @@ impl GoalExtensionConfig {
pub struct GoalExtension<C> {
state_dbs: Arc<codex_state::StateRuntime>,
event_emitter: GoalEventEmitter,
thread_manager: Weak<ThreadManager>,
goals_enabled: Arc<dyn Fn(&C) -> bool + Send + Sync>,
}
@@ -58,11 +60,13 @@ impl<C> GoalExtension<C> {
pub(crate) fn new_with_host_capabilities(
state_dbs: Arc<codex_state::StateRuntime>,
event_sink: Arc<dyn ExtensionEventSink>,
thread_manager: Weak<ThreadManager>,
goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static,
) -> Self {
Self {
state_dbs,
event_emitter: GoalEventEmitter::new(event_sink),
thread_manager,
goals_enabled: Arc::new(goals_enabled),
}
}
@@ -81,9 +85,6 @@ where
let accounting_state = input
.thread_store
.get_or_init::<GoalAccountingState>(GoalAccountingState::default);
let response_item_injector_slot = input
.thread_store
.get_or_init(ResponseItemInjectorSlot::default);
let Ok(thread_id) = ThreadId::from_string(input.thread_store.level_id()) else {
return;
};
@@ -92,7 +93,7 @@ where
thread_id,
Arc::clone(&self.state_dbs),
self.event_emitter.clone(),
response_item_injector_slot,
self.thread_manager.clone(),
accounting_state,
enabled,
)
@@ -292,16 +293,7 @@ where
return;
}
let item = budget_limit_steering_item(&goal);
let Some(response_item_injector) = runtime.response_item_injector_slot().get() else {
return;
};
if response_item_injector
.inject_response_items(vec![item])
.await
.is_err()
{
tracing::debug!("skipping budget-limit goal steering because no turn is active");
}
runtime.inject_active_turn_steering(item).await;
})
}
}
@@ -348,6 +340,7 @@ where
pub fn install_with_backend<C>(
registry: &mut ExtensionRegistryBuilder<C>,
state_dbs: Arc<codex_state::StateRuntime>,
thread_manager: Weak<ThreadManager>,
goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static,
) where
C: Send + Sync + 'static,
@@ -355,6 +348,7 @@ pub fn install_with_backend<C>(
let extension = Arc::new(GoalExtension::new_with_host_capabilities(
state_dbs,
registry.event_sink(),
thread_manager,
goals_enabled,
));
registry.thread_lifecycle_contributor(extension.clone());

View File

@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::sync::Weak;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_extension_api::ResponseItemInjectorSlot;
use codex_core::ThreadManager;
use codex_protocol::ThreadId;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::ThreadGoal;
use crate::accounting::BudgetLimitedGoalDisposition;
@@ -21,7 +23,7 @@ struct GoalRuntimeInner {
thread_id: ThreadId,
state_dbs: Arc<codex_state::StateRuntime>,
event_emitter: GoalEventEmitter,
response_item_injector_slot: Arc<ResponseItemInjectorSlot>,
thread_manager: Weak<ThreadManager>,
accounting_state: Arc<GoalAccountingState>,
enabled: AtomicBool,
}
@@ -59,7 +61,7 @@ impl GoalRuntimeHandle {
thread_id: ThreadId,
state_dbs: Arc<codex_state::StateRuntime>,
event_emitter: GoalEventEmitter,
response_item_injector_slot: Arc<ResponseItemInjectorSlot>,
thread_manager: Weak<ThreadManager>,
accounting_state: Arc<GoalAccountingState>,
enabled: bool,
) -> Self {
@@ -68,7 +70,7 @@ impl GoalRuntimeHandle {
thread_id,
state_dbs,
event_emitter,
response_item_injector_slot,
thread_manager,
accounting_state,
enabled: AtomicBool::new(enabled),
}),
@@ -91,10 +93,6 @@ impl GoalRuntimeHandle {
Arc::clone(&self.inner.accounting_state)
}
pub(crate) fn response_item_injector_slot(&self) -> Arc<ResponseItemInjectorSlot> {
Arc::clone(&self.inner.response_item_injector_slot)
}
pub async fn prepare_external_goal_mutation(&self) -> Result<(), String> {
if !self.is_enabled() {
return Ok(());
@@ -129,9 +127,11 @@ impl GoalRuntimeHandle {
return Ok(());
}
let objective_changed = previous_goal
.as_ref()
.is_some_and(|previous_goal| previous_goal.objective != goal.objective);
let should_steer_active_turn = previous_goal.as_ref().is_none_or(|previous_goal| {
previous_goal.goal_id != goal.goal_id
|| previous_goal.status != codex_state::ThreadGoalStatus::Active
|| previous_goal.objective != goal.objective
});
match goal.status {
codex_state::ThreadGoalStatus::Active => {
if self.inner.accounting_state.current_turn_id().is_some() {
@@ -144,20 +144,9 @@ impl GoalRuntimeHandle {
.accounting_state
.mark_idle_goal_active(goal.goal_id.clone());
}
if objective_changed
&& let Some(response_item_injector) =
self.inner.response_item_injector_slot.get()
{
if should_steer_active_turn {
let item = objective_updated_steering_item(&protocol_goal_from_state(goal));
if response_item_injector
.inject_response_items(vec![item])
.await
.is_err()
{
tracing::debug!(
"skipping objective-updated goal steering because no turn is active"
);
}
self.inject_active_turn_steering(item).await;
}
}
codex_state::ThreadGoalStatus::BudgetLimited => {
@@ -184,6 +173,24 @@ impl GoalRuntimeHandle {
Ok(())
}
pub(crate) async fn inject_active_turn_steering(&self, item: ResponseInputItem) {
let Some(thread_manager) = self.inner.thread_manager.upgrade() else {
tracing::debug!("skipping goal steering because thread manager is unavailable");
return;
};
let Ok(thread) = thread_manager.get_thread(self.inner.thread_id).await else {
tracing::debug!("skipping goal steering because live thread is unavailable");
return;
};
if thread
.inject_response_items_into_active_turn(vec![item])
.await
.is_err()
{
tracing::debug!("skipping goal steering because no turn is active");
}
}
pub(crate) async fn account_active_goal_progress(
&self,
turn_id: &str,

View File

@@ -1,15 +1,12 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::PoisonError;
use std::sync::Weak;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::FunctionCallError;
use codex_extension_api::NoopResponseItemInjector;
use codex_extension_api::ResponseItemInjectionFuture;
use codex_extension_api::ResponseItemInjector;
use codex_extension_api::ResponseItemInjectorSlot;
use codex_extension_api::ThreadStartInput;
use codex_extension_api::ToolCall;
use codex_extension_api::ToolCallOutcome;
@@ -26,8 +23,6 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionSource;
@@ -305,23 +300,11 @@ async fn budget_limited_goal_keeps_accruing_until_turn_stop() -> anyhow::Result<
harness.sink.goal_events()
);
let steering_items = harness.response_item_injector.items();
let [ResponseInputItem::Message { role, content, .. }] = steering_items.as_slice() else {
panic!("expected one budget-limit steering item, got {steering_items:#?}");
};
assert_eq!("user", role);
let [ContentItem::InputText { text }] = content.as_slice() else {
panic!("expected one steering text item, got {content:#?}");
};
assert!(text.starts_with("<goal_context>"));
assert!(text.trim_end().ends_with("</goal_context>"));
assert!(text.contains("budget_limited"));
assert!(text.to_lowercase().contains("wrap up this turn soon"));
Ok(())
}
#[tokio::test]
async fn budget_limited_goal_steering_injects_once_after_later_tool_finish() -> anyhow::Result<()> {
async fn budget_limited_goal_keeps_accounting_after_later_tool_finish() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
@@ -375,7 +358,6 @@ async fn budget_limited_goal_steering_injects_once_after_later_tool_finish() ->
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(35, goal.tokens_used);
assert_eq!(codex_state::ThreadGoalStatus::BudgetLimited, goal.status);
assert_eq!(1, harness.response_item_injector.items().len());
Ok(())
}
@@ -481,7 +463,13 @@ async fn external_goal_mutation_start_accounts_active_goal_progress() -> anyhow:
harness.sink.clear();
harness
.record_token_usage("turn-1", &token_usage(20, 5, 8, 2, 30))
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 20, /*cached_input_tokens*/ 5, /*output_tokens*/ 8,
/*reasoning_output_tokens*/ 2, /*total_tokens*/ 30,
),
)
.await;
harness
.runtime_handle()
@@ -508,14 +496,20 @@ async fn external_goal_mutation_start_accounts_active_goal_progress() -> anyhow:
}
#[tokio::test]
async fn external_goal_set_active_resets_baseline_and_injects_objective_update()
-> anyhow::Result<()> {
async fn external_goal_set_active_resets_baseline_without_live_thread() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;
harness
.start_turn("turn-1", &token_usage(100, 0, 0, 0, 100))
.start_turn(
"turn-1",
&token_usage(
/*input_tokens*/ 100, /*cached_input_tokens*/ 0,
/*output_tokens*/ 0, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 100,
),
)
.await;
let tools = harness.tools();
@@ -528,10 +522,16 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update()
))
.await?;
harness.sink.clear();
harness.response_item_injector.items_mut().clear();
harness
.record_token_usage("turn-1", &token_usage(120, 0, 0, 0, 120))
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 120, /*cached_input_tokens*/ 0,
/*output_tokens*/ 0, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 120,
),
)
.await;
harness
.runtime_handle()
@@ -567,7 +567,14 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update()
.map_err(anyhow::Error::msg)?;
harness
.record_token_usage("turn-1", &token_usage(130, 0, 0, 0, 130))
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 130, /*cached_input_tokens*/ 0,
/*output_tokens*/ 0, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 130,
),
)
.await;
harness
.notify_tool_finish("turn-1", "call-shell", "shell")
@@ -579,20 +586,6 @@ async fn external_goal_set_active_resets_baseline_and_injects_objective_update()
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(30, goal.tokens_used);
let injected = harness.response_item_injector.items();
assert_eq!(1, injected.len());
let ResponseInputItem::Message { content, .. } = &injected[0] else {
panic!("objective update should inject a hidden message");
};
let prompt = content
.iter()
.find_map(|item| match item {
ContentItem::InputText { text } => Some(text.as_str()),
_ => None,
})
.expect("goal context should render text");
assert!(prompt.contains("new objective"));
assert!(prompt.contains("supersedes any previous thread goal objective"));
Ok(())
}
@@ -601,13 +594,10 @@ async fn installed_tools(
thread_id: ThreadId,
) -> Vec<Arc<dyn ToolExecutor<ToolCall>>> {
let mut builder = ExtensionRegistryBuilder::<()>::new();
install_with_backend(&mut builder, runtime, |_| true);
install_with_backend(&mut builder, runtime, Weak::new(), |_| true);
let registry = builder.build();
let session_store = ExtensionData::new("session-1");
let thread_store = ExtensionData::new(thread_id.to_string());
thread_store
.get_or_init(ResponseItemInjectorSlot::default)
.set(Arc::new(NoopResponseItemInjector));
for contributor in registry.thread_lifecycle_contributors() {
contributor
.on_thread_start(ThreadStartInput {
@@ -630,7 +620,6 @@ struct GoalExtensionHarness {
session_store: ExtensionData,
thread_store: ExtensionData,
sink: Arc<RecordingEventSink>,
response_item_injector: Arc<RecordingResponseItemInjector>,
}
impl GoalExtensionHarness {
@@ -639,15 +628,11 @@ impl GoalExtensionHarness {
thread_id: ThreadId,
) -> anyhow::Result<Self> {
let sink = Arc::new(RecordingEventSink::default());
let response_item_injector = Arc::new(RecordingResponseItemInjector::default());
let mut builder = ExtensionRegistryBuilder::<()>::with_event_sink(sink.clone());
install_with_backend(&mut builder, runtime, |_| true);
install_with_backend(&mut builder, runtime, Weak::new(), |_| true);
let registry = builder.build();
let session_store = ExtensionData::new("session-1");
let thread_store = ExtensionData::new(thread_id.to_string());
thread_store
.get_or_init(ResponseItemInjectorSlot::default)
.set(response_item_injector.clone());
for contributor in registry.thread_lifecycle_contributors() {
contributor
.on_thread_start(ThreadStartInput {
@@ -662,7 +647,6 @@ impl GoalExtensionHarness {
session_store,
thread_store,
sink,
response_item_injector,
})
}
@@ -831,34 +815,6 @@ impl ExtensionEventSink for RecordingEventSink {
}
}
#[derive(Debug, Default)]
struct RecordingResponseItemInjector {
items: Mutex<Vec<ResponseInputItem>>,
}
impl RecordingResponseItemInjector {
fn items(&self) -> Vec<ResponseInputItem> {
self.items
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone()
}
fn items_mut(&self) -> std::sync::MutexGuard<'_, Vec<ResponseInputItem>> {
self.items.lock().unwrap_or_else(PoisonError::into_inner)
}
}
impl ResponseItemInjector for RecordingResponseItemInjector {
fn inject_response_items<'a>(
&'a self,
items: Vec<ResponseInputItem>,
) -> ResponseItemInjectionFuture<'a> {
self.items_mut().extend(items);
Box::pin(std::future::ready(Ok(())))
}
}
#[derive(Debug, PartialEq, Eq)]
struct CapturedGoalEvent {
event_id: String,