diff --git a/codex-rs/ext/goal/src/accounting.rs b/codex-rs/ext/goal/src/accounting.rs index 2a679837cc..e0a84f5f8b 100644 --- a/codex-rs/ext/goal/src/accounting.rs +++ b/codex-rs/ext/goal/src/accounting.rs @@ -83,6 +83,17 @@ impl GoalAccountingState { self.inner().current_turn_id.clone() } + pub(crate) fn turn_is_current_active_goal(&self, turn_id: &str) -> bool { + let inner = self.inner(); + if inner.current_turn_id.as_deref() != Some(turn_id) { + return false; + } + let Some(turn) = inner.turns.get(turn_id) else { + return false; + }; + turn.account_tokens && turn.active_goal_id.is_some() + } + pub(crate) fn record_token_usage( &self, turn_id: impl Into, diff --git a/codex-rs/ext/goal/src/runtime.rs b/codex-rs/ext/goal/src/runtime.rs index 0139cc2c52..16a4ff6a2f 100644 --- a/codex-rs/ext/goal/src/runtime.rs +++ b/codex-rs/ext/goal/src/runtime.rs @@ -192,6 +192,54 @@ impl GoalRuntimeHandle { Ok(()) } + pub async fn usage_limit_active_goal_for_turn(&self, turn_id: &str) -> Result<(), String> { + if !self.is_enabled() { + return Ok(()); + } + + if !self + .inner + .accounting_state + .turn_is_current_active_goal(turn_id) + { + return Ok(()); + } + + let progress_event_id = format!("{turn_id}:usage-limit-progress"); + self.account_active_goal_progress( + turn_id, + progress_event_id.as_str(), + codex_state::GoalAccountingMode::ActiveOnly, + BudgetLimitedGoalDisposition::ClearActive, + ) + .await?; + + let previous_status = self + .current_goal_status_for_metrics(/*expected_goal_id*/ None) + .await?; + let Some(goal) = self + .inner + .state_dbs + .thread_goals() + .usage_limit_active_thread_goal(self.thread_id()) + .await + .map_err(|err| err.to_string())? + else { + return Ok(()); + }; + self.inner + .metrics + .record_terminal_if_status_changed(previous_status, &goal); + self.inner.accounting_state.clear_active_goal(); + let goal = protocol_goal_from_state(goal); + self.inner.event_emitter.thread_goal_updated( + format!("{turn_id}:usage-limit"), + Some(turn_id.to_string()), + goal, + ); + Ok(()) + } + pub async fn restore_after_resume(&self) -> Result<(), String> { if !self.is_enabled() { return Ok(()); diff --git a/codex-rs/ext/goal/tests/goal_extension_backend.rs b/codex-rs/ext/goal/tests/goal_extension_backend.rs index 0a36439b25..584f7362b7 100644 --- a/codex-rs/ext/goal/tests/goal_extension_backend.rs +++ b/codex-rs/ext/goal/tests/goal_extension_backend.rs @@ -363,6 +363,243 @@ async fn budget_limited_goal_keeps_accounting_after_later_tool_finish() -> anyho Ok(()) } +#[tokio::test] +async fn usage_limit_active_goal_accounts_progress_and_clears_accounting() -> anyhow::Result<()> { + let runtime = test_runtime().await?; + let thread_id = test_thread_id()?; + seed_thread_metadata(runtime.as_ref(), thread_id).await?; + let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?; + harness.start_turn("turn-1", &TokenUsage::default()).await; + + let tools = harness.tools(); + let create_tool = tool_by_name(&tools, "create_goal"); + create_tool + .handle(tool_call( + "create_goal", + "call-create-goal", + json!({ "objective": "ship goal extension backend" }), + )) + .await?; + harness.sink.clear(); + + harness + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 20, /*cached_input_tokens*/ 5, /*output_tokens*/ 8, + /*reasoning_output_tokens*/ 2, /*total_tokens*/ 30, + ), + ) + .await; + harness + .runtime_handle() + .usage_limit_active_goal_for_turn("turn-1") + .await + .map_err(anyhow::Error::msg)?; + + let goal = runtime + .thread_goals() + .get_thread_goal(thread_id) + .await? + .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; + assert_eq!(23, goal.tokens_used); + assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status); + assert_eq!( + vec![ + CapturedGoalEvent { + event_id: "turn-1:usage-limit-progress".to_string(), + turn_id: Some("turn-1".to_string()), + status: ThreadGoalStatus::Active, + tokens_used: 23, + }, + CapturedGoalEvent { + event_id: "turn-1:usage-limit".to_string(), + turn_id: Some("turn-1".to_string()), + status: ThreadGoalStatus::UsageLimited, + tokens_used: 23, + }, + ], + harness.sink.goal_events() + ); + + harness + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 50, /*cached_input_tokens*/ 5, + /*output_tokens*/ 20, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 70, + ), + ) + .await; + harness + .notify_tool_finish("turn-1", "call-shell-after-usage-limit", "shell") + .await; + harness.stop_turn("turn-1").await; + + let goal = runtime + .thread_goals() + .get_thread_goal(thread_id) + .await? + .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; + assert_eq!(23, goal.tokens_used); + assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status); + Ok(()) +} + +#[tokio::test] +async fn usage_limit_budget_limited_goal_accounts_remaining_progress() -> anyhow::Result<()> { + let runtime = test_runtime().await?; + let thread_id = test_thread_id()?; + seed_thread_metadata(runtime.as_ref(), thread_id).await?; + let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?; + harness.start_turn("turn-1", &TokenUsage::default()).await; + + let tools = harness.tools(); + let create_tool = tool_by_name(&tools, "create_goal"); + create_tool + .handle(tool_call( + "create_goal", + "call-create-goal", + json!({ + "objective": "ship goal extension backend", + "token_budget": 25, + }), + )) + .await?; + + harness + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 20, /*cached_input_tokens*/ 5, + /*output_tokens*/ 10, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 30, + ), + ) + .await; + harness + .notify_tool_finish("turn-1", "call-shell", "shell") + .await; + harness.sink.clear(); + + harness + .record_token_usage( + "turn-1", + &token_usage( + /*input_tokens*/ 24, /*cached_input_tokens*/ 5, + /*output_tokens*/ 16, /*reasoning_output_tokens*/ 0, + /*total_tokens*/ 40, + ), + ) + .await; + harness + .runtime_handle() + .usage_limit_active_goal_for_turn("turn-1") + .await + .map_err(anyhow::Error::msg)?; + + let goal = runtime + .thread_goals() + .get_thread_goal(thread_id) + .await? + .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; + assert_eq!(35, goal.tokens_used); + assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status); + assert_eq!( + vec![ + CapturedGoalEvent { + event_id: "turn-1:usage-limit-progress".to_string(), + turn_id: Some("turn-1".to_string()), + status: ThreadGoalStatus::BudgetLimited, + tokens_used: 35, + }, + CapturedGoalEvent { + event_id: "turn-1:usage-limit".to_string(), + turn_id: Some("turn-1".to_string()), + status: ThreadGoalStatus::UsageLimited, + tokens_used: 35, + }, + ], + harness.sink.goal_events() + ); + Ok(()) +} + +#[tokio::test] +async fn usage_limit_plan_turn_does_not_stop_goal() -> anyhow::Result<()> { + let runtime = test_runtime().await?; + let thread_id = test_thread_id()?; + seed_thread_metadata(runtime.as_ref(), thread_id).await?; + let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?; + + let tools = harness.tools(); + let create_tool = tool_by_name(&tools, "create_goal"); + create_tool + .handle(tool_call( + "create_goal", + "call-create-goal", + json!({ "objective": "ship goal extension backend" }), + )) + .await?; + + harness + .start_turn_with_mode("turn-plan", ModeKind::Plan, &TokenUsage::default()) + .await; + harness.sink.clear(); + harness + .runtime_handle() + .usage_limit_active_goal_for_turn("turn-plan") + .await + .map_err(anyhow::Error::msg)?; + + let goal = runtime + .thread_goals() + .get_thread_goal(thread_id) + .await? + .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; + assert_eq!(codex_state::ThreadGoalStatus::Active, goal.status); + assert_eq!(Vec::::new(), harness.sink.goal_events()); + Ok(()) +} + +#[tokio::test] +async fn usage_limit_stale_turn_does_not_stop_current_goal() -> anyhow::Result<()> { + let runtime = test_runtime().await?; + let thread_id = test_thread_id()?; + seed_thread_metadata(runtime.as_ref(), thread_id).await?; + let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?; + harness.start_turn("turn-1", &TokenUsage::default()).await; + + let tools = harness.tools(); + let create_tool = tool_by_name(&tools, "create_goal"); + create_tool + .handle(tool_call( + "create_goal", + "call-create-goal", + json!({ "objective": "ship goal extension backend" }), + )) + .await?; + harness.stop_turn("turn-1").await; + harness.start_turn("turn-2", &TokenUsage::default()).await; + harness.sink.clear(); + + harness + .runtime_handle() + .usage_limit_active_goal_for_turn("turn-1") + .await + .map_err(anyhow::Error::msg)?; + + let goal = runtime + .thread_goals() + .get_thread_goal(thread_id) + .await? + .ok_or_else(|| anyhow::anyhow!("goal should exist"))?; + assert_eq!(codex_state::ThreadGoalStatus::Active, goal.status); + assert_eq!(Vec::::new(), harness.sink.goal_events()); + Ok(()) +} + #[tokio::test] async fn update_goal_can_block_and_accounts_final_progress() -> anyhow::Result<()> { let runtime = test_runtime().await?; @@ -719,8 +956,14 @@ impl GoalExtensionHarness { } async fn start_turn(&self, turn_id: &str, usage: &TokenUsage) { + self.start_turn_with_mode(turn_id, ModeKind::Default, usage) + .await; + } + + async fn start_turn_with_mode(&self, turn_id: &str, mode: ModeKind, usage: &TokenUsage) { let turn_store = ExtensionData::new(turn_id); - let collaboration_mode = default_collaboration_mode(); + let mut collaboration_mode = default_collaboration_mode(); + collaboration_mode.mode = mode; for contributor in self.registry.turn_lifecycle_contributors() { contributor .on_turn_start(TurnStartInput {