mirror of
https://github.com/openai/codex.git
synced 2026-05-28 06:55:01 +00:00
fix: restore goal accounting after thread resume (#24626)
## Why Goal idle accounting is supposed to survive a thread resume. Previously, the resume hook restored the active goal state inline from the extension lifecycle contributor, which left the runtime handle without a reusable restoration path and made the behavior hard to cover directly. When a thread with an active goal was resumed, goal accounting could lose track of the active idle goal instead of continuing to accrue elapsed time. ## What changed - Moved thread-resume restoration into `GoalRuntimeHandle::restore_after_resume()` so the runtime owns rehydrating active goal accounting from persisted thread goal state. - Kept disabled goal runtimes as a no-op and preserved the existing warning path when persisted goal state cannot be loaded. - Added a backend regression test that seeds an active goal, resumes the thread, waits briefly, and verifies elapsed idle time is reflected on the next external goal mutation. ## Testing - Not run locally; this metadata update only rewrote the PR title/body.
This commit is contained in:
@@ -112,33 +112,12 @@ where
|
||||
let Some(runtime) = goal_runtime_handle(input.thread_store) else {
|
||||
return;
|
||||
};
|
||||
if !runtime.is_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
let goal = match self
|
||||
.state_dbs
|
||||
.thread_goals()
|
||||
.get_thread_goal(runtime.thread_id())
|
||||
.await
|
||||
{
|
||||
Ok(goal) => goal,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"failed to restore goal runtime after thread resume for {}: {err}",
|
||||
runtime.thread_id()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
match goal {
|
||||
Some(goal) if goal.status == codex_state::ThreadGoalStatus::Active => {
|
||||
runtime
|
||||
.accounting_state()
|
||||
.mark_idle_goal_active(goal.goal_id);
|
||||
self.metrics.record_resumed();
|
||||
}
|
||||
Some(_) | None => runtime.accounting_state().clear_active_goal(),
|
||||
if let Err(err) = runtime.restore_after_resume().await {
|
||||
tracing::warn!(
|
||||
"failed to restore goal runtime after thread resume for {}: {err}",
|
||||
runtime.thread_id()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,6 +192,30 @@ impl GoalRuntimeHandle {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn restore_after_resume(&self) -> Result<(), String> {
|
||||
if !self.is_enabled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let goal = self
|
||||
.inner
|
||||
.state_dbs
|
||||
.thread_goals()
|
||||
.get_thread_goal(self.thread_id())
|
||||
.await
|
||||
.map_err(|err| err.to_string())?;
|
||||
match goal {
|
||||
Some(goal) if goal.status == codex_state::ThreadGoalStatus::Active => {
|
||||
self.inner
|
||||
.accounting_state
|
||||
.mark_idle_goal_active(goal.goal_id);
|
||||
self.inner.metrics.record_resumed();
|
||||
}
|
||||
Some(_) | None => self.inner.accounting_state.clear_active_goal(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn inject_active_turn_steering(&self, item: ResponseInputItem) {
|
||||
let Some(thread_manager) = self.inner.thread_manager.upgrade() else {
|
||||
tracing::debug!("skipping goal steering because thread manager is unavailable");
|
||||
|
||||
@@ -2,11 +2,13 @@ use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_extension_api::ExtensionData;
|
||||
use codex_extension_api::ExtensionEventSink;
|
||||
use codex_extension_api::ExtensionRegistryBuilder;
|
||||
use codex_extension_api::FunctionCallError;
|
||||
use codex_extension_api::ThreadResumeInput;
|
||||
use codex_extension_api::ThreadStartInput;
|
||||
use codex_extension_api::ToolCall;
|
||||
use codex_extension_api::ToolCallOutcome;
|
||||
@@ -589,6 +591,52 @@ async fn external_goal_set_active_resets_baseline_without_live_thread() -> anyho
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_rehydrates_active_goal_idle_accounting() -> anyhow::Result<()> {
|
||||
let runtime = test_runtime().await?;
|
||||
let thread_id = test_thread_id()?;
|
||||
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
|
||||
runtime
|
||||
.thread_goals()
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"ship goal extension backend",
|
||||
codex_state::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ None,
|
||||
)
|
||||
.await?;
|
||||
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;
|
||||
|
||||
harness.resume_thread().await;
|
||||
tokio::time::sleep(Duration::from_millis(1_100)).await;
|
||||
harness
|
||||
.runtime_handle()
|
||||
.prepare_external_goal_mutation()
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
|
||||
let goal = runtime
|
||||
.thread_goals()
|
||||
.get_thread_goal(thread_id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
|
||||
assert_eq!(ThreadGoalStatus::Active, protocol_status(goal.status));
|
||||
assert!(
|
||||
goal.time_used_seconds >= 1,
|
||||
"resumed idle accounting should add elapsed wall-clock time"
|
||||
);
|
||||
assert_eq!(
|
||||
vec![CapturedGoalEvent {
|
||||
event_id: format!("{thread_id}:external-goal-mutation"),
|
||||
turn_id: None,
|
||||
status: ThreadGoalStatus::Active,
|
||||
tokens_used: 0,
|
||||
}],
|
||||
harness.sink.goal_events()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn installed_tools(
|
||||
runtime: Arc<codex_state::StateRuntime>,
|
||||
thread_id: ThreadId,
|
||||
@@ -719,6 +767,17 @@ impl GoalExtensionHarness {
|
||||
}
|
||||
}
|
||||
|
||||
async fn resume_thread(&self) {
|
||||
for contributor in self.registry.thread_lifecycle_contributors() {
|
||||
contributor
|
||||
.on_thread_resume(ThreadResumeInput {
|
||||
session_store: &self.session_store,
|
||||
thread_store: &self.thread_store,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn notify_tool_finish(&self, turn_id: &str, call_id: &str, tool_name: &str) {
|
||||
let turn_store = ExtensionData::new(turn_id);
|
||||
let tool_name = codex_extension_api::ToolName::plain(tool_name);
|
||||
|
||||
Reference in New Issue
Block a user