mirror of
https://github.com/openai/codex.git
synced 2026-06-02 11:22:01 +00:00
fix(app-server): replay token usage after resume and fork (#18023)
## Problem When a user resumed or forked a session, the TUI could render the restored thread history immediately, but it did not receive token usage until a later model turn emitted a fresh usage event. That left the context/status UI blank or stale during the exact window where the user expects resumed state to look complete. Core already reconstructed token usage from the rollout; the missing behavior was app-server lifecycle replay to the client that just attached. ## Mental model Token usage has two representations. The rollout is the durable source of historical `TokenCount` events, and the core session cache is the in-memory snapshot reconstructed from that rollout on resume or fork. App-server v2 clients do not read core state directly; they learn about usage through `thread/tokenUsage/updated`. The fix keeps those roles separate: core exposes the restored `TokenUsageInfo`, and app-server sends one targeted notification after a successful `thread/resume` or `thread/fork` response when that restored snapshot exists. This notification is not a new model event. It is a replay of already-persisted state for the client that just attached. That distinction matters because using the normal core event path here would risk duplicating `TokenCount` entries in the rollout and making future resumes count historical usage twice. ## Non-goals This change does not add a new protocol method or payload shape. It reuses the existing v2 `thread/tokenUsage/updated` notification and the TUI’s existing handler for that notification. This change does not alter how token usage is computed, accumulated, compacted, or written during turns. It only exposes the token usage that resume and fork reconstruction already restored. This change does not broadcast historical usage replay to every subscribed client. The replay is intentionally scoped to the connection that requested resume or fork so already-attached clients are not surprised by an old usage update while they may be rendering live activity. ## Tradeoffs Sending the usage notification after the JSON-RPC response preserves a clear lifecycle order: the client first receives the thread object, then receives restored usage for that thread. The tradeoff is that usage is still a notification rather than part of the `thread/resume` or `thread/fork` response. That keeps the protocol shape stable and avoids duplicating usage fields across response types, but clients must continue listening for notifications after receiving the response. The helper selects the latest non-in-progress turn id for the replayed usage notification. This is conservative because restored usage belongs to completed persisted accounting, not to newly attached in-flight work. The fallback to the last turn preserves a stable wire payload for unusual histories, but histories with no meaningful completed turn still have a weak attribution story. ## Architecture Core already seeds `Session` token state from the last persisted rollout `TokenCount` during `InitialHistory::Resumed` and `InitialHistory::Forked`. The new core accessor exposes the complete `TokenUsageInfo` through `CodexThread` without giving app-server direct session mutation authority. App-server calls that accessor from three lifecycle paths: cold `thread/resume`, running-thread resume/rejoin, and `thread/fork`. In each path, the server sends the normal response first, then calls a shared helper that converts core usage into `ThreadTokenUsageUpdatedNotification` and sends it only to the requesting connection. The tests build fake rollouts with a user turn plus a persisted token usage event. They then exercise `thread/resume` and `thread/fork` without starting another model turn, proving that restored usage arrives before any next-turn token event could be produced. ## Observability The primary debug path is the app-server JSON-RPC stream. After `thread/resume` or `thread/fork`, a client should see the response followed by `thread/tokenUsage/updated` when the source rollout includes token usage. If the notification is absent, check whether the rollout contains an `event_msg` payload of type `token_count`, whether core reconstruction seeded `Session::token_usage_info`, and whether the connection stayed attached long enough to receive the targeted notification. The notification is sent through the existing `OutgoingMessageSender::send_server_notification_to_connections` path, so existing app-server tracing around server notifications still applies. Because this is a replay, not a model turn event, debugging should start at the resume/fork handlers rather than the turn event translation in `bespoke_event_handling`. ## Tests The focused regression coverage is `cargo test -p codex-app-server emits_restored_token_usage`, which covers both resume and fork. The core reconstruction guard is `cargo test -p codex-core record_initial_history_seeds_token_info_from_rollout`. Formatting and lint/fix passes were run with `just fmt`, `just fix -p codex-core`, and `just fix -p codex-app-server`. Full crate test runs surfaced pre-existing unrelated failures in command execution and plugin marketplace tests; the new token usage tests passed in focused runs and within the app-server suite before the unrelated command execution failure.
This commit is contained in:
@@ -123,6 +123,20 @@ impl ThreadHistoryBuilder {
|
||||
.or_else(|| self.turns.last().cloned())
|
||||
}
|
||||
|
||||
/// Returns the index of the active turn snapshot within the finished turn list.
|
||||
///
|
||||
/// When a turn is still open, this is the index it will occupy after
|
||||
/// `finish`. When no turn is open, it is the index of the last finished turn.
|
||||
pub fn active_turn_position(&self) -> Option<usize> {
|
||||
if self.current_turn.is_some() {
|
||||
Some(self.turns.len())
|
||||
} else if self.turns.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.turns.len() - 1)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_active_turn(&self) -> bool {
|
||||
self.current_turn.is_some()
|
||||
}
|
||||
|
||||
@@ -250,7 +250,7 @@ Start a fresh thread when you need a new Codex conversation.
|
||||
|
||||
Valid `personality` values are `"friendly"`, `"pragmatic"`, and `"none"`. When `"none"` is selected, the personality placeholder is replaced with an empty string.
|
||||
|
||||
To continue a stored session, call `thread/resume` with the `thread.id` you previously recorded. The response shape matches `thread/start`, and no additional notifications are emitted. You can also pass the same configuration overrides supported by `thread/start`, including `approvalsReviewer`.
|
||||
To continue a stored session, call `thread/resume` with the `thread.id` you previously recorded. The response shape matches `thread/start`. When the stored session includes persisted token usage, the server emits `thread/tokenUsage/updated` immediately after the response so clients can render restored usage before the next turn starts. You can also pass the same configuration overrides supported by `thread/start`, including `approvalsReviewer`.
|
||||
|
||||
By default, resume uses the latest persisted `model` and `reasoningEffort` values associated with the thread. Supplying any of `model`, `modelProvider`, `config.model`, or `config.model_reasoning_effort` disables that persisted fallback and uses the explicit overrides plus normal config resolution instead.
|
||||
|
||||
@@ -264,7 +264,7 @@ Example:
|
||||
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
|
||||
```
|
||||
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only:
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only:
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123", "ephemeral": true } }
|
||||
|
||||
@@ -363,12 +363,17 @@ use codex_app_server_protocol::ServerRequest;
|
||||
mod apps_list_helpers;
|
||||
mod plugin_app_helpers;
|
||||
mod plugin_mcp_oauth;
|
||||
mod token_usage_replay;
|
||||
|
||||
use crate::filters::compute_source_filters;
|
||||
use crate::filters::source_kind_matches;
|
||||
use crate::thread_state::ThreadListenerCommand;
|
||||
use crate::thread_state::ThreadState;
|
||||
use crate::thread_state::ThreadStateManager;
|
||||
use token_usage_replay::latest_token_usage_turn_id_for_thread_path;
|
||||
use token_usage_replay::latest_token_usage_turn_id_from_rollout_items;
|
||||
use token_usage_replay::latest_token_usage_turn_id_from_rollout_path;
|
||||
use token_usage_replay::send_thread_token_usage_update_to_connection;
|
||||
|
||||
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
|
||||
const THREAD_LIST_MAX_LIMIT: usize = 100;
|
||||
@@ -4103,7 +4108,7 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
Ok(NewThread {
|
||||
thread_id,
|
||||
thread,
|
||||
thread: codex_thread,
|
||||
session_configured,
|
||||
}) => {
|
||||
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
|
||||
@@ -4132,7 +4137,7 @@ impl CodexMessageProcessor {
|
||||
let mut thread = match self
|
||||
.load_thread_from_resume_source_or_send_internal(
|
||||
thread_id,
|
||||
thread.as_ref(),
|
||||
codex_thread.as_ref(),
|
||||
&response_history,
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
@@ -4184,7 +4189,25 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = response.thread.clone();
|
||||
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
|
||||
&response_history.get_rollout_items(),
|
||||
&token_usage_thread,
|
||||
);
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
// The client needs restored usage before it starts another turn.
|
||||
// Sending after the response preserves JSON-RPC request ordering while
|
||||
// still filling the status line before the next turn lifecycle begins.
|
||||
send_thread_token_usage_update_to_connection(
|
||||
&self.outgoing,
|
||||
connection_id,
|
||||
thread_id,
|
||||
&token_usage_thread,
|
||||
codex_thread.as_ref(),
|
||||
token_usage_turn_id,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -4820,7 +4843,31 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = response.thread.clone();
|
||||
let token_usage_turn_id = if let Some(turn_id) =
|
||||
latest_token_usage_turn_id_for_thread_path(&token_usage_thread).await
|
||||
{
|
||||
Some(turn_id)
|
||||
} else {
|
||||
latest_token_usage_turn_id_from_rollout_path(
|
||||
rollout_path.as_path(),
|
||||
&token_usage_thread,
|
||||
)
|
||||
.await
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
// Mirror the resume contract for forks: the new thread is usable as soon
|
||||
// as the response arrives, so restored usage must follow immediately.
|
||||
send_thread_token_usage_update_to_connection(
|
||||
&self.outgoing,
|
||||
connection_id,
|
||||
thread_id,
|
||||
&token_usage_thread,
|
||||
forked_thread.as_ref(),
|
||||
token_usage_turn_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
self.outgoing
|
||||
@@ -8525,7 +8572,24 @@ async fn handle_pending_thread_resume_request(
|
||||
sandbox: sandbox_policy.into(),
|
||||
reasoning_effort,
|
||||
};
|
||||
let token_usage_thread = response.thread.clone();
|
||||
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path(
|
||||
pending.rollout_path.as_path(),
|
||||
&token_usage_thread,
|
||||
)
|
||||
.await;
|
||||
outgoing.send_response(request_id, response).await;
|
||||
// Rejoining a loaded thread has the same UI contract as a cold resume, but
|
||||
// uses the live conversation state instead of reconstructing a new session.
|
||||
send_thread_token_usage_update_to_connection(
|
||||
outgoing,
|
||||
connection_id,
|
||||
conversation_id,
|
||||
&token_usage_thread,
|
||||
conversation.as_ref(),
|
||||
token_usage_turn_id,
|
||||
)
|
||||
.await;
|
||||
outgoing
|
||||
.replay_requests_to_connection_for_thread(connection_id, conversation_id)
|
||||
.await;
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
//! Replays persisted token usage snapshots when a client attaches to an existing thread.
|
||||
//!
|
||||
//! The message processor decides when replay is allowed and preserves JSON-RPC response
|
||||
//! ordering. This module owns notification construction and the attribution rules that
|
||||
//! map the latest persisted `TokenCount` back to a v2 turn id.
|
||||
//!
|
||||
//! Rollout histories can contain explicit turn ids or generated turn ids. When explicit
|
||||
//! ids do not match the rebuilt thread, replay falls back to the active turn position at
|
||||
//! the time the `TokenCount` was persisted so the notification still targets the
|
||||
//! corresponding rebuilt turn.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::ThreadTokenUsage;
|
||||
use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::CodexThread;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
|
||||
use crate::codex_message_processor::read_rollout_items_from_rollout;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
|
||||
/// Sends a restored token usage update to the connection that attached to a thread.
|
||||
///
|
||||
/// This is lifecycle replay rather than a model event: the rollout already contains
|
||||
/// the original `TokenCount`, and emitting through `send_event` here would duplicate
|
||||
/// persisted usage records. Keeping this helper connection-scoped also avoids
|
||||
/// surprising other subscribers with a historical usage update while they may be
|
||||
/// rendering live turn events.
|
||||
pub(super) async fn send_thread_token_usage_update_to_connection(
|
||||
outgoing: &Arc<OutgoingMessageSender>,
|
||||
connection_id: ConnectionId,
|
||||
thread_id: ThreadId,
|
||||
thread: &Thread,
|
||||
conversation: &CodexThread,
|
||||
token_usage_turn_id: Option<String>,
|
||||
) {
|
||||
let Some(info) = conversation.token_usage_info().await else {
|
||||
return;
|
||||
};
|
||||
let notification = ThreadTokenUsageUpdatedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn_id: token_usage_turn_id.unwrap_or_else(|| latest_token_usage_turn_id(thread)),
|
||||
token_usage: ThreadTokenUsage::from(info),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::ThreadTokenUsageUpdated(notification),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(super) async fn latest_token_usage_turn_id_for_thread_path(thread: &Thread) -> Option<String> {
|
||||
let rollout_path = thread.path.as_deref()?;
|
||||
latest_token_usage_turn_id_from_rollout_path(rollout_path, thread).await
|
||||
}
|
||||
|
||||
pub(super) async fn latest_token_usage_turn_id_from_rollout_path(
|
||||
rollout_path: &Path,
|
||||
thread: &Thread,
|
||||
) -> Option<String> {
|
||||
let rollout_items = read_rollout_items_from_rollout(rollout_path).await.ok()?;
|
||||
latest_token_usage_turn_id_from_rollout_items(&rollout_items, thread)
|
||||
}
|
||||
|
||||
/// Identifies the turn that was active when a `TokenCount` record appeared.
|
||||
///
|
||||
/// The id is preferred when it still appears in the rebuilt thread. The position is a
|
||||
/// fallback for histories whose implicit turn ids are regenerated during reconstruction.
|
||||
struct TokenUsageTurnOwner {
|
||||
id: String,
|
||||
position: Option<usize>,
|
||||
}
|
||||
|
||||
pub(super) fn latest_token_usage_turn_id_from_rollout_items(
|
||||
rollout_items: &[RolloutItem],
|
||||
thread: &Thread,
|
||||
) -> Option<String> {
|
||||
let owner = latest_token_usage_turn_owner_from_rollout_items(rollout_items)?;
|
||||
if thread.turns.iter().any(|turn| turn.id == owner.id) {
|
||||
return Some(owner.id);
|
||||
}
|
||||
owner
|
||||
.position
|
||||
.and_then(|position| thread.turns.get(position))
|
||||
.map(|turn| turn.id.clone())
|
||||
}
|
||||
|
||||
fn latest_token_usage_turn_owner_from_rollout_items(
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Option<TokenUsageTurnOwner> {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
let mut token_usage_turn_owner = None;
|
||||
|
||||
for item in rollout_items {
|
||||
if matches!(item, RolloutItem::EventMsg(EventMsg::TokenCount(_))) {
|
||||
token_usage_turn_owner =
|
||||
builder
|
||||
.active_turn_snapshot()
|
||||
.map(|turn| TokenUsageTurnOwner {
|
||||
id: turn.id,
|
||||
position: builder.active_turn_position(),
|
||||
});
|
||||
}
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
|
||||
token_usage_turn_owner
|
||||
}
|
||||
|
||||
/// Chooses a fallback turn id that should own a replayed token usage update.
|
||||
///
|
||||
/// Normal replay derives the owner from the rollout position of the latest
|
||||
/// `TokenCount` event. This fallback only preserves a stable wire shape for
|
||||
/// unusual histories where that rollout information cannot be read.
|
||||
fn latest_token_usage_turn_id(thread: &Thread) -> String {
|
||||
thread
|
||||
.turns
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|turn| matches!(turn.status, TurnStatus::Completed | TurnStatus::Failed))
|
||||
.or_else(|| thread.turns.last())
|
||||
.map(|turn| turn.id.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
@@ -40,6 +40,7 @@ pub use responses::create_shell_command_sse_response;
|
||||
pub use rollout::create_fake_rollout;
|
||||
pub use rollout::create_fake_rollout_with_source;
|
||||
pub use rollout::create_fake_rollout_with_text_elements;
|
||||
pub use rollout::create_fake_rollout_with_token_usage;
|
||||
pub use rollout::rollout_path;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
use anyhow::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::GitInfo;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TokenCountEvent;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
@@ -50,6 +54,61 @@ pub fn create_fake_rollout(
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a minimal rollout whose history includes a persisted token usage event.
|
||||
///
|
||||
/// Resume and fork tests use this fixture to verify lifecycle replay of restored
|
||||
/// usage without starting a model turn. The exact token values are intentionally
|
||||
/// non-zero and asymmetric so assertions catch swapped total/last fields and
|
||||
/// dropped cached or reasoning counters.
|
||||
pub fn create_fake_rollout_with_token_usage(
|
||||
codex_home: &Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
preview: &str,
|
||||
model_provider: Option<&str>,
|
||||
) -> Result<String> {
|
||||
let thread_id = create_fake_rollout(
|
||||
codex_home,
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
preview,
|
||||
model_provider,
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let payload = serde_json::to_value(EventMsg::TokenCount(TokenCountEvent {
|
||||
info: Some(TokenUsageInfo {
|
||||
total_token_usage: TokenUsage {
|
||||
input_tokens: 120,
|
||||
cached_input_tokens: 20,
|
||||
output_tokens: 30,
|
||||
reasoning_output_tokens: 10,
|
||||
total_tokens: 150,
|
||||
},
|
||||
last_token_usage: TokenUsage {
|
||||
input_tokens: 70,
|
||||
cached_input_tokens: 10,
|
||||
output_tokens: 20,
|
||||
reasoning_output_tokens: 5,
|
||||
total_tokens: 90,
|
||||
},
|
||||
model_context_window: Some(200_000),
|
||||
}),
|
||||
rate_limits: None,
|
||||
}))?;
|
||||
let file_path = rollout_path(codex_home, filename_ts, &thread_id);
|
||||
let line = json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": payload
|
||||
})
|
||||
.to_string();
|
||||
fs::write(
|
||||
&file_path,
|
||||
format!("{}{}\n", fs::read_to_string(&file_path)?, line),
|
||||
)?;
|
||||
Ok(thread_id)
|
||||
}
|
||||
|
||||
/// Create a minimal rollout file with an explicit session source.
|
||||
pub fn create_fake_rollout_with_source(
|
||||
codex_home: &Path,
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::ChatGptAuthFixture;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_fake_rollout_with_token_usage;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_chatgpt_auth;
|
||||
@@ -9,6 +10,7 @@ use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
@@ -186,6 +188,59 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await??;
|
||||
let parsed: ServerNotification = note.try_into()?;
|
||||
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
||||
panic!("expected thread/tokenUsage/updated notification");
|
||||
};
|
||||
|
||||
assert_eq!(notification.thread_id, thread.id);
|
||||
assert_eq!(notification.turn_id, thread.turns[0].id);
|
||||
assert_eq!(notification.token_usage.total.total_tokens, 150);
|
||||
assert_eq!(notification.token_usage.total.input_tokens, 120);
|
||||
assert_eq!(notification.token_usage.total.cached_input_tokens, 20);
|
||||
assert_eq!(notification.token_usage.total.output_tokens, 30);
|
||||
assert_eq!(notification.token_usage.total.reasoning_output_tokens, 10);
|
||||
assert_eq!(notification.token_usage.last.total_tokens, 90);
|
||||
assert_eq!(notification.token_usage.model_context_window, Some(200_000));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -3,6 +3,7 @@ use app_test_support::ChatGptAuthFixture;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_fake_rollout_with_token_usage;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
@@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::PatchApplyStatus;
|
||||
use codex_app_server_protocol::PatchChangeKind;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
@@ -50,6 +52,11 @@ use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
|
||||
use codex_protocol::protocol::TokenCountEvent;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
@@ -278,6 +285,268 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
let note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await??;
|
||||
let parsed: ServerNotification = note.try_into()?;
|
||||
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
||||
panic!("expected thread/tokenUsage/updated notification");
|
||||
};
|
||||
|
||||
assert_eq!(notification.thread_id, thread.id);
|
||||
assert_eq!(notification.turn_id, thread.turns[0].id);
|
||||
assert_eq!(notification.token_usage.total.total_tokens, 150);
|
||||
assert_eq!(notification.token_usage.total.input_tokens, 120);
|
||||
assert_eq!(notification.token_usage.total.cached_input_tokens, 20);
|
||||
assert_eq!(notification.token_usage.total.output_tokens, 30);
|
||||
assert_eq!(notification.token_usage.total.reasoning_output_tokens, 10);
|
||||
assert_eq!(notification.token_usage.last.total_tokens, 90);
|
||||
assert_eq!(notification.token_usage.model_context_window, Some(200_000));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_token_usage_replay_ignores_stale_interrupted_tail_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
||||
let stale_turn_id = "incomplete-turn-after-token-usage";
|
||||
let appended_rollout = [
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: stale_turn_id.to_string(),
|
||||
started_at: None,
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "Still running".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
]
|
||||
.join("\n");
|
||||
std::fs::write(
|
||||
&rollout_file_path,
|
||||
format!("{persisted_rollout}{appended_rollout}\n"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(thread.turns.len(), 2);
|
||||
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(thread.turns[1].id, stale_turn_id);
|
||||
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
||||
|
||||
let note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await??;
|
||||
let parsed: ServerNotification = note.try_into()?;
|
||||
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
||||
panic!("expected thread/tokenUsage/updated notification");
|
||||
};
|
||||
|
||||
assert_eq!(notification.thread_id, thread.id);
|
||||
assert_eq!(notification.turn_id, thread.turns[0].id);
|
||||
assert_ne!(notification.turn_id, stale_turn_id);
|
||||
assert_eq!(notification.token_usage.total.total_tokens, 150);
|
||||
assert_eq!(notification.token_usage.last.total_tokens, 90);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_token_usage_replay_can_belong_to_interrupted_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
||||
let interrupted_turn_id = "interrupted-turn-with-token-usage";
|
||||
let appended_rollout = [
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: interrupted_turn_id.to_string(),
|
||||
started_at: None,
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "Interrupted after usage".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::TokenCount(TokenCountEvent {
|
||||
info: Some(TokenUsageInfo {
|
||||
total_token_usage: TokenUsage {
|
||||
input_tokens: 180,
|
||||
cached_input_tokens: 40,
|
||||
output_tokens: 50,
|
||||
reasoning_output_tokens: 15,
|
||||
total_tokens: 230,
|
||||
},
|
||||
last_token_usage: TokenUsage {
|
||||
input_tokens: 90,
|
||||
cached_input_tokens: 30,
|
||||
output_tokens: 40,
|
||||
reasoning_output_tokens: 12,
|
||||
total_tokens: 130,
|
||||
},
|
||||
model_context_window: Some(200_000),
|
||||
}),
|
||||
rate_limits: None,
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(interrupted_turn_id.to_string()),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
]
|
||||
.join("\n");
|
||||
std::fs::write(
|
||||
&rollout_file_path,
|
||||
format!("{persisted_rollout}{appended_rollout}\n"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(thread.turns.len(), 2);
|
||||
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(thread.turns[1].id, interrupted_turn_id);
|
||||
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
||||
|
||||
let note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await??;
|
||||
let parsed: ServerNotification = note.try_into()?;
|
||||
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
||||
panic!("expected thread/tokenUsage/updated notification");
|
||||
};
|
||||
|
||||
assert_eq!(notification.thread_id, thread.id);
|
||||
assert_eq!(notification.turn_id, interrupted_turn_id);
|
||||
assert_eq!(notification.token_usage.total.total_tokens, 230);
|
||||
assert_eq!(notification.token_usage.last.total_tokens, 130);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -2420,6 +2420,17 @@ impl Session {
|
||||
state.token_info().map(|info| info.total_token_usage)
|
||||
}
|
||||
|
||||
/// Returns the complete token usage snapshot currently cached for this session.
|
||||
///
|
||||
/// Resume and fork reconstruction seed this state from the last persisted rollout
|
||||
/// `TokenCount` event. Callers that need to replay restored usage to a client
|
||||
/// should use this accessor instead of `total_token_usage`, because the app-server
|
||||
/// notification includes both total and last-turn usage.
|
||||
pub(crate) async fn token_usage_info(&self) -> Option<TokenUsageInfo> {
|
||||
let state = self.state.lock().await;
|
||||
state.token_info()
|
||||
}
|
||||
|
||||
pub(crate) async fn get_estimated_token_count(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::protocol::ThreadMemoryMode;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
@@ -144,6 +145,17 @@ impl CodexThread {
|
||||
self.codex.session.total_token_usage().await
|
||||
}
|
||||
|
||||
/// Returns the complete token usage snapshot currently cached for this thread.
|
||||
///
|
||||
/// This accessor is intentionally narrower than direct session access: it lets
|
||||
/// app-server lifecycle paths replay restored usage after resume or fork without
|
||||
/// exposing broader session mutation authority. A caller that only reads
|
||||
/// `total_token_usage` would drop last-turn usage and make the v2
|
||||
/// `thread/tokenUsage/updated` payload incomplete.
|
||||
pub async fn token_usage_info(&self) -> Option<TokenUsageInfo> {
|
||||
self.codex.session.token_usage_info().await
|
||||
}
|
||||
|
||||
/// Records a user-role session-prefix message without creating a new user turn boundary.
|
||||
pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
|
||||
let message = ResponseItem::Message {
|
||||
|
||||
Reference in New Issue
Block a user