mirror of
https://github.com/openai/codex.git
synced 2026-05-29 23:40:29 +00:00
## Summary
- Add `request_kind` values for foreground turn, startup prewarm,
compaction, and detached memory model requests.
- Attach compaction dispatch metadata to local Responses, legacy
`/v1/responses/compact`, and remote v2 compact requests.
- Add the existing logical context-window identifier as `window_id` on
turn-owned model request metadata.
- Keep identity fields optional for detached memory requests, while
still emitting `request_kind="memory"` in non-git/no-sandbox workspaces.
## Root Cause
`x-codex-turn-metadata` has more than one producer. Foreground turns and
compaction requests own a real turn and should carry that turn identity.
Detached memory stage-one requests do not own a foreground turn, so
absent identity fields are valid rather than missing data. Startup
websocket prewarm is also a model request, but it has `generate=false`
and must not be counted as a foreground turn.

`thread_source` or session source identifies where a thread came from
(for example review, guardian, or another subagent). `request_kind`
identifies what the current outbound model request is doing (`turn`,
`prewarm`, `compaction`, or `memory`). A review or guardian thread can
issue either a normal turn request or a compaction request, so source
cannot replace request kind.
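
The distinction between source and request kind can be sketched as two independent enums. This is a minimal illustration, not the PR's actual types: the four kind strings come from the description above, while the `RequestKind` and `ThreadSource` type names, the `describe` helper, and the `counts_as_foreground_turn` method are hypothetical.

```rust
/// What the current outbound model request is doing.
/// The string values are taken from the PR description; the enum itself is a sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestKind {
    Turn,
    Prewarm,
    Compaction,
    Memory,
}

impl RequestKind {
    /// Wire value emitted as `request_kind` in the metadata header.
    pub fn as_str(self) -> &'static str {
        match self {
            RequestKind::Turn => "turn",
            RequestKind::Prewarm => "prewarm",
            RequestKind::Compaction => "compaction",
            RequestKind::Memory => "memory",
        }
    }

    /// Only an ordinary foreground request should be counted as a foreground turn.
    pub fn counts_as_foreground_turn(self) -> bool {
        matches!(self, RequestKind::Turn)
    }
}

/// Where a thread came from (hypothetical stand-in for `thread_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadSource {
    Review,
    Guardian,
    Subagent,
}

/// Source and kind are independent axes: any source can issue any kind of request.
pub fn describe(source: ThreadSource, kind: RequestKind) -> String {
    format!("{source:?}/{}", kind.as_str())
}
```

Under these assumptions, a review thread issuing a compaction request is described as `Review/compaction`, which neither field could express alone.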
## Behavior / Impact
- Ordinary foreground requests send `request_kind="turn"`, their real
identity fields, and `window_id="<thread_id>:<window_generation>"`.
- Startup websocket warmup requests send `request_kind="prewarm"` so
they are not counted as foreground turns.
- Compaction requests send `request_kind="compaction"`, their real
owning turn identity, the existing `window_id`, and
`compaction.{trigger,reason,implementation,phase,strategy}`.
- Detached memory stage-one requests send `request_kind="memory"`
without `session_id`, `thread_id`, `turn_id`, or `window_id`; when no
workspace metadata exists, the kind-only header is still emitted.
- `session_id`, `thread_id`, `turn_id`, and `window_id` remain optional
in the header schema because detached memory requests do not own a
foreground turn or context window.
- `window_id` is not a new ID system: it is copied from the already-sent
`x-codex-window-id` / WS client metadata value at model-request dispatch
time.
- Existing `x-codex-window-id` HTTP/WS emission, value format,
generation advancement, resume behavior, and fork reset behavior are
unchanged.
- `request_kind`, `window_id`, and upstream turn-owned identity fields
remain schema-owned; input `responsesapi_client_metadata` cannot replace
their canonical values.
- No table, DAG, export, app-server API, or MCP `_meta` schema changes
are included.
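
The optional identity fields and the `window_id` format listed above can be sketched as follows. This is an illustration under stated assumptions, not the real schema: the `TurnMetadataHeader` struct, the `window_id` helper, the `u64` generation type, and the JSON encoding of the header are all hypothetical, and values are assumed not to need JSON escaping.

```rust
/// Hypothetical model of the metadata header payload; field names follow the PR text.
#[derive(Debug, Default, Clone)]
pub struct TurnMetadataHeader {
    pub request_kind: &'static str,
    pub session_id: Option<String>,
    pub thread_id: Option<String>,
    pub turn_id: Option<String>,
    pub window_id: Option<String>,
}

/// Formats a window id as "<thread_id>:<window_generation>".
pub fn window_id(thread_id: &str, window_generation: u64) -> String {
    format!("{thread_id}:{window_generation}")
}

impl TurnMetadataHeader {
    /// Serializes to a JSON object, omitting absent identity fields.
    /// Assumes values contain no characters that require escaping.
    pub fn to_json(&self) -> String {
        let mut parts = vec![format!("\"request_kind\":\"{}\"", self.request_kind)];
        let optional = [
            ("session_id", &self.session_id),
            ("thread_id", &self.thread_id),
            ("turn_id", &self.turn_id),
            ("window_id", &self.window_id),
        ];
        for (key, value) in optional {
            if let Some(value) = value {
                parts.push(format!("\"{key}\":\"{value}\""));
            }
        }
        format!("{{{}}}", parts.join(","))
    }
}
```

In this sketch a detached memory request leaves every identity field as `None` and still serializes a kind-only object, while a turn-owned request fills in its identity and the window id.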
A compaction attempt stopped by a pre-compact hook issues no model
request and therefore has no request header; its outcome remains in
analytics events. Status, error, duration, and token deltas also remain
analytics fields rather than request-header fields.

Future detached-memory attribution using a real initiating turn ID as
`trigger_turn_id` is intentionally not part of this PR.
## Sync With Main
- Final pushed head `716342e79` is rebased onto `origin/main@0d37db4b2`.
- The metadata conflict came from upstream `#24160`, which added
`forked_from_thread_id` on the same `turn_metadata` surface. Resolution
preserves that field and its protection from client metadata override
alongside this PR's request-kind, compaction, and window-id fields.
- While resolving the overlapping commits, I removed an accidental
recursive model-request overlay and a duplicate detached-memory header
builder before completing the rebase.
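
The protection from client metadata override described in this PR can be sketched as a merge in which schema-owned keys always keep their canonical values. This is a hedged illustration only: the `merge_client_metadata` function, the flat string map representation, and the exact contents of `SCHEMA_OWNED_KEYS` are assumptions, not the PR's implementation.

```rust
use std::collections::BTreeMap;

/// Keys treated as schema-owned in this sketch. The exact list is an assumption
/// based on the fields named in the PR description.
const SCHEMA_OWNED_KEYS: [&str; 6] = [
    "request_kind",
    "window_id",
    "session_id",
    "thread_id",
    "turn_id",
    "forked_from_thread_id",
];

/// Merges client-supplied metadata into canonical metadata.
/// Client entries for schema-owned keys are ignored, so they can neither
/// replace a canonical value nor supply one that the client invented.
pub fn merge_client_metadata(
    canonical: &BTreeMap<String, String>,
    client: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut merged = canonical.clone();
    for (key, value) in client {
        if SCHEMA_OWNED_KEYS.contains(&key.as_str()) {
            continue;
        }
        merged.insert(key.clone(), value.clone());
    }
    merged
}
```

Dropping client-supplied owned keys even when the canonical map lacks them keeps a detached memory request from acquiring a client-invented `thread_id`; that choice is part of the sketch rather than something the PR text states.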
## Latency / User Experience Boundary
- Foreground turns perform no new filesystem, git, or network work. New
fields are inserted into metadata already serialized for outgoing
requests.
- Compaction issues the same model/HTTP requests with the same prompt,
model, service tier, and sampling settings; only metadata bytes change.
- Startup prewarm already sent metadata; it is now correctly classified
as `prewarm`.
- Non-git detached memory now sends a small kind-only metadata header
rather than no header.
- This client diff adds no user-visible latency mechanism beyond
negligible serialization and header bytes on already-existing requests.
## Validation
On conflict-resolved head `1d35c2cfb` based on `origin/main@487521733`:
- `just fmt` (passed)
- `just fix -p codex-core` (passed)
- `git diff --check origin/main...HEAD` (passed)
- `just test -p codex-core -E 'test(turn_metadata) |
test(websocket_first_turn_uses_startup_prewarm_and_create) |
test(responses_stream_includes_turn_metadata_header_for_git_workspace_e2e)
|
test(responses_websocket_forwards_turn_metadata_on_initial_and_incremental_create)
| test(remote_compact_v2_retries_failures_with_stream_retry_budget) |
test(window_id_advances_after_compact_persists_on_resume_and_resets_on_fork)'`
(`23 passed`; `bench-smoke` passed)
- `just test -p codex-app-server -E
'test(turn_start_forwards_client_metadata_to_responses_request_v2) |
test(turn_start_forwards_client_metadata_to_responses_websocket_request_body_v2)
| test(auto_compaction_remote_emits_started_and_completed_items)'` (`3
passed`; `bench-smoke` passed)
- `just test -p codex-memories-write` (`29 passed`; `bench-smoke`
passed)
404 lines
15 KiB
Rust
use std::sync::Arc;

use crate::Prompt;
use crate::client::CompactConversationRequestSettings;
use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::InitialContextInjection;
use crate::compact::compaction_status_from_result;
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
use crate::hook_runtime::run_pre_compact_hooks;
use crate::session::session::Session;
use crate::session::turn::built_tools;
use crate::session::turn_context::TurnContext;
use crate::turn_metadata::CompactionTurnMetadata;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionTrigger;
use codex_app_server_protocol::AuthMode;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rollout_trace::CompactionCheckpointTracePayload;
use futures::TryFutureExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;

pub(crate) async fn run_inline_remote_auto_compact_task(
    sess: Arc<Session>,
    turn_context: Arc<TurnContext>,
    initial_context_injection: InitialContextInjection,
    reason: CompactionReason,
    phase: CompactionPhase,
) -> CodexResult<()> {
    run_remote_compact_task_inner(
        &sess,
        &turn_context,
        initial_context_injection,
        CompactionTrigger::Auto,
        reason,
        phase,
    )
    .await?;
    Ok(())
}

pub(crate) async fn run_remote_compact_task(
    sess: Arc<Session>,
    turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
    let start_event = EventMsg::TurnStarted(TurnStartedEvent {
        turn_id: turn_context.sub_id.clone(),
        trace_id: turn_context.trace_id.clone(),
        started_at: turn_context.turn_timing_state.started_at_unix_secs().await,
        model_context_window: turn_context.model_context_window(),
        collaboration_mode_kind: turn_context.collaboration_mode.mode,
    });
    sess.send_event(&turn_context, start_event).await;

    run_remote_compact_task_inner(
        &sess,
        &turn_context,
        InitialContextInjection::DoNotInject,
        CompactionTrigger::Manual,
        CompactionReason::UserRequested,
        CompactionPhase::StandaloneTurn,
    )
    .await?;
    Ok(())
}

async fn run_remote_compact_task_inner(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
    initial_context_injection: InitialContextInjection,
    trigger: CompactionTrigger,
    reason: CompactionReason,
    phase: CompactionPhase,
) -> CodexResult<()> {
    let compaction_metadata = CompactionTurnMetadata::new(
        trigger,
        reason,
        CompactionImplementation::ResponsesCompact,
        phase,
    );
    let attempt = CompactionAnalyticsAttempt::begin(
        sess.as_ref(),
        turn_context.as_ref(),
        trigger,
        reason,
        CompactionImplementation::ResponsesCompact,
        phase,
    )
    .await;
    let pre_compact_outcome = run_pre_compact_hooks(sess, turn_context, trigger).await;
    match pre_compact_outcome {
        PreCompactHookOutcome::Continue => {}
        PreCompactHookOutcome::Stopped { reason } => {
            let error = reason.unwrap_or_else(|| "PreCompact hook stopped execution".to_string());
            attempt
                .track(
                    sess.as_ref(),
                    codex_analytics::CompactionStatus::Interrupted,
                    Some(error),
                )
                .await;
            return Err(CodexErr::TurnAborted);
        }
    }
    let result = run_remote_compact_task_inner_impl(
        sess,
        turn_context,
        initial_context_injection,
        compaction_metadata,
    )
    .await;
    let status = compaction_status_from_result(&result);
    let error = result.as_ref().err().map(ToString::to_string);
    if result.is_ok() {
        let post_compact_outcome = run_post_compact_hooks(sess, turn_context, trigger).await;
        if let PostCompactHookOutcome::Stopped = post_compact_outcome {
            attempt.track(sess.as_ref(), status, error).await;
            return Err(CodexErr::TurnAborted);
        }
    }
    attempt.track(sess.as_ref(), status, error.clone()).await;
    if let Err(err) = result {
        let event = EventMsg::Error(
            err.to_error_event(Some("Error running remote compact task".to_string())),
        );
        sess.send_event(turn_context, event).await;
        return Err(err);
    }
    Ok(())
}

async fn run_remote_compact_task_inner_impl(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
    initial_context_injection: InitialContextInjection,
    compaction_metadata: CompactionTurnMetadata,
) -> CodexResult<()> {
    let context_compaction_item = ContextCompactionItem::new();
    // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events,
    // endpoint attempts, and the installed history checkpoint all have one join key.
    let compaction_trace = sess.services.rollout_thread_trace.compaction_trace_context(
        turn_context.sub_id.as_str(),
        context_compaction_item.id.as_str(),
        turn_context.model_info.slug.as_str(),
        turn_context.provider.info().name.as_str(),
    );
    let compaction_item = TurnItem::ContextCompaction(context_compaction_item);
    sess.emit_turn_item_started(turn_context, &compaction_item)
        .await;
    let mut history = sess.clone_history().await;
    let base_instructions = sess.get_base_instructions().await;
    let deleted_items = trim_function_call_history_to_fit_context_window(
        &mut history,
        turn_context.as_ref(),
        &base_instructions,
    );
    if deleted_items > 0 {
        info!(
            turn_id = %turn_context.sub_id,
            deleted_items,
            "trimmed history items before remote compaction"
        );
    }
    // This is the history selected for remote compaction, after any trimming required to fit the
    // compact endpoint. The checkpoint below records it separately from the next sampling request,
    // whose prompt will repeat current developer/context prefix items.
    let trace_input_history = history.raw_items().to_vec();
    let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
    let tool_router = built_tools(
        sess.as_ref(),
        turn_context.as_ref(),
        &CancellationToken::new(),
    )
    .await?;
    let prompt = Prompt {
        input: prompt_input,
        tools: tool_router.model_visible_specs(),
        parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
        base_instructions,
        personality: turn_context.personality,
        output_schema: None,
        output_schema_strict: true,
    };
    let window_id = sess.services.model_client.current_window_id();
    let turn_metadata_header = turn_context
        .turn_metadata_state
        .current_header_value_for_compaction(&window_id, compaction_metadata);
    let mut new_history = sess
        .services
        .model_client
        .compact_conversation_history(
            &prompt,
            &turn_context.model_info,
            CompactConversationRequestSettings {
                effort: turn_context.reasoning_effort,
                summary: turn_context.reasoning_summary,
                service_tier: if sess.services.auth_manager.auth_mode() == Some(AuthMode::ApiKey) {
                    None
                } else {
                    turn_context.config.service_tier.clone()
                },
            },
            &turn_context.session_telemetry,
            &compaction_trace,
            turn_metadata_header.as_deref(),
        )
        .or_else(|err| async {
            let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
            let compact_request_log_data =
                build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
            log_remote_compact_failure(
                turn_context,
                &compact_request_log_data,
                total_usage_breakdown,
                &err,
            );
            Err(err)
        })
        .await?;
    new_history = process_compacted_history(
        sess.as_ref(),
        turn_context.as_ref(),
        new_history,
        initial_context_injection,
    )
    .await;

    let reference_context_item = match initial_context_injection {
        InitialContextInjection::DoNotInject => None,
        InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
    };
    let compacted_item = CompactedItem {
        message: String::new(),
        replacement_history: Some(new_history.clone()),
    };
    // Install is the semantic boundary where the compact endpoint's output becomes live
    // thread history. Keep it distinct from the later inference request so the reducer can
    // still represent repeated developer/context prefix items exactly as the model saw them.
    compaction_trace.record_installed(&CompactionCheckpointTracePayload {
        input_history: &trace_input_history,
        replacement_history: &new_history,
    });
    sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
        .await;
    sess.recompute_token_usage(turn_context).await;

    sess.emit_turn_item_completed(turn_context, compaction_item)
        .await;
    Ok(())
}

pub(crate) async fn process_compacted_history(
    sess: &Session,
    turn_context: &TurnContext,
    mut compacted_history: Vec<ResponseItem>,
    initial_context_injection: InitialContextInjection,
) -> Vec<ResponseItem> {
    // Mid-turn compaction is the only path that must inject initial context above the last user
    // message in the replacement history. Pre-turn compaction instead injects context after the
    // compaction item, but mid-turn compaction keeps the compaction item last for model training.
    let initial_context = if matches!(
        initial_context_injection,
        InitialContextInjection::BeforeLastUserMessage
    ) {
        sess.build_initial_context(turn_context).await
    } else {
        Vec::new()
    };

    compacted_history.retain(should_keep_compacted_history_item);
    insert_initial_context_before_last_real_user_or_summary(compacted_history, initial_context)
}

/// Returns whether an item from remote compaction output should be preserved.
///
/// Called while processing the model-provided compacted transcript, before we
/// append fresh canonical context from the current session.
///
/// We drop:
/// - `developer` messages because remote output can include stale/duplicated
///   instruction content.
/// - non-user-content `user` messages (session prefix/instruction wrappers),
///   while preserving real user messages and persisted hook prompts.
///
/// This intentionally keeps:
/// - `assistant` messages (future remote compaction models may emit them)
/// - `user`-role warnings that parse as `TurnItem::UserMessage` and compaction-generated summary
///   messages. Legacy warning fragments are filtered by `parse_turn_item` before they reach this
///   check.
pub(crate) fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, .. } if role == "developer" => false,
        ResponseItem::Message { role, .. } if role == "user" => {
            matches!(
                crate::event_mapping::parse_turn_item(item),
                Some(TurnItem::UserMessage(_) | TurnItem::HookPrompt(_))
            )
        }
        ResponseItem::Message { role, .. } if role == "assistant" => true,
        ResponseItem::Message { .. } => false,
        ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true,
        ResponseItem::CompactionTrigger => false,
        ResponseItem::Reasoning { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::Other => false,
    }
}

#[derive(Debug)]
pub(crate) struct CompactRequestLogData {
    failing_compaction_request_model_visible_bytes: i64,
}

pub(crate) fn build_compact_request_log_data(
    input: &[ResponseItem],
    instructions: &str,
) -> CompactRequestLogData {
    let failing_compaction_request_model_visible_bytes = input
        .iter()
        .map(estimate_response_item_model_visible_bytes)
        .fold(
            i64::try_from(instructions.len()).unwrap_or(i64::MAX),
            i64::saturating_add,
        );

    CompactRequestLogData {
        failing_compaction_request_model_visible_bytes,
    }
}

pub(crate) fn log_remote_compact_failure(
    turn_context: &TurnContext,
    log_data: &CompactRequestLogData,
    total_usage_breakdown: TotalTokenUsageBreakdown,
    err: &CodexErr,
) {
    error!(
        turn_id = %turn_context.sub_id,
        last_api_response_total_tokens = total_usage_breakdown.last_api_response_total_tokens,
        all_history_items_model_visible_bytes = total_usage_breakdown.all_history_items_model_visible_bytes,
        estimated_tokens_of_items_added_since_last_successful_api_response = total_usage_breakdown.estimated_tokens_of_items_added_since_last_successful_api_response,
        estimated_bytes_of_items_added_since_last_successful_api_response = total_usage_breakdown.estimated_bytes_of_items_added_since_last_successful_api_response,
        model_context_window_tokens = ?turn_context.model_context_window(),
        failing_compaction_request_model_visible_bytes = log_data.failing_compaction_request_model_visible_bytes,
        compact_error = %err,
        "remote compaction failed"
    );
}

pub(crate) fn trim_function_call_history_to_fit_context_window(
    history: &mut ContextManager,
    turn_context: &TurnContext,
    base_instructions: &BaseInstructions,
) -> usize {
    let mut deleted_items = 0usize;
    let Some(context_window) = turn_context.model_context_window() else {
        return deleted_items;
    };

    while history
        .estimate_token_count_with_base_instructions(base_instructions)
        .is_some_and(|estimated_tokens| estimated_tokens > context_window)
    {
        let Some(last_item) = history.raw_items().last() else {
            break;
        };
        if !is_codex_generated_item(last_item) {
            break;
        }
        if !history.remove_last_item() {
            break;
        }
        deleted_items += 1;
    }

    deleted_items
}