Files
Felipe Coury ec8d4bfc77 fix(app-server): replay token usage after resume and fork (#18023)
## Problem

When a user resumed or forked a session, the TUI could render the
restored thread history immediately, but it did not receive token usage
until a later model turn emitted a fresh usage event. That left the
context/status UI blank or stale during the exact window where the user
expects resumed state to look complete. Core already reconstructed token
usage from the rollout; the missing behavior was app-server lifecycle
replay to the client that just attached.

## Mental model

Token usage has two representations. The rollout is the durable source
of historical `TokenCount` events, and the core session cache is the
in-memory snapshot reconstructed from that rollout on resume or fork.
App-server v2 clients do not read core state directly; they learn about
usage through `thread/tokenUsage/updated`. The fix keeps those roles
separate: core exposes the restored `TokenUsageInfo`, and app-server
sends one targeted notification after a successful `thread/resume` or
`thread/fork` response when that restored snapshot exists.

This notification is not a new model event. It is a replay of
already-persisted state for the client that just attached. That
distinction matters because using the normal core event path here would
risk duplicating `TokenCount` entries in the rollout and making future
resumes count historical usage twice.

## Non-goals

This change does not add a new protocol method or payload shape. It
reuses the existing v2 `thread/tokenUsage/updated` notification and the
TUI’s existing handler for that notification.

This change does not alter how token usage is computed, accumulated,
compacted, or written during turns. It only exposes the token usage that
resume and fork reconstruction already restored.

This change does not broadcast historical usage replay to every
subscribed client. The replay is intentionally scoped to the connection
that requested resume or fork so already-attached clients are not
surprised by an old usage update while they may be rendering live
activity.

## Tradeoffs

Sending the usage notification after the JSON-RPC response preserves a
clear lifecycle order: the client first receives the thread object, then
receives restored usage for that thread. The tradeoff is that usage is
still a notification rather than part of the `thread/resume` or
`thread/fork` response. That keeps the protocol shape stable and avoids
duplicating usage fields across response types, but clients must
continue listening for notifications after receiving the response.

The helper selects the latest non-in-progress turn id for the replayed
usage notification. This is conservative because restored usage belongs
to completed persisted accounting, not to newly attached in-flight work.
The fallback to the last turn preserves a stable wire payload for
unusual histories, but histories with no meaningful completed turn still
have a weak attribution story.

## Architecture

Core already seeds `Session` token state from the last persisted rollout
`TokenCount` during `InitialHistory::Resumed` and
`InitialHistory::Forked`. The new core accessor exposes the complete
`TokenUsageInfo` through `CodexThread` without giving app-server direct
session mutation authority.

App-server calls that accessor from three lifecycle paths: cold
`thread/resume`, running-thread resume/rejoin, and `thread/fork`. In
each path, the server sends the normal response first, then calls a
shared helper that converts core usage into
`ThreadTokenUsageUpdatedNotification` and sends it only to the
requesting connection.

The tests build fake rollouts with a user turn plus a persisted token
usage event. They then exercise `thread/resume` and `thread/fork`
without starting another model turn, proving that restored usage arrives
before any next-turn token event could be produced.

## Observability

The primary debug path is the app-server JSON-RPC stream. After
`thread/resume` or `thread/fork`, a client should see the response
followed by `thread/tokenUsage/updated` when the source rollout includes
token usage. If the notification is absent, check whether the rollout
contains an `event_msg` payload of type `token_count`, whether core
reconstruction seeded `Session::token_usage_info`, and whether the
connection stayed attached long enough to receive the targeted
notification.

The notification is sent through the existing
`OutgoingMessageSender::send_server_notification_to_connections` path,
so existing app-server tracing around server notifications still
applies. Because this is a replay, not a model turn event, debugging
should start at the resume/fork handlers rather than the turn event
translation in `bespoke_event_handling`.

## Tests

The focused regression coverage is `cargo test -p codex-app-server
emits_restored_token_usage`, which covers both resume and fork. The core
reconstruction guard is `cargo test -p codex-core
record_initial_history_seeds_token_info_from_rollout`.

Formatting and lint/fix passes were run with `just fmt`, `just fix -p
codex-core`, and `just fix -p codex-app-server`. Full crate test runs
surfaced pre-existing unrelated failures in command execution and plugin
marketplace tests; the new token usage tests passed in focused runs and
within the app-server suite before the unrelated command execution
failure.
2026-04-16 17:29:34 -03:00

270 lines
8.0 KiB
Rust

use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use serde_json::json;
use std::fs;
use std::fs::FileTimes;
use std::path::Path;
use std::path::PathBuf;
use uuid::Uuid;
pub fn rollout_path(codex_home: &Path, filename_ts: &str, thread_id: &str) -> PathBuf {
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
codex_home
.join("sessions")
.join(year)
.join(month)
.join(day)
.join(format!("rollout-{filename_ts}-{thread_id}.jsonl"))
}
/// Create a minimal rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`.
///
/// - `filename_ts` is the filename timestamp component in `YYYY-MM-DDThh-mm-ss` format.
/// - `meta_rfc3339` is the envelope timestamp used in JSON lines.
/// - `preview` is the user message preview text.
/// - `model_provider` optionally sets the provider in the session meta payload.
///
/// Returns the generated conversation/session UUID as a string.
pub fn create_fake_rollout(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
git_info: Option<GitInfo>,
) -> Result<String> {
create_fake_rollout_with_source(
codex_home,
filename_ts,
meta_rfc3339,
preview,
model_provider,
git_info,
SessionSource::Cli,
)
}
/// Creates a minimal rollout whose history includes a persisted token usage event.
///
/// Resume and fork tests use this fixture to verify lifecycle replay of restored
/// usage without starting a model turn. The exact token values are intentionally
/// non-zero and asymmetric so assertions catch swapped total/last fields and
/// dropped cached or reasoning counters.
pub fn create_fake_rollout_with_token_usage(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
) -> Result<String> {
let thread_id = create_fake_rollout(
codex_home,
filename_ts,
meta_rfc3339,
preview,
model_provider,
/*git_info*/ None,
)?;
let payload = serde_json::to_value(EventMsg::TokenCount(TokenCountEvent {
info: Some(TokenUsageInfo {
total_token_usage: TokenUsage {
input_tokens: 120,
cached_input_tokens: 20,
output_tokens: 30,
reasoning_output_tokens: 10,
total_tokens: 150,
},
last_token_usage: TokenUsage {
input_tokens: 70,
cached_input_tokens: 10,
output_tokens: 20,
reasoning_output_tokens: 5,
total_tokens: 90,
},
model_context_window: Some(200_000),
}),
rate_limits: None,
}))?;
let file_path = rollout_path(codex_home, filename_ts, &thread_id);
let line = json!({
"timestamp": meta_rfc3339,
"type": "event_msg",
"payload": payload
})
.to_string();
fs::write(
&file_path,
format!("{}{}\n", fs::read_to_string(&file_path)?, line),
)?;
Ok(thread_id)
}
/// Create a minimal rollout file with an explicit session source.
pub fn create_fake_rollout_with_source(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
git_info: Option<GitInfo>,
source: SessionSource,
) -> Result<String> {
let uuid = Uuid::new_v4();
let uuid_str = uuid.to_string();
let conversation_id = ThreadId::from_string(&uuid_str)?;
let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
let dir = file_path
.parent()
.ok_or_else(|| anyhow::anyhow!("missing rollout parent directory"))?;
fs::create_dir_all(dir)?;
// Build JSONL lines
let meta = SessionMeta {
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source,
agent_path: None,
agent_nickname: None,
agent_role: None,
model_provider: model_provider.map(str::to_string),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let payload = serde_json::to_value(SessionMetaLine {
meta,
git: git_info,
})?;
let lines = [
json!({
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": payload
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
}
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type":"event_msg",
"payload": {
"type":"user_message",
"message": preview,
"kind": "plain"
}
})
.to_string(),
];
fs::write(&file_path, lines.join("\n") + "\n")?;
let parsed = chrono::DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&chrono::Utc);
let times = FileTimes::new().set_modified(parsed.into());
std::fs::OpenOptions::new()
.append(true)
.open(&file_path)?
.set_times(times)?;
Ok(uuid_str)
}
pub fn create_fake_rollout_with_text_elements(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
text_elements: Vec<serde_json::Value>,
model_provider: Option<&str>,
git_info: Option<GitInfo>,
) -> Result<String> {
let uuid = Uuid::new_v4();
let uuid_str = uuid.to_string();
let conversation_id = ThreadId::from_string(&uuid_str)?;
// sessions/YYYY/MM/DD derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
// Build JSONL lines
let meta = SessionMeta {
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
agent_path: None,
agent_nickname: None,
agent_role: None,
model_provider: model_provider.map(str::to_string),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let payload = serde_json::to_value(SessionMetaLine {
meta,
git: git_info,
})?;
let lines = [
json!( {
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": payload
})
.to_string(),
json!( {
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
}
})
.to_string(),
json!( {
"timestamp": meta_rfc3339,
"type":"event_msg",
"payload": {
"type":"user_message",
"message": preview,
"text_elements": text_elements,
"local_images": []
}
})
.to_string(),
];
fs::write(file_path, lines.join("\n") + "\n")?;
Ok(uuid_str)
}