Files
codex/codex-rs/core/src/compact.rs
Andrei Eternal 527d52df03 Add compact lifecycle hooks (started by vincentkoc - external contrib) (#19905)
Based on work from Vincent K -
https://github.com/openai/codex/pull/19060

<img width="1836" height="642" alt="CleanShot 2026-04-29 at 20 47 40@2x"
src="https://github.com/user-attachments/assets/b647bb89-65fe-40c8-80b0-7a6b7c984634"
/>

## Why

Compaction rewrites the conversation context that future model turns
receive, but hooks currently have no deterministic lifecycle point
around that rewrite. This adds compact lifecycle hooks so users can
audit manual and automatic compaction, surface hook messages in the UI,
and run post-compaction follow-up without overloading tool or prompt
hooks.

## What Changed

- Added `PreCompact` and `PostCompact` hook events across hook config,
discovery, dispatch, generated schemas, app-server notifications,
analytics, and TUI hook rendering.
- Added trigger matching for compact hooks with the documented `manual`
and `auto` matcher values.
- Wired `PreCompact` before both local and remote compaction, and
`PostCompact` after successful local or remote compaction.
- Kept compact hook command input to lifecycle metadata: session id,
Codex turn id, transcript path, cwd, hook event name, model, and
trigger.
- Made compact stdout handling consistent with other hooks: plain stdout
is ignored as debug output, while malformed JSON-looking stdout is
reported as failed hook output.
- Added integration coverage for compact hook dispatch, trigger
matching, post-compact execution, and the audited behavior that
`decision:"block"` does not block compaction.

## Out of Scope

- Hook-specific compaction blocking is not implemented;
`decision:"block"` and exit-code-2 blocking semantics are intentionally
unsupported for `PreCompact`.
- Custom compaction instructions are not exposed to compact hooks in
this PR.
- Compact summaries, summary character counts, and summary previews are
not exposed to compact hooks in this PR.

## Verification

- `cargo test -p codex-hooks`
- `cargo test -p codex-core
manual_pre_compact_block_decision_does_not_block_compaction`
- `cargo test -p codex-app-server hooks_list`
- `cargo test -p codex-core config_schema_matches_fixture`
- `cargo test -p codex-tui hooks_browser`

## Docs

The developer documentation for Codex hooks should be updated alongside
this feature to document `PreCompact` and `PostCompact`, the
`manual`/`auto` matcher values, and the compact hook payload fields.

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-05-06 18:08:31 -07:00

586 lines
20 KiB
Rust

use std::sync::Arc;
use std::time::Instant;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
use crate::hook_runtime::run_pre_compact_hooks;
#[cfg(test)]
use crate::session::PreviousTurnSettings;
use crate::session::session::Session;
use crate::session::turn::get_last_assistant_message_from_turn;
use crate::session::turn_context::TurnContext;
use crate::util::backoff;
use codex_analytics::CodexCompactionEvent;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionStatus;
use codex_analytics::CompactionStrategy;
use codex_analytics::CompactionTrigger;
use codex_analytics::now_unix_seconds;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_rollout_trace::InferenceTraceContext;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
use futures::prelude::*;
use tracing::error;
use codex_model_provider_info::ModelProviderInfo;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
/// Controls whether compaction replacement history must include initial context.
///
/// Pre-turn/manual compaction variants use `DoNotInject`: they replace history with a summary and
/// clear `reference_context_item`, so the next regular turn will fully reinject initial context
/// after compaction.
///
/// Mid-turn compaction must use `BeforeLastUserMessage` because the model is trained to see the
/// compaction summary as the last item in history after mid-turn compaction; we therefore inject
/// initial context into the replacement history just above the last real user message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum InitialContextInjection {
BeforeLastUserMessage,
DoNotInject,
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.supports_remote_compaction()
}
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
// Compaction prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}];
run_compact_task_inner(
sess,
turn_context,
input,
initial_context_injection,
CompactionTrigger::Auto,
reason,
phase,
)
.await?;
Ok(())
}
pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> CodexResult<()> {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
started_at: turn_context.turn_timing_state.started_at_unix_secs().await,
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(
sess.clone(),
turn_context,
input,
InitialContextInjection::DoNotInject,
CompactionTrigger::Manual,
CompactionReason::UserRequested,
CompactionPhase::StandaloneTurn,
)
.await?;
Ok(())
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
let attempt = CompactionAnalyticsAttempt::begin(
sess.as_ref(),
turn_context.as_ref(),
trigger,
reason,
CompactionImplementation::Responses,
phase,
)
.await;
let pre_compact_outcome = run_pre_compact_hooks(&sess, &turn_context, trigger).await;
match pre_compact_outcome {
PreCompactHookOutcome::Continue => {}
PreCompactHookOutcome::Stopped { reason } => {
let error = reason.unwrap_or_else(|| "PreCompact hook stopped execution".to_string());
attempt
.track(sess.as_ref(), CompactionStatus::Interrupted, Some(error))
.await;
return Err(CodexErr::TurnAborted);
}
}
let result = run_compact_task_inner_impl(
Arc::clone(&sess),
Arc::clone(&turn_context),
input,
initial_context_injection,
)
.await;
let status = compaction_status_from_result(&result);
let error = result.as_ref().err().map(ToString::to_string);
if result.is_ok() {
let post_compact_outcome = run_post_compact_hooks(&sess, &turn_context, trigger).await;
if let PostCompactHookOutcome::Stopped = post_compact_outcome {
attempt.track(sess.as_ref(), status, error).await;
return Err(CodexErr::TurnAborted);
}
}
attempt.track(sess.as_ref(), status, error).await;
result.map(|_| ())
}
async fn run_compact_task_inner_impl(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<String> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
);
let max_retries = turn_context.provider.info().stream_max_retries();
let mut retries = 0;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket incremental
// request tracking)
// survives retries within this compact turn.
loop {
// Clone is required because of the loop
let turn_input = history
.clone()
.for_prompt(&turn_context.model_info.input_modalities);
let turn_input_len = turn_input.len();
let prompt = Prompt {
input: turn_input,
base_instructions: sess.get_base_instructions().await,
personality: turn_context.personality,
..Default::default()
};
let turn_metadata_header = turn_context.turn_metadata_state.current_header_value();
let attempt_result = drain_to_completed(
&sess,
turn_context.as_ref(),
&mut client_session,
turn_metadata_header.as_deref(),
&prompt,
)
.await;
match attempt_result {
Ok(()) => {
break;
}
Err(CodexErr::Interrupted) => {
return Err(CodexErr::Interrupted);
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
retries = 0;
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
e,
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
}
}
}
let history_snapshot = sess.clone_history().await;
let history_items = history_snapshot.raw_items();
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);
if matches!(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
new_history =
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
}
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
};
let compacted_item = CompactedItem {
message: summary_text.clone(),
replacement_history: Some(new_history.clone()),
};
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
client_session.reset_websocket_session();
sess.recompute_token_usage(&turn_context).await;
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
let warning = EventMsg::Warning(WarningEvent {
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(summary_suffix)
}
pub(crate) struct CompactionAnalyticsAttempt {
thread_id: String,
turn_id: String,
trigger: CompactionTrigger,
reason: CompactionReason,
implementation: CompactionImplementation,
phase: CompactionPhase,
active_context_tokens_before: i64,
started_at: u64,
start_instant: Instant,
}
impl CompactionAnalyticsAttempt {
pub(crate) async fn begin(
sess: &Session,
turn_context: &TurnContext,
trigger: CompactionTrigger,
reason: CompactionReason,
implementation: CompactionImplementation,
phase: CompactionPhase,
) -> Self {
let active_context_tokens_before = sess.get_total_token_usage().await;
Self {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
trigger,
reason,
implementation,
phase,
active_context_tokens_before,
started_at: now_unix_seconds(),
start_instant: Instant::now(),
}
}
pub(crate) async fn track(
self,
sess: &Session,
status: CompactionStatus,
error: Option<String>,
) {
let active_context_tokens_after = sess.get_total_token_usage().await;
sess.services
.analytics_events_client
.track_compaction(CodexCompactionEvent {
thread_id: self.thread_id,
turn_id: self.turn_id,
trigger: self.trigger,
reason: self.reason,
implementation: self.implementation,
phase: self.phase,
strategy: CompactionStrategy::Memento,
status,
error,
active_context_tokens_before: self.active_context_tokens_before,
active_context_tokens_after,
started_at: self.started_at,
completed_at: now_unix_seconds(),
duration_ms: Some(
u64::try_from(self.start_instant.elapsed().as_millis()).unwrap_or(u64::MAX),
),
});
}
}
pub(crate) fn compaction_status_from_result<T>(result: &CodexResult<T>) -> CompactionStatus {
match result {
Ok(_) => CompactionStatus::Completed,
Err(CodexErr::Interrupted | CodexErr::TurnAborted) => CompactionStatus::Interrupted,
Err(_) => CompactionStatus::Failed,
}
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {
match item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
if !text.is_empty() {
pieces.push(text.as_str());
}
}
ContentItem::InputImage { .. } => {}
}
}
if pieces.is_empty() {
None
} else {
Some(pieces.join("\n"))
}
}
pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec<String> {
items
.iter()
.filter_map(|item| match crate::event_mapping::parse_turn_item(item) {
Some(TurnItem::UserMessage(user)) => {
if is_summary_message(&user.message()) {
None
} else {
Some(user.message())
}
}
_ => None,
})
.collect()
}
pub(crate) fn is_summary_message(message: &str) -> bool {
message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str())
}
/// Inserts canonical initial context into compacted replacement history at the
/// model-expected boundary.
///
/// Placement rules:
/// - Prefer immediately before the last real user message.
/// - If no real user messages remain, insert before the compaction summary so
/// the summary stays last.
/// - If there are no user messages, insert before the last compaction item so
/// that item remains last (remote compaction may return only compaction items).
/// - If there are no user messages or compaction items, append the context.
pub(crate) fn insert_initial_context_before_last_real_user_or_summary(
mut compacted_history: Vec<ResponseItem>,
initial_context: Vec<ResponseItem>,
) -> Vec<ResponseItem> {
let mut last_user_or_summary_index = None;
let mut last_real_user_index = None;
for (i, item) in compacted_history.iter().enumerate().rev() {
let Some(TurnItem::UserMessage(user)) = crate::event_mapping::parse_turn_item(item) else {
continue;
};
// Compaction summaries are encoded as user messages, so track both:
// the last real user message (preferred insertion point) and the last
// user-message-like item (fallback summary insertion point).
last_user_or_summary_index.get_or_insert(i);
if !is_summary_message(&user.message()) {
last_real_user_index = Some(i);
break;
}
}
let last_compaction_index = compacted_history
.iter()
.enumerate()
.rev()
.find_map(|(i, item)| {
matches!(
item,
ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. }
)
.then_some(i)
});
let insertion_index = last_real_user_index
.or(last_user_or_summary_index)
.or(last_compaction_index);
// Re-inject canonical context from the current session since we stripped it
// from the pre-compaction history. Prefer placing it before the last real
// user message; if there is no real user message left, place it before the
// summary or compaction item so the compaction item remains last.
if let Some(insertion_index) = insertion_index {
compacted_history.splice(insertion_index..insertion_index, initial_context);
} else {
compacted_history.extend(initial_context);
}
compacted_history
}
pub(crate) fn build_compacted_history(
initial_context: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
) -> Vec<ResponseItem> {
build_compacted_history_with_limit(
initial_context,
user_messages,
summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
)
}
fn build_compacted_history_with_limit(
mut history: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
max_tokens: usize,
) -> Vec<ResponseItem> {
let mut selected_messages: Vec<String> = Vec::new();
if max_tokens > 0 {
let mut remaining = max_tokens;
for message in user_messages.iter().rev() {
if remaining == 0 {
break;
}
let tokens = approx_token_count(message);
if tokens <= remaining {
selected_messages.push(message.clone());
remaining = remaining.saturating_sub(tokens);
} else {
let truncated = truncate_text(message, TruncationPolicy::Tokens(remaining));
selected_messages.push(truncated);
break;
}
}
selected_messages.reverse();
}
for message in &selected_messages {
history.push(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: message.clone(),
}],
phase: None,
});
}
let summary_text = if summary_text.is_empty() {
"(no summary available)".to_string()
} else {
summary_text.to_string()
};
history.push(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: summary_text }],
phase: None,
});
history
}
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
prompt: &Prompt,
) -> CodexResult<()> {
let mut stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier.clone(),
turn_metadata_header,
// Rollout tracing currently models remote compaction only; local compaction streams
// are left untraced until the reducer has a first-class local compaction lifecycle.
&InferenceTraceContext::disabled(),
)
.await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
sess.record_into_history(std::slice::from_ref(&item), turn_context)
.await;
}
Ok(ResponseEvent::ServerReasoningIncluded(included)) => {
sess.set_server_reasoning_included(included).await;
}
Ok(ResponseEvent::RateLimits(snapshot)) => {
sess.update_rate_limits(turn_context, snapshot).await;
}
Ok(ResponseEvent::Completed { token_usage, .. }) => {
sess.update_token_usage_info(turn_context, token_usage.as_ref())
.await;
return Ok(());
}
Ok(_) => continue,
Err(e) => return Err(e),
}
}
}
#[cfg(test)]
#[path = "compact_tests.rs"]
mod tests;