From bee78806a9b8fad5f250f7822da08008b2c1dd01 Mon Sep 17 00:00:00 2001 From: ningyi-oai Date: Wed, 27 May 2026 11:09:33 -0700 Subject: [PATCH] [codex] add compaction metadata to turn headers (#24368) ## Summary - Add `request_kind` values for foreground turn, startup prewarm, compaction, and detached memory model requests. - Attach compaction dispatch metadata to local Responses, legacy `/v1/responses/compact`, and remote v2 compact requests. - Add the existing logical context-window identifier as `window_id` on turn-owned model request metadata. - Keep identity fields optional for detached memory requests, while still emitting `request_kind="memory"` in non-git/no-sandbox workspaces. ## Root Cause `x-codex-turn-metadata` has more than one producer. Foreground turns and compaction requests own a real turn and should carry that turn identity. Detached memory stage-one requests do not own a foreground turn, so absent identity fields are valid rather than missing data. Startup websocket prewarm is also a model request, but it has `generate=false` and must not be counted as a foreground turn. `thread_source` or session source identifies where a thread came from (for example review, guardian, or another subagent). `request_kind` identifies what the current outbound model request is doing (`turn`, `prewarm`, `compaction`, or `memory`). A review or guardian thread can issue either a normal turn request or a compaction request, so source cannot replace request kind. ## Behavior / Impact - Ordinary foreground requests send `request_kind="turn"`, their real identity fields, and `window_id="{thread_id}:{window_generation}"`. - Startup websocket warmup requests send `request_kind="prewarm"` so they are not counted as foreground turns. - Compaction requests send `request_kind="compaction"`, their real owning turn identity, the existing `window_id`, and `compaction.{trigger,reason,implementation,phase,strategy}`. 
- Detached memory stage-one requests send `request_kind="memory"` without `session_id`, `thread_id`, `turn_id`, or `window_id`; when no workspace metadata exists, the kind-only header is still emitted. - `session_id`, `thread_id`, `turn_id`, and `window_id` remain optional in the header schema because detached memory requests do not own a foreground turn or context window. - `window_id` is not a new ID system: it is copied from the already-sent `x-codex-window-id` / WS client metadata value at model-request dispatch time. - Existing `x-codex-window-id` HTTP/WS emission, value format, generation advancement, resume behavior, and fork reset behavior are unchanged. - `request_kind`, `window_id`, and upstream turn-owned identity fields remain schema-owned; input `responsesapi_client_metadata` cannot replace their canonical values. - No table, DAG, export, app-server API, or MCP `_meta` schema changes are included. A compaction attempt stopped by a pre-compact hook issues no model request and therefore has no request header; its outcome remains in analytics events. Status, error, duration, and token deltas also remain analytics fields rather than request-header fields. Future detached-memory attribution using a real initiating turn ID as `trigger_turn_id` is intentionally not part of this PR. ## Sync With Main - Final pushed head `716342e79` is rebased onto `origin/main@0d37db4b2`. - The metadata conflict came from upstream `#24160`, which added `forked_from_thread_id` on the same `turn_metadata` surface. Resolution preserves that field and its protection from client metadata override alongside this PR's request-kind, compaction, and window-id fields. - While resolving the overlapping commits, I removed an accidental recursive model-request overlay and a duplicate detached-memory header builder before completing the rebase. ## Latency / User Experience Boundary - Foreground turns perform no new filesystem, git, or network work. 
New fields are inserted into metadata already serialized for outgoing requests. - Compaction issues the same model/HTTP requests with the same prompt, model, service tier, and sampling settings; only metadata bytes change. - Startup prewarm already sent metadata; it is now correctly classified as `prewarm`. - Non-git detached memory now sends a small kind-only metadata header rather than no header. - This client diff adds no user-visible latency mechanism beyond negligible serialization and header bytes on already-existing requests. ## Validation On conflict-resolved head `1d35c2cfb` based on `origin/main@487521733`: - `just fmt` (passed) - `just fix -p codex-core` (passed) - `git diff --check origin/main...HEAD` (passed) - `just test -p codex-core -E 'test(turn_metadata) | test(websocket_first_turn_uses_startup_prewarm_and_create) | test(responses_stream_includes_turn_metadata_header_for_git_workspace_e2e) | test(responses_websocket_forwards_turn_metadata_on_initial_and_incremental_create) | test(remote_compact_v2_retries_failures_with_stream_retry_budget) | test(window_id_advances_after_compact_persists_on_resume_and_resets_on_fork)'` (`23 passed`; `bench-smoke` passed) - `just test -p codex-app-server -E 'test(turn_start_forwards_client_metadata_to_responses_request_v2) | test(turn_start_forwards_client_metadata_to_responses_websocket_request_body_v2) | test(auto_compaction_remote_emits_started_and_completed_items)'` (`3 passed`; `bench-smoke` passed) - `just test -p codex-memories-write` (`29 passed`; `bench-smoke` passed) --- .../tests/suite/v2/client_metadata.rs | 8 + .../app-server/tests/suite/v2/compaction.rs | 56 +++++ codex-rs/core/src/client.rs | 5 +- codex-rs/core/src/compact.rs | 10 +- codex-rs/core/src/compact_remote.rs | 22 +- codex-rs/core/src/compact_remote_v2.rs | 14 +- codex-rs/core/src/session/turn.rs | 5 +- codex-rs/core/src/session_startup_prewarm.rs | 3 +- codex-rs/core/src/turn_metadata.rs | 216 ++++++++++++++---- 
codex-rs/core/src/turn_metadata_tests.rs | 98 +++++++- codex-rs/core/tests/suite/agent_websocket.rs | 16 ++ codex-rs/core/tests/suite/compact.rs | 47 ++++ codex-rs/core/tests/suite/compact_remote.rs | 149 +++++++++++- codex-rs/memories/write/src/runtime.rs | 2 +- codex-rs/memories/write/src/startup_tests.rs | 33 ++- 15 files changed, 628 insertions(+), 56 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/client_metadata.rs b/codex-rs/app-server/tests/suite/v2/client_metadata.rs index 3a00a433d1..9daa8d34f1 100644 --- a/codex-rs/app-server/tests/suite/v2/client_metadata.rs +++ b/codex-rs/app-server/tests/suite/v2/client_metadata.rs @@ -105,6 +105,10 @@ async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result assert_eq!(metadata["thread_source"].as_str(), Some("client-supplied")); assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str())); assert!(metadata.get("session_id").is_some()); + assert_eq!( + metadata["window_id"].as_str(), + request.header("x-codex-window-id").as_deref() + ); Ok(()) } @@ -497,6 +501,10 @@ async fn turn_start_forwards_client_metadata_to_responses_websocket_request_body assert_eq!(metadata["origin"].as_str(), Some("gaas")); assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str())); assert!(metadata.get("session_id").is_some()); + assert_eq!( + metadata["window_id"].as_str(), + request["client_metadata"]["x-codex-window-id"].as_str() + ); websocket_server.shutdown().await; Ok(()) diff --git a/codex-rs/app-server/tests/suite/v2/compaction.rs b/codex-rs/app-server/tests/suite/v2/compaction.rs index 6db031b278..2cf9f6b36c 100644 --- a/codex-rs/app-server/tests/suite/v2/compaction.rs +++ b/codex-rs/app-server/tests/suite/v2/compaction.rs @@ -190,6 +190,58 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<() let response_requests = responses_log.requests(); assert_eq!(response_requests.len(), 3); + let turn_metadata = response_requests + .iter() + .map(|request| { + 
request + .header("x-codex-turn-metadata") + .as_deref() + .map(parse_json_header) + .unwrap_or_else(|| panic!("turn request should include turn metadata")) + }) + .collect::>(); + for (request, metadata) in response_requests.iter().zip(&turn_metadata) { + assert_eq!(metadata["request_kind"].as_str(), Some("turn")); + assert!( + metadata["turn_id"] + .as_str() + .is_some_and(|turn_id| !turn_id.is_empty()), + "turn request should carry a non-empty turn id" + ); + assert_eq!( + metadata["window_id"].as_str(), + request.header("x-codex-window-id").as_deref() + ); + assert!(metadata.get("compaction").is_none()); + } + + let compact_metadata = compact_requests[0] + .header("x-codex-turn-metadata") + .as_deref() + .map(parse_json_header) + .unwrap_or_else(|| panic!("compact request should include turn metadata")); + assert_eq!( + compact_metadata["request_kind"].as_str(), + Some("compaction") + ); + assert_eq!( + compact_metadata["compaction"], + serde_json::json!({ + "trigger": "auto", + "reason": "context_limit", + "implementation": "responses_compact", + "phase": "pre_turn", + "strategy": "memento", + }) + ); + assert_eq!( + compact_metadata["turn_id"], turn_metadata[2]["turn_id"], + "pre-turn compaction should carry the current turn id" + ); + assert_eq!( + compact_metadata["window_id"].as_str(), + compact_requests[0].header("x-codex-window-id").as_deref() + ); Ok(()) } @@ -407,3 +459,7 @@ async fn wait_for_context_compaction_completed( } } } + +fn parse_json_header(value: &str) -> serde_json::Value { + serde_json::from_str(value).unwrap_or_else(|err| panic!("turn metadata should be json: {err}")) +} diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index d84b32a8b4..6c87400edd 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -383,7 +383,7 @@ impl ModelClient { self.store_cached_websocket_session(WebsocketSession::default()); } - fn current_window_id(&self) -> String { + pub(crate) fn current_window_id(&self) -> 
String { let thread_id = self.state.thread_id; let window_generation = self.state.window_generation.load(Ordering::Relaxed); format!("{thread_id}:{window_generation}") @@ -441,6 +441,7 @@ impl ModelClient { settings: CompactConversationRequestSettings, session_telemetry: &SessionTelemetry, compaction_trace: &CompactionTraceContext, + turn_metadata_header: Option<&str>, ) -> Result> { if prompt.input.is_empty() { return Ok(Vec::new()); @@ -496,7 +497,7 @@ impl ModelClient { extra_headers.extend(build_responses_headers( self.state.beta_features_header.as_deref(), /*turn_state*/ None, - /*turn_metadata_header*/ None, + parse_turn_metadata_header(turn_metadata_header).as_ref(), )); extra_headers.extend(self.build_responses_identity_headers()); extra_headers.extend(build_session_headers( diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index f2bded8d06..1a6256ad76 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -13,6 +13,7 @@ use crate::session::PreviousTurnSettings; use crate::session::session::Session; use crate::session::turn::get_last_assistant_message_from_turn; use crate::session::turn_context::TurnContext; +use crate::turn_metadata::CompactionTurnMetadata; use crate::util::backoff; use codex_analytics::CodexCompactionEvent; use codex_analytics::CompactionImplementation; @@ -128,6 +129,8 @@ async fn run_compact_task_inner( reason: CompactionReason, phase: CompactionPhase, ) -> CodexResult<()> { + let compaction_metadata = + CompactionTurnMetadata::new(trigger, reason, CompactionImplementation::Responses, phase); let attempt = CompactionAnalyticsAttempt::begin( sess.as_ref(), turn_context.as_ref(), @@ -153,6 +156,7 @@ async fn run_compact_task_inner( Arc::clone(&turn_context), input, initial_context_injection, + compaction_metadata, ) .await; let status = compaction_status_from_result(&result); @@ -173,6 +177,7 @@ async fn run_compact_task_inner_impl( turn_context: Arc, input: Vec, initial_context_injection: 
InitialContextInjection, + compaction_metadata: CompactionTurnMetadata, ) -> CodexResult { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(&turn_context, &compaction_item) @@ -204,7 +209,10 @@ async fn run_compact_task_inner_impl( personality: turn_context.personality, ..Default::default() }; - let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); + let window_id = sess.services.model_client.current_window_id(); + let turn_metadata_header = turn_context + .turn_metadata_state + .current_header_value_for_compaction(&window_id, compaction_metadata); let attempt_result = drain_to_completed( &sess, turn_context.as_ref(), diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 969da47544..b4456dc717 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -17,6 +17,7 @@ use crate::hook_runtime::run_pre_compact_hooks; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; +use crate::turn_metadata::CompactionTurnMetadata; use codex_analytics::CompactionImplementation; use codex_analytics::CompactionPhase; use codex_analytics::CompactionReason; @@ -89,6 +90,12 @@ async fn run_remote_compact_task_inner( reason: CompactionReason, phase: CompactionPhase, ) -> CodexResult<()> { + let compaction_metadata = CompactionTurnMetadata::new( + trigger, + reason, + CompactionImplementation::ResponsesCompact, + phase, + ); let attempt = CompactionAnalyticsAttempt::begin( sess.as_ref(), turn_context.as_ref(), @@ -113,8 +120,13 @@ async fn run_remote_compact_task_inner( return Err(CodexErr::TurnAborted); } } - let result = - run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await; + let result = run_remote_compact_task_inner_impl( + sess, + turn_context, + initial_context_injection, + compaction_metadata, + ) + .await; let 
status = compaction_status_from_result(&result); let error = result.as_ref().err().map(ToString::to_string); if result.is_ok() { @@ -139,6 +151,7 @@ async fn run_remote_compact_task_inner_impl( sess: &Arc, turn_context: &Arc, initial_context_injection: InitialContextInjection, + compaction_metadata: CompactionTurnMetadata, ) -> CodexResult<()> { let context_compaction_item = ContextCompactionItem::new(); // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events, @@ -186,6 +199,10 @@ async fn run_remote_compact_task_inner_impl( output_schema: None, output_schema_strict: true, }; + let window_id = sess.services.model_client.current_window_id(); + let turn_metadata_header = turn_context + .turn_metadata_state + .current_header_value_for_compaction(&window_id, compaction_metadata); let mut new_history = sess .services .model_client @@ -203,6 +220,7 @@ async fn run_remote_compact_task_inner_impl( }, &turn_context.session_telemetry, &compaction_trace, + turn_metadata_header.as_deref(), ) .or_else(|err| async { let total_usage_breakdown = sess.get_total_token_usage_breakdown().await; diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index dcf241f990..0da3017f7d 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -21,6 +21,7 @@ use crate::responses_retry::handle_retryable_response_stream_error; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; +use crate::turn_metadata::CompactionTurnMetadata; use codex_analytics::CompactionImplementation; use codex_analytics::CompactionPhase; use codex_analytics::CompactionReason; @@ -105,6 +106,12 @@ async fn run_remote_compact_task_inner( reason: CompactionReason, phase: CompactionPhase, ) -> CodexResult<()> { + let compaction_metadata = CompactionTurnMetadata::new( + trigger, + reason, + CompactionImplementation::ResponsesCompactionV2, + 
phase, + ); let attempt = CompactionAnalyticsAttempt::begin( sess.as_ref(), turn_context.as_ref(), @@ -134,6 +141,7 @@ async fn run_remote_compact_task_inner( turn_context, client_session, initial_context_injection, + compaction_metadata, ) .await; let status = compaction_status_from_result(&result); @@ -161,6 +169,7 @@ async fn run_remote_compact_task_inner_impl( turn_context: &Arc, client_session: Option<&mut ModelClientSession>, initial_context_injection: InitialContextInjection, + compaction_metadata: CompactionTurnMetadata, ) -> CodexResult<()> { let context_compaction_item = ContextCompactionItem::new(); let compaction_trace = sess.services.rollout_thread_trace.compaction_trace_context( @@ -208,7 +217,10 @@ async fn run_remote_compact_task_inner_impl( output_schema_strict: true, }; - let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); + let window_id = sess.services.model_client.current_window_id(); + let turn_metadata_header = turn_context + .turn_metadata_state + .current_header_value_for_compaction(&window_id, compaction_metadata); let trace_attempt = compaction_trace.start_attempt(&serde_json::json!({ "model": turn_context.model_info.slug.as_str(), "instructions": prompt.base_instructions.text.as_str(), diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 2b9fa02529..fe20b76e4d 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -234,7 +234,10 @@ pub(crate) async fn run_turn( .for_prompt(&turn_context.model_info.input_modalities) }; - let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); + let window_id = sess.services.model_client.current_window_id(); + let turn_metadata_header = turn_context + .turn_metadata_state + .current_header_value_for_model_request(&window_id); match run_sampling_request( Arc::clone(&sess), Arc::clone(&turn_context), diff --git a/codex-rs/core/src/session_startup_prewarm.rs 
b/codex-rs/core/src/session_startup_prewarm.rs index ca21e3ea72..0669e54469 100644 --- a/codex-rs/core/src/session_startup_prewarm.rs +++ b/codex-rs/core/src/session_startup_prewarm.rs @@ -256,9 +256,10 @@ async fn schedule_startup_prewarm_inner( build_prompt_started_at.elapsed(), /*status*/ None, ); + let window_id = session.services.model_client.current_window_id(); let startup_turn_metadata_header = startup_turn_context .turn_metadata_state - .current_header_value(); + .current_header_value_for_prewarm(&window_id); let mut client_session = session.services.model_client.new_session(); let websocket_warmup_started_at = Instant::now(); client_session diff --git a/codex-rs/core/src/turn_metadata.rs b/codex-rs/core/src/turn_metadata.rs index 52a714960a..4f9406d4b0 100644 --- a/codex-rs/core/src/turn_metadata.rs +++ b/codex-rs/core/src/turn_metadata.rs @@ -6,6 +6,11 @@ use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use codex_analytics::CompactionImplementation; +use codex_analytics::CompactionPhase; +use codex_analytics::CompactionReason; +use codex_analytics::CompactionStrategy; +use codex_analytics::CompactionTrigger; use codex_utils_string::to_ascii_json_string; use serde::Serialize; use serde_json::Value; @@ -27,12 +32,54 @@ const MODEL_KEY: &str = "model"; const REASONING_EFFORT_KEY: &str = "reasoning_effort"; const TURN_STARTED_AT_UNIX_MS_KEY: &str = "turn_started_at_unix_ms"; const USER_INPUT_REQUESTED_DURING_TURN_KEY: &str = "user_input_requested_during_turn"; +const REQUEST_KIND_KEY: &str = "request_kind"; +const COMPACTION_KEY: &str = "compaction"; +const WINDOW_ID_KEY: &str = "window_id"; pub(crate) struct McpTurnMetadataContext<'a> { pub(crate) model: &'a str, pub(crate) reasoning_effort: Option, } +/// Metadata present only on outbound model requests that perform compaction. +/// +/// These fields describe the operation at dispatch time. 
Post-response outcomes such as status, +/// error, duration, and token deltas remain in compaction analytics events. +#[derive(Clone, Copy, Debug, Serialize)] +pub(crate) struct CompactionTurnMetadata { + trigger: CompactionTrigger, + reason: CompactionReason, + implementation: CompactionImplementation, + phase: CompactionPhase, + strategy: CompactionStrategy, +} + +impl CompactionTurnMetadata { + pub(crate) fn new( + trigger: CompactionTrigger, + reason: CompactionReason, + implementation: CompactionImplementation, + phase: CompactionPhase, + ) -> Self { + Self { + trigger, + reason, + implementation, + phase, + strategy: CompactionStrategy::Memento, + } + } +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +enum TurnMetadataRequestKind { + Turn, + Prewarm, + Compaction, + Memory, +} + #[derive(Clone, Debug, Default)] struct WorkspaceGitMetadata { associated_remote_urls: Option>, @@ -68,8 +115,15 @@ impl From for TurnMetadataWorkspace { } } -#[derive(Clone, Debug, Serialize, Default)] +/// Base payload for the outbound model request `x-codex-turn-metadata` header. +/// +/// Turn-owned state populates identity fields, including optional fork lineage. A concrete +/// request kind is added at outbound model dispatch so turns, startup prewarm, and compaction +/// remain distinguishable. Detached memory requests are constructed as `memory` directly. 
+#[derive(Clone, Debug, Serialize)] pub(crate) struct TurnMetadataBag { + #[serde(default, skip_serializing_if = "Option::is_none")] + request_kind: Option, #[serde(default, skip_serializing_if = "Option::is_none")] session_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -87,6 +141,41 @@ pub(crate) struct TurnMetadataBag { } impl TurnMetadataBag { + fn new( + request_kind: Option, + session_id: Option, + thread_id: Option, + forked_from_thread_id: Option, + thread_source: Option, + turn_id: Option, + sandbox: Option, + ) -> Self { + Self { + request_kind, + session_id, + thread_id, + forked_from_thread_id, + thread_source, + turn_id, + workspaces: BTreeMap::new(), + sandbox, + } + } + + fn with_workspace_git_metadata( + mut self, + repo_root: Option, + workspace_git_metadata: Option, + ) -> Self { + if let (Some(repo_root), Some(workspace_git_metadata)) = (repo_root, workspace_git_metadata) + && !workspace_git_metadata.is_empty() + { + self.workspaces + .insert(repo_root, workspace_git_metadata.into()); + } + self + } + fn to_header_value(&self) -> Option { to_ascii_json_string(self).ok() } @@ -117,6 +206,9 @@ fn merge_turn_metadata( | "turn_id" | TURN_STARTED_AT_UNIX_MS_KEY | "forked_from_thread_id" + | REQUEST_KIND_KEY + | COMPACTION_KEY + | WINDOW_ID_KEY ) { continue; } @@ -140,32 +232,24 @@ pub async fn build_turn_metadata_header( get_has_changes(cwd), ); let latest_git_commit_hash = head_commit_hash.map(|sha| sha.0); - if latest_git_commit_hash.is_none() - && associated_remote_urls.is_none() - && has_changes.is_none() - && sandbox.is_none() - { - return None; - } - - let workspace_git_metadata = WorkspaceGitMetadata { - associated_remote_urls, - latest_git_commit_hash, - has_changes, - }; - let mut metadata = TurnMetadataBag { - sandbox: sandbox.map(ToString::to_string), - ..Default::default() - }; - if let Some(repo_root) = repo_root - && !workspace_git_metadata.is_empty() - { - metadata - .workspaces - .insert(repo_root, 
workspace_git_metadata.into()); - } - - metadata.to_header_value() + TurnMetadataBag::new( + Some(TurnMetadataRequestKind::Memory), + /*session_id*/ None, + /*thread_id*/ None, + /*forked_from_thread_id*/ None, + /*thread_source*/ None, + /*turn_id*/ None, + sandbox.map(ToString::to_string), + ) + .with_workspace_git_metadata( + repo_root, + Some(WorkspaceGitMetadata { + associated_remote_urls, + latest_git_commit_hash, + has_changes, + }), + ) + .to_header_value() } #[derive(Clone, Debug)] @@ -173,7 +257,7 @@ pub(crate) struct TurnMetadataState { cwd: AbsolutePathBuf, repo_root: Option, base_metadata: TurnMetadataBag, - base_header: String, + base_header: Option, enriched_header: Arc>>, turn_started_at_unix_ms: Arc>>, responsesapi_client_metadata: Arc>>>, @@ -203,18 +287,16 @@ impl TurnMetadataState { ) .to_string(), ); - let base_metadata = TurnMetadataBag { - session_id: Some(session_id), - thread_id: Some(thread_id), + let base_metadata = TurnMetadataBag::new( + /*request_kind*/ None, + Some(session_id), + Some(thread_id), forked_from_thread_id, thread_source, - turn_id: Some(turn_id), + Some(turn_id), sandbox, - ..Default::default() - }; - let base_header = base_metadata - .to_header_value() - .unwrap_or_else(|| "{}".to_string()); + ); + let base_header = base_metadata.to_header_value(); Self { cwd, @@ -239,7 +321,7 @@ impl TurnMetadataState { { header } else { - self.base_header.clone() + self.base_header.clone()? 
}; let turn_started_at_unix_ms = *self .turn_started_at_unix_ms @@ -264,6 +346,7 @@ impl TurnMetadataState { ) -> Option { let header = self.current_header_value()?; let mut metadata = serde_json::from_str::>(&header).ok()?; + metadata.remove(REQUEST_KIND_KEY); metadata.insert( MODEL_KEY.to_string(), Value::String(context.model.to_string()), @@ -293,6 +376,52 @@ impl TurnMetadataState { Some(Value::Object(metadata)) } + fn current_header_value_for_model_request_kind( + &self, + window_id: &str, + request_kind: TurnMetadataRequestKind, + ) -> Option { + let header = self.current_header_value()?; + let mut metadata = serde_json::from_str::>(&header).ok()?; + metadata.insert( + REQUEST_KIND_KEY.to_string(), + serde_json::to_value(request_kind).ok()?, + ); + metadata.insert( + WINDOW_ID_KEY.to_string(), + Value::String(window_id.to_string()), + ); + to_ascii_json_string(&metadata).ok() + } + + pub(crate) fn current_header_value_for_model_request(&self, window_id: &str) -> Option { + self.current_header_value_for_model_request_kind(window_id, TurnMetadataRequestKind::Turn) + } + + pub(crate) fn current_header_value_for_prewarm(&self, window_id: &str) -> Option { + self.current_header_value_for_model_request_kind( + window_id, + TurnMetadataRequestKind::Prewarm, + ) + } + + pub(crate) fn current_header_value_for_compaction( + &self, + window_id: &str, + compaction: CompactionTurnMetadata, + ) -> Option { + let header = self.current_header_value_for_model_request_kind( + window_id, + TurnMetadataRequestKind::Compaction, + )?; + let mut metadata = serde_json::from_str::>(&header).ok()?; + metadata.insert( + COMPACTION_KEY.to_string(), + serde_json::to_value(compaction).ok()?, + ); + to_ascii_json_string(&metadata).ok() + } + pub(crate) fn mark_user_input_requested_during_turn(&self) { self.user_input_requested_during_turn .store(true, Ordering::Relaxed); @@ -335,13 +464,14 @@ impl TurnMetadataState { let Some(repo_root) = state.repo_root.clone() else { return; }; - if 
workspace_git_metadata.is_empty() { + + let enriched_metadata = state + .base_metadata + .clone() + .with_workspace_git_metadata(Some(repo_root), Some(workspace_git_metadata)); + if enriched_metadata.workspaces.is_empty() { return; } - let mut enriched_metadata = state.base_metadata.clone(); - enriched_metadata - .workspaces - .insert(repo_root, workspace_git_metadata.into()); if let Some(header_value) = enriched_metadata.to_header_value() { *state diff --git a/codex-rs/core/src/turn_metadata_tests.rs b/codex-rs/core/src/turn_metadata_tests.rs index a874b0db9b..a482392726 100644 --- a/codex-rs/core/src/turn_metadata_tests.rs +++ b/codex-rs/core/src/turn_metadata_tests.rs @@ -20,7 +20,7 @@ fn test_mcp_turn_metadata_context() -> McpTurnMetadataContext<'static> { } #[tokio::test] -async fn build_turn_metadata_header_includes_has_changes_for_clean_repo() { +async fn build_turn_metadata_header_marks_detached_memory_without_turn_identity() { let temp_dir = TempDir::new().expect("temp dir"); let repo_path = temp_dir.path().join("repo-東京").abs(); std::fs::create_dir_all(&repo_path).expect("create repo"); @@ -64,6 +64,13 @@ async fn build_turn_metadata_header_includes_has_changes_for_clean_repo() { assert!(header.is_ascii()); assert!(!header.contains("東京")); let parsed: Value = serde_json::from_str(&header).expect("valid json"); + assert_eq!(parsed["request_kind"].as_str(), Some("memory")); + assert!(parsed.get("session_id").is_none()); + assert!(parsed.get("thread_id").is_none()); + assert!(parsed.get("forked_from_thread_id").is_none()); + assert!(parsed.get("turn_id").is_none()); + assert!(parsed.get(WINDOW_ID_KEY).is_none()); + let expected_repo_path = repo_path.to_string_lossy().into_owned(); let actual_repo_path = parsed .get("workspaces") @@ -77,13 +84,25 @@ async fn build_turn_metadata_header_includes_has_changes_for_clean_repo() { .and_then(|workspaces| workspaces.values().next()) .cloned() .expect("workspace"); - assert_eq!( 
workspace.get("has_changes").and_then(Value::as_bool), Some(false) ); } +#[tokio::test] +async fn build_turn_metadata_header_marks_memory_without_workspace_metadata() { + let temp_dir = TempDir::new().expect("temp dir"); + let cwd = temp_dir.path().abs(); + + let header = build_turn_metadata_header(&cwd, /*sandbox*/ None) + .await + .expect("detached memory should emit its request kind"); + let parsed: Value = serde_json::from_str(&header).expect("valid json"); + + assert_eq!(parsed, serde_json::json!({"request_kind": "memory"})); +} + #[test] fn turn_metadata_state_uses_platform_sandbox_tag() { let temp_dir = TempDir::new().expect("temp dir"); @@ -109,6 +128,7 @@ fn turn_metadata_state_uses_platform_sandbox_tag() { let thread_id = json.get("thread_id").and_then(Value::as_str); let thread_source = json.get("thread_source").and_then(Value::as_str); + assert!(json.get("request_kind").is_none()); let expected_sandbox = permission_profile_sandbox_tag( &permission_profile, WindowsSandboxLevel::Disabled, @@ -228,6 +248,7 @@ fn turn_metadata_state_includes_model_and_reasoning_effort_only_in_request_meta( let meta = state .current_meta_value_for_mcp_request(test_mcp_turn_metadata_context()) .expect("turn metadata should be present"); + assert!(meta.get("request_kind").is_none()); assert_eq!(meta["model"].as_str(), Some("gpt-5.4")); assert_eq!(meta["reasoning_effort"].as_str(), Some("high")); @@ -367,7 +388,10 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields( "forked_from_thread_id".to_string(), "client-supplied".to_string(), ), + ("turn_id".to_string(), "client-supplied".to_string()), + (WINDOW_ID_KEY.to_string(), "client-supplied".to_string()), ("thread_source".to_string(), "client-supplied".to_string()), + ("request_kind".to_string(), "client-supplied".to_string()), ( "turn_started_at_unix_ms".to_string(), "client-supplied".to_string(), @@ -392,14 +416,84 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields( 
); assert_eq!(json["thread_source"].as_str(), Some("user")); assert_eq!(json["turn_id"].as_str(), Some("turn-a")); + assert!(json.get("request_kind").is_none()); + assert!(json.get(WINDOW_ID_KEY).is_none()); assert_eq!( json["turn_started_at_unix_ms"].as_i64(), Some(1_700_000_000_123) ); + let model_request_header = state + .current_header_value_for_model_request("thread-a:1") + .expect("model request header"); + let model_request_json: Value = + serde_json::from_str(&model_request_header).expect("model request json"); + assert_eq!(model_request_json["request_kind"].as_str(), Some("turn")); + assert_eq!( + model_request_json[WINDOW_ID_KEY].as_str(), + Some("thread-a:1") + ); + let meta = state .current_meta_value_for_mcp_request(test_mcp_turn_metadata_context()) .expect("turn metadata should be present"); assert_eq!(meta["model"].as_str(), Some("gpt-5.4")); assert_eq!(meta["reasoning_effort"].as_str(), Some("high")); + assert!(meta.get(WINDOW_ID_KEY).is_none()); +} + +#[test] +fn turn_metadata_state_overlays_compaction_only_on_compaction_requests() { + let temp_dir = TempDir::new().expect("temp dir"); + let cwd = temp_dir.path().abs(); + let permission_profile = PermissionProfile::read_only(); + let state = TurnMetadataState::new( + "session-a".to_string(), + "thread-a".to_string(), + /*forked_from_thread_id*/ None, + Some(ThreadSource::User), + "turn-a".to_string(), + cwd, + &permission_profile, + WindowsSandboxLevel::Disabled, + /*enforce_managed_network*/ false, + ); + state.set_responsesapi_client_metadata(HashMap::from([( + "compaction".to_string(), + "client-supplied".to_string(), + )])); + + let compact_header = state + .current_header_value_for_compaction( + "thread-a:2", + CompactionTurnMetadata::new( + CompactionTrigger::Auto, + CompactionReason::ContextLimit, + CompactionImplementation::ResponsesCompactionV2, + CompactionPhase::MidTurn, + ), + ) + .expect("compact header"); + let compact_json: Value = serde_json::from_str(&compact_header).expect("json"); 
+ assert_eq!(compact_json["request_kind"].as_str(), Some("compaction")); + assert_eq!(compact_json["turn_id"].as_str(), Some("turn-a")); + assert_eq!(compact_json[WINDOW_ID_KEY].as_str(), Some("thread-a:2")); + assert_eq!( + compact_json["compaction"], + serde_json::json!({ + "trigger": "auto", + "reason": "context_limit", + "implementation": "responses_compaction_v2", + "phase": "mid_turn", + "strategy": "memento", + }) + ); + + let regular_header = state + .current_header_value_for_model_request("thread-a:3") + .expect("regular header"); + let regular_json: Value = serde_json::from_str(&regular_header).expect("json"); + assert_eq!(regular_json["request_kind"].as_str(), Some("turn")); + assert_eq!(regular_json[WINDOW_ID_KEY].as_str(), Some("thread-a:3")); + assert!(regular_json.get("compaction").is_none()); } diff --git a/codex-rs/core/tests/suite/agent_websocket.rs b/codex-rs/core/tests/suite/agent_websocket.rs index 6fe697d2b1..c405e35995 100644 --- a/codex-rs/core/tests/suite/agent_websocket.rs +++ b/codex-rs/core/tests/suite/agent_websocket.rs @@ -95,6 +95,16 @@ async fn websocket_first_turn_uses_startup_prewarm_and_create() -> Result<()> { let turn = connection.get(1).expect("missing turn request").body_json(); assert_eq!(warmup["type"].as_str(), Some("response.create")); assert_eq!(warmup["generate"].as_bool(), Some(false)); + let warmup_metadata: Value = serde_json::from_str( + warmup["client_metadata"]["x-codex-turn-metadata"] + .as_str() + .expect("warmup turn metadata"), + )?; + assert_eq!(warmup_metadata["request_kind"].as_str(), Some("prewarm")); + assert_eq!( + warmup_metadata["window_id"].as_str(), + warmup["client_metadata"]["x-codex-window-id"].as_str() + ); assert!( turn["tools"] .as_array() @@ -102,6 +112,12 @@ async fn websocket_first_turn_uses_startup_prewarm_and_create() -> Result<()> { "expected request tools to be populated" ); assert_eq!(turn["type"].as_str(), Some("response.create")); + let turn_metadata: Value = serde_json::from_str( +
turn["client_metadata"]["x-codex-turn-metadata"] + .as_str() + .expect("turn metadata"), + )?; + assert_eq!(turn_metadata["request_kind"].as_str(), Some("turn")); server.shutdown().await; Ok(()) diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 20f4eae489..0e2c02de91 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -2788,11 +2788,58 @@ async fn manual_compact_twice_preserves_latest_user_messages() { contains_user_text(&requests[1], first_user_message), "first compact request should include history before compaction" ); + let compact_metadata: Value = serde_json::from_str( + &requests[1] + .header("x-codex-turn-metadata") + .expect("local compact request should include turn metadata"), + ) + .expect("local compact turn metadata should be valid json"); + assert_eq!( + compact_metadata["request_kind"].as_str(), + Some("compaction") + ); + assert_eq!( + compact_metadata["window_id"].as_str(), + requests[1].header("x-codex-window-id").as_deref() + ); + assert_eq!( + compact_metadata["compaction"], + json!({ + "trigger": "manual", + "reason": "user_requested", + "implementation": "responses", + "phase": "standalone_turn", + "strategy": "memento", + }) + ); assert!( contains_user_text(&requests[2], second_user_message), "second turn request missing second user message" ); + let next_turn_metadata: Value = serde_json::from_str( + &requests[2] + .header("x-codex-turn-metadata") + .expect("next regular request should include turn metadata"), + ) + .expect("next regular turn metadata should be valid json"); + assert_eq!( + next_turn_metadata["request_kind"].as_str(), + Some("turn"), + "regular requests after compaction should remain turn requests" + ); + assert_eq!( + next_turn_metadata["window_id"].as_str(), + requests[2].header("x-codex-window-id").as_deref() + ); + assert_ne!( + compact_metadata["window_id"], next_turn_metadata["window_id"], + "the next request should use the new 
compacted context window" + ); + assert!( + next_turn_metadata.get("compaction").is_none(), + "regular requests after compaction should not be marked as compact requests" + ); assert!( contains_user_text(&requests[2], first_user_message), "second turn request should include the compacted user history" diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index bd290cad41..1c5b80f35f 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -368,6 +368,36 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { compact_request.header("thread-id").as_deref(), Some(thread_id.as_str()) ); + let compact_metadata: Value = serde_json::from_str( + &compact_request + .header("x-codex-turn-metadata") + .expect("remote compact request should include turn metadata"), + ) + .expect("remote compact turn metadata should be valid json"); + assert!( + compact_metadata["turn_id"] + .as_str() + .is_some_and(|id| !id.is_empty()), + "remote compact turn metadata should include its turn id" + ); + assert_eq!( + compact_metadata["request_kind"].as_str(), + Some("compaction") + ); + assert_eq!( + compact_metadata["window_id"].as_str(), + compact_request.header("x-codex-window-id").as_deref() + ); + assert_eq!( + compact_metadata["compaction"], + json!({ + "trigger": "manual", + "reason": "user_requested", + "implementation": "responses_compact", + "phase": "standalone_turn", + "strategy": "memento", + }) + ); let compact_body = compact_request.body_json(); assert_eq!( compact_body.get("model").and_then(|v| v.as_str()), @@ -375,6 +405,16 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { ); let response_requests = responses_mock.requests(); let first_response_request = response_requests.first().expect("initial request missing"); + let first_response_metadata: Value = serde_json::from_str( + &first_response_request + 
.header("x-codex-turn-metadata") + .expect("initial request should include turn metadata"), + ) + .expect("initial turn metadata should be valid json"); + assert_ne!( + first_response_metadata["turn_id"], compact_metadata["turn_id"], + "manual compaction should use its own turn id" + ); assert_eq!( compact_body["tools"], first_response_request.body_json()["tools"], @@ -407,6 +447,33 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { let response_requests = responses_mock.requests(); let follow_up_request = response_requests.last().expect("follow-up request missing"); + let follow_up_metadata: Value = serde_json::from_str( + &follow_up_request + .header("x-codex-turn-metadata") + .expect("follow-up request should include turn metadata"), + ) + .expect("follow-up turn metadata should be valid json"); + assert_eq!( + follow_up_metadata["request_kind"].as_str(), + Some("turn"), + "regular requests after compaction should remain turn requests" + ); + assert!( + follow_up_metadata.get("compaction").is_none(), + "regular requests after compaction should not be marked as compact requests" + ); + assert_ne!( + follow_up_metadata["turn_id"], compact_metadata["turn_id"], + "the following user turn should not reuse a manual compact turn id" + ); + assert_eq!( + follow_up_metadata["window_id"].as_str(), + follow_up_request.header("x-codex-window-id").as_deref() + ); + assert_ne!( + follow_up_metadata["window_id"], compact_metadata["window_id"], + "the following user turn should use the new compacted context window" + ); let follow_up_body = follow_up_request.body_json().to_string(); assert!( follow_up_body.contains("\"type\":\"compaction\""), @@ -797,6 +864,30 @@ async fn remote_compact_v2_reuses_compaction_trigger_for_followups() -> Result<( "expected compact request to advertise the remote_compaction_v2 beta feature" ); assert_eq!(compact_request.path(), "/v1/responses"); + let compact_metadata: Value = serde_json::from_str( + &compact_request + 
.header("x-codex-turn-metadata") + .expect("v2 compact request should include turn metadata"), + ) + .expect("v2 compact turn metadata should be valid json"); + assert_eq!( + compact_metadata["request_kind"].as_str(), + Some("compaction") + ); + assert_eq!( + compact_metadata["window_id"].as_str(), + compact_request.header("x-codex-window-id").as_deref() + ); + assert_eq!( + compact_metadata["compaction"], + json!({ + "trigger": "manual", + "reason": "user_requested", + "implementation": "responses_compaction_v2", + "phase": "standalone_turn", + "strategy": "memento", + }) + ); let compact_body = compact_request.body_json().to_string(); assert!( compact_body.contains("\"type\":\"compaction_trigger\""), @@ -1139,7 +1230,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { let session_id = harness.test().session_configured.session_id.to_string(); let thread_id = harness.test().session_configured.thread_id.to_string(); - mount_sse_once( + let initial_request = mount_sse_once( harness.server(), sse(vec![ responses::ev_shell_command_call("m1", "echo 'hi'"), @@ -1195,7 +1286,63 @@ async fn remote_compact_runs_automatically() -> Result<()> { compact_mock.single_request().header("thread-id").as_deref(), Some(thread_id.as_str()) ); + let compact_metadata: Value = serde_json::from_str( + &compact_mock + .single_request() + .header("x-codex-turn-metadata") + .expect("auto remote compact request should include turn metadata"), + ) + .expect("auto remote compact turn metadata should be valid json"); + assert_eq!( + compact_metadata["request_kind"].as_str(), + Some("compaction") + ); + assert_eq!( + compact_metadata["compaction"], + json!({ + "trigger": "auto", + "reason": "context_limit", + "implementation": "responses_compact", + "phase": "mid_turn", + "strategy": "memento", + }) + ); + let initial_metadata: Value = serde_json::from_str( + &initial_request + .single_request() + .header("x-codex-turn-metadata") + .expect("initial request should include turn 
metadata"), + ) + .expect("initial turn metadata should be valid json"); + assert_eq!( + initial_metadata["turn_id"], compact_metadata["turn_id"], + "automatic mid-turn compaction should keep the current turn id" + ); + assert_eq!( + initial_metadata["window_id"], compact_metadata["window_id"], + "automatic mid-turn compaction summarizes the current context window" + ); let follow_up_request = responses_mock.single_request(); + let follow_up_metadata: Value = serde_json::from_str( + &follow_up_request + .header("x-codex-turn-metadata") + .expect("post-compaction continuation should include turn metadata"), + ) + .expect("post-compaction turn metadata should be valid json"); + assert_eq!( + follow_up_metadata["request_kind"].as_str(), + Some("turn"), + "post-compaction continuation should be a regular request" + ); + assert!(follow_up_metadata.get("compaction").is_none()); + assert_eq!( + follow_up_metadata["turn_id"], compact_metadata["turn_id"], + "automatic mid-turn continuation should keep the current turn id" + ); + assert_ne!( + follow_up_metadata["window_id"], compact_metadata["window_id"], + "post-compaction continuation should use the next context window" + ); let follow_up_body = follow_up_request.body_json().to_string(); assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY")); diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 7bf2fe159d..2aeda32a55 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -153,7 +153,6 @@ impl MemoryStartupContext { StageOneRequestContext { model_info, - turn_metadata_header, session_telemetry: self .session_telemetry .clone() @@ -161,6 +160,7 @@ impl MemoryStartupContext { reasoning_effort: Some(reasoning_effort), reasoning_summary, service_tier: config_snapshot.service_tier, + turn_metadata_header, } } diff --git a/codex-rs/memories/write/src/startup_tests.rs b/codex-rs/memories/write/src/startup_tests.rs index 
4af8560e67..1a735b3668 100644 --- a/codex-rs/memories/write/src/startup_tests.rs +++ b/codex-rs/memories/write/src/startup_tests.rs @@ -237,11 +237,13 @@ async fn memories_startup_phase2_prunes_old_extension_resources_without_stage1_i } #[tokio::test] -async fn memories_startup_phase1_uses_live_thread_service_tier() -> anyhow::Result<()> { +async fn memories_startup_phase1_uses_live_thread_service_tier_and_detached_metadata() +-> anyhow::Result<()> { let server = start_mock_server().await; let home = Arc::new(TempDir::new()?); let test = build_test_codex(&server, home).await?; assert_eq!(test.config.service_tier, None); + reset_git_repository(&test.config.cwd).await?; core_test_support::submit_thread_settings( &test.codex, @@ -279,6 +281,35 @@ async fn memories_startup_phase1_uses_live_thread_service_tier() -> anyhow::Resu Some(ServiceTier::Fast.request_value().to_string()) ); + let stage_one = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-phase1"), + ev_assistant_message("msg-phase1", "phase1 complete"), + ev_completed("resp-phase1"), + ]), + ) + .await; + context + .stream_stage_one_prompt( + &test.config, + &codex_core::Prompt::default(), + &request_context, + ) + .await?; + let request = wait_for_single_request(&stage_one).await; + let metadata_header = request + .header("x-codex-turn-metadata") + .expect("detached memory request should include workspace metadata"); + let metadata: serde_json::Value = + serde_json::from_str(&metadata_header).expect("turn metadata json"); + assert_eq!(metadata["request_kind"].as_str(), Some("memory")); + assert!(metadata.get("session_id").is_none()); + assert!(metadata.get("thread_id").is_none()); + assert!(metadata.get("turn_id").is_none()); + assert!(metadata.get("window_id").is_none()); + assert!(metadata.get("workspaces").is_some()); + shutdown_test_codex(&test).await?; Ok(()) }