Compare commits

..

25 Commits

Author SHA1 Message Date
Rohit Arunachalam
6c94487709 Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm
# Conflicts:
#	codex-rs/core/src/compact_tests.rs
#	codex-rs/core/src/session/tests.rs
#	codex-rs/features/src/tests.rs
2026-04-18 23:33:19 -07:00
Rohit Arunachalam
cca2414f20 Simplify prefix compaction prewarm 2026-04-18 21:55:58 -07:00
Rohit Arunachalam
451a8c055d Fix prefix compaction CI after main merge 2026-04-17 15:27:23 -07:00
Rohit Arunachalam
6cf6048964 Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm 2026-04-17 15:21:54 -07:00
Rohit Arunachalam
837db68562 Fix prefix compaction test path assertion 2026-04-17 14:37:51 -07:00
Rohit Arunachalam
bf532c99f7 Document pre-sampling prefix context capture 2026-04-17 11:49:47 -07:00
Rohit Arunachalam
8e7b9a95d2 Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm
# Conflicts:
#	codex-rs/core/src/codex.rs
#	codex-rs/core/src/config/mod.rs
2026-04-17 11:38:45 -07:00
Rohit Arunachalam
38994bb50d Assert prefix compaction replacement order 2026-04-17 11:17:45 -07:00
rka-oai
eb9c293edb Merge branch 'main' into codex/prefix-compaction-prewarm 2026-04-16 13:40:12 -07:00
Rohit Arunachalam
c9ea4a902b Box prefix compaction candidate state 2026-04-16 13:26:17 -07:00
Rohit Arunachalam
56e3a8acbf Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm
# Conflicts:
#	codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts
#	codex-rs/core/src/compact.rs
#	codex-rs/core/src/compact_tests.rs
#	codex-rs/core/src/state/session.rs
#	codex-rs/core/tests/suite/compact_remote.rs
2026-04-16 12:26:38 -07:00
Rohit Arunachalam
80947c1818 Simplify prefix compaction history handling 2026-04-16 11:57:36 -07:00
Rohit Arunachalam
bdd4370c21 Skip prefix compaction when snapshot is oversized 2026-04-16 11:45:22 -07:00
Rohit Arunachalam
9997539fa1 Preserve suffix history for prefix compaction 2026-04-15 22:01:13 -07:00
Rohit Arunachalam
b6b6e3e485 Refine prefix compaction context placement 2026-04-14 05:55:52 -07:00
Rohit Arunachalam
9696c3a8aa Narrow prefix compaction suffix cleanup 2026-04-14 01:43:36 -07:00
Rohit Arunachalam
3f96ed7a7f Fix prefix compaction CI lints 2026-04-13 17:24:46 -07:00
Rohit Arunachalam
a6c594e296 Update prefix compaction mode and suffix cleanup 2026-04-13 17:09:57 -07:00
Rohit Arunachalam
2dc3358a5d Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm
# Conflicts:
#	codex-rs/app-server/src/bespoke_event_handling.rs
#	codex-rs/tui/src/chatwidget.rs
2026-04-13 15:33:20 -07:00
Rohit Arunachalam
81ecee0ac5 Merge remote-tracking branch 'origin/main' into codex/prefix-compaction-prewarm
# Conflicts:
#	codex-rs/features/src/lib.rs
2026-04-12 13:12:12 -07:00
Rohit Arunachalam
a3d87c7793 Keep compaction test fix scoped 2026-04-10 11:38:20 -07:00
Rohit Arunachalam
13024ed18f Make compaction tests hermetic 2026-04-10 11:12:16 -07:00
rka-oai
c9dac105c0 Merge branch 'main' into codex/prefix-compaction-prewarm 2026-04-10 10:19:48 -07:00
Rohit Arunachalam
1729fa5fb2 Add prefix compaction debug logging 2026-04-09 22:29:30 -07:00
Rohit Arunachalam
ebf13cda36 Add background prefix compaction prewarm 2026-04-09 20:31:41 -07:00
24 changed files with 820 additions and 34 deletions

View File

@@ -6798,6 +6798,31 @@ mod tests {
absolute_path("readable")
}
// Verifies that a v2 `ThreadItem::ContextCompaction` round-trips through JSON
// using only the externally tagged `"type"` plus `"id"` — i.e. serialization
// emits no extra `kind` discriminator field.
#[test]
fn context_compaction_thread_item_serializes_without_kind() {
// Deserialize from the canonical wire shape.
let item: ThreadItem = serde_json::from_value(json!({
"type": "contextCompaction",
"id": "compact-1"
}))
.expect("thread item should deserialize");
assert_eq!(
item,
ThreadItem::ContextCompaction {
id: "compact-1".to_string(),
}
);
// Re-serialize and confirm the exact same JSON comes back out.
let serialized = serde_json::to_value(&item).expect("thread item should serialize");
assert_eq!(
serialized,
json!({
"type": "contextCompaction",
"id": "compact-1"
})
);
}
#[test]
fn collab_agent_state_maps_interrupted_status() {
assert_eq!(

View File

@@ -1349,7 +1349,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::PlanDelta(notification))
.await;
}
EventMsg::ContextCompacted(..) => {
EventMsg::ContextCompacted(_) => {
// Core still fans out this deprecated event for legacy clients;
// v2 clients receive the canonical ContextCompaction item instead.
if matches!(api_version, ApiVersion::V2) {

View File

@@ -48,6 +48,7 @@ const SUPPORTED_EXPERIMENTAL_FEATURE_ENABLEMENT: &[&str] = &[
"tool_search",
"tool_suggest",
"tool_call_mcp_elicitation",
"prefix_compaction",
];
#[async_trait]

View File

@@ -93,10 +93,13 @@ async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()>
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
let ThreadItem::ContextCompaction {
id: completed_id, ..
} = completed.item
else {
unreachable!("completed item should be context compaction");
};
@@ -174,10 +177,13 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
let ThreadItem::ContextCompaction {
id: completed_id, ..
} = completed.item
else {
unreachable!("completed item should be context compaction");
};
@@ -237,10 +243,13 @@ async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
let ThreadItem::ContextCompaction {
id: completed_id, ..
} = completed.item
else {
unreachable!("completed item should be context compaction");
};

View File

@@ -281,9 +281,9 @@ async fn experimental_feature_enablement_set_rejects_non_allowlisted_feature() -
error.message
);
assert!(
error
.message
.contains("apps, plugins, tool_search, tool_suggest, tool_call_mcp_elicitation"),
error.message.contains(
"apps, plugins, tool_search, tool_suggest, tool_call_mcp_elicitation, prefix_compaction"
),
"{}",
error.message
);

View File

@@ -32,6 +32,8 @@ pub struct CompactionInput<'a> {
pub reasoning: Option<Reasoning>,
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<TextControls>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<&'a str>,
}
/// Canonical input payload for the memory summarize endpoint.

View File

@@ -437,6 +437,9 @@
"plugins": {
"type": "boolean"
},
"prefix_compaction": {
"type": "boolean"
},
"prevent_idle_sleep": {
"type": "boolean"
},
@@ -2405,6 +2408,9 @@
"plugins": {
"type": "boolean"
},
"prefix_compaction": {
"type": "boolean"
},
"prevent_idle_sleep": {
"type": "boolean"
},

View File

@@ -23,7 +23,7 @@
"slug": "gpt-5.4",
"display_name": "gpt-5.4",
"description": "Latest frontier agentic coding model.",
"default_reasoning_level": "xhigh",
"default_reasoning_level": "medium",
"supported_reasoning_levels": [
{
"effort": "low",

View File

@@ -410,6 +410,7 @@ impl ModelClient {
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
session_telemetry: &SessionTelemetry,
mode: Option<&'static str>,
) -> Result<Vec<ResponseItem>> {
if prompt.input.is_empty() {
return Ok(Vec::new());
@@ -454,6 +455,7 @@ impl ModelClient {
parallel_tool_calls: prompt.parallel_tool_calls,
reasoning,
text,
mode,
};
let mut extra_headers = ApiHeaderMap::new();

View File

@@ -58,8 +58,12 @@ pub(crate) enum InitialContextInjection {
DoNotInject,
}
pub(crate) fn provider_supports_inline_remote_compaction(provider: &ModelProviderInfo) -> bool {
provider.is_openai() || provider.supports_remote_compaction()
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.supports_remote_compaction()
provider_supports_inline_remote_compaction(provider)
}
pub(crate) async fn run_inline_auto_compact_task(

View File

@@ -31,6 +31,8 @@ use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
const PREFIX_COMPACTION_MODE: &str = "prefix";
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -73,6 +75,71 @@ pub(crate) async fn run_remote_compact_task(
.await
}
/// Runs a remote compaction over a *snapshot* of history (`base_history`)
/// rather than the live session history, tagging the request with
/// `mode = "prefix"` so the backend can distinguish it from a full compaction.
///
/// Returns the compacted replacement history on success. Errors with
/// `ContextWindowExceeded` when the snapshot's estimated token count already
/// exceeds the model's context window, or propagates the remote call failure
/// (after logging it) otherwise.
pub(crate) async fn run_remote_prefix_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
base_history: Vec<ResponseItem>,
) -> CodexResult<Vec<ResponseItem>> {
// Work on a private ContextManager seeded from the snapshot; the live
// session history is untouched by this task.
let mut history = ContextManager::new();
history.replace(base_history);
let base_instructions = sess.get_base_instructions().await;
// Bail out early if the snapshot can't even fit in the context window.
if let Some(context_window) = turn_context.model_context_window()
&& let Some(estimated_tokens) =
history.estimate_token_count_with_base_instructions(&base_instructions)
&& estimated_tokens > context_window
{
info!(
turn_id = %turn_context.sub_id,
estimated_tokens,
context_window,
"prefix compaction snapshot exceeds context window"
);
return Err(CodexErr::ContextWindowExceeded);
}
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
let prompt = build_remote_compact_prompt(
sess.as_ref(),
turn_context.as_ref(),
prompt_input,
base_instructions,
)
.await?;
let new_history = sess
.services
.model_client
.compact_conversation_history(
&prompt,
&turn_context.model_info,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
&turn_context.session_telemetry,
// Distinguishes this request from ordinary (full) remote compaction.
Some(PREFIX_COMPACTION_MODE),
)
// On failure, log diagnostics (usage breakdown + request shape) before
// propagating the original error.
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
let compact_request_log_data =
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context.as_ref(),
&compact_request_log_data,
total_usage_breakdown,
&err,
);
Err(err)
})
.await?;
// DoNotInject: initial context is NOT re-added here — the caller captured
// it separately and splices it in when the candidate is applied.
Ok(process_compacted_history(
sess.as_ref(),
turn_context.as_ref(),
new_history,
InitialContextInjection::DoNotInject,
)
.await)
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
@@ -140,23 +207,13 @@ async fn run_remote_compact_task_inner_impl(
.collect();
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
let tool_router = built_tools(
let prompt = build_remote_compact_prompt(
sess.as_ref(),
turn_context.as_ref(),
&prompt_input,
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
prompt_input,
base_instructions,
)
.await?;
let prompt = Prompt {
input: prompt_input,
tools: tool_router.model_visible_specs(),
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
base_instructions,
personality: turn_context.personality,
output_schema: None,
};
let mut new_history = sess
.services
@@ -167,6 +224,7 @@ async fn run_remote_compact_task_inner_impl(
turn_context.reasoning_effort,
turn_context.reasoning_summary,
&turn_context.session_telemetry,
/*mode*/ None,
)
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
@@ -209,6 +267,31 @@ async fn run_remote_compact_task_inner_impl(
Ok(())
}
/// Assembles the `Prompt` sent to the remote compaction endpoint: builds the
/// tool router for `prompt_input` (with no approved tools or skills outcome)
/// and packages the input, tool specs, and base instructions together.
///
/// Shared between the ordinary remote compact task and the prefix variant so
/// both send identically shaped prompts.
async fn build_remote_compact_prompt(
sess: &Session,
turn_context: &TurnContext,
prompt_input: Vec<ResponseItem>,
base_instructions: BaseInstructions,
) -> CodexResult<Prompt> {
// A fresh CancellationToken: tool construction here is not tied to any
// in-flight turn's cancellation.
let tool_router = built_tools(
sess,
turn_context,
&prompt_input,
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
)
.await?;
Ok(Prompt {
input: prompt_input,
tools: tool_router.model_visible_specs(),
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
base_instructions,
personality: turn_context.personality,
// Compaction output is free-form; no structured-output schema.
output_schema: None,
})
}
pub(crate) async fn process_compacted_history(
sess: &Session,
turn_context: &TurnContext,

View File

@@ -1,6 +1,7 @@
use super::*;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::WireApi;
use codex_model_provider_info::create_oss_provider_with_base_url;
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
use pretty_assertions::assert_eq;
@@ -23,6 +24,21 @@ async fn process_compacted_history_with_test_session(
(refreshed, initial_context)
}
// The stock OpenAI provider qualifies for inline remote compaction.
#[test]
fn remote_compact_task_supports_openai_provider() {
let provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None);
assert!(provider_supports_inline_remote_compaction(&provider));
}
// A generic OpenAI-compatible (OSS) provider does NOT qualify: it is neither
// the OpenAI provider nor one that advertises remote-compaction support.
#[test]
fn remote_compact_task_ignores_generic_openai_compatible_provider() {
let provider =
create_oss_provider_with_base_url("http://localhost:8082/v1", WireApi::Responses);
assert!(!provider_supports_inline_remote_compaction(&provider));
}
#[test]
fn content_items_to_text_joins_non_empty_segments() {
let items = vec![
@@ -190,7 +206,7 @@ fn build_token_limited_compacted_history_appends_summary_message() {
}
#[test]
fn should_use_remote_compact_task_for_azure_provider() {
fn provider_supports_inline_remote_compaction_for_azure_provider() {
let provider = ModelProviderInfo {
name: "Azure".into(),
base_url: Some("https://example.com/openai".into()),
@@ -210,7 +226,7 @@ fn should_use_remote_compact_task_for_azure_provider() {
supports_websockets: false,
};
assert!(should_use_remote_compact_task(&provider));
assert!(provider_supports_inline_remote_compaction(&provider));
}
#[tokio::test]

View File

@@ -6609,6 +6609,27 @@ hide_spawn_agent_metadata = false
Ok(())
}
// Setting `prefix_compaction = true` under `[features]` in config.toml turns
// on `Feature::PrefixCompaction` when the config is loaded.
#[tokio::test]
async fn prefix_compaction_bool_feature_enables() -> std::io::Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join(CONFIG_TOML_FILE),
r#"[features]
prefix_compaction = true
"#,
)?;
let config = ConfigBuilder::without_managed_config_for_tests()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.build()
.await?;
assert!(config.features.enabled(Feature::PrefixCompaction));
Ok(())
}
#[tokio::test]
async fn feature_requirements_normalize_runtime_feature_mutations() -> std::io::Result<()> {
let codex_home = TempDir::new()?;

View File

@@ -167,6 +167,7 @@ use codex_protocol::exec_output::StreamOutput;
mod handlers;
mod mcp;
pub(crate) mod prefix_compaction;
mod review;
mod rollout_reconstruction;
#[allow(clippy::module_inception)]
@@ -176,6 +177,8 @@ pub(crate) mod turn_context;
#[cfg(test)]
use self::handlers::submission_dispatch_span;
use self::handlers::submission_loop;
#[cfg(test)]
use self::prefix_compaction::prefix_compact_token_limit;
use self::review::spawn_review_thread;
use self::session::AppServerClientMetadata;
use self::session::Session;

View File

@@ -0,0 +1,248 @@
use std::sync::Arc;
use crate::compact::provider_supports_inline_remote_compaction;
use crate::compact_remote::run_remote_prefix_compact_task;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use codex_features::Feature;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::TurnContextItem;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::trace;
/// Handle to the background prefix-compaction task; resolves to a candidate
/// that may later be applied if the session history still matches.
pub(crate) type PrefixCompactTask = JoinHandle<CodexResult<PrefixCompactCandidate>>;
/// Result of a background prefix compaction, held until a foreground
/// auto-compaction decides whether to apply it.
#[derive(Debug, Clone)]
pub(crate) struct PrefixCompactCandidate {
// Snapshot of session history at kickoff; applying requires the current
// history to still start with exactly these items.
base_history: Vec<ResponseItem>,
// Compacted replacement for `base_history` returned by the remote task.
replacement_prefix: Vec<ResponseItem>,
// Initial context captured at kickoff, spliced in after the prefix.
captured_context: Vec<ResponseItem>,
// Turn-context item captured at kickoff; used as a fallback reference when
// the session has none at apply time.
captured_reference_context_item: Option<TurnContextItem>,
}
/// Kicks off a background prefix compaction when all preconditions hold:
/// the feature is enabled, the provider supports inline remote compaction,
/// usage is inside the window `[prefix_compact_limit, auto_compact_limit)`,
/// no task is already running, and there is history to compact.
/// Otherwise returns without side effects.
pub(crate) async fn maybe_start_prefix_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
total_usage_tokens: i64,
auto_compact_limit: i64,
) {
if !turn_context.features.enabled(Feature::PrefixCompaction) {
return;
}
if !provider_supports_inline_remote_compaction(turn_context.provider.info()) {
return;
}
let Some(prefix_compact_limit) = prefix_compact_token_limit(auto_compact_limit) else {
return;
};
// Only prewarm once past the prefix threshold but before foreground
// auto-compaction would fire anyway.
if total_usage_tokens < prefix_compact_limit || total_usage_tokens >= auto_compact_limit {
return;
}
// Captured before taking the state lock; build_initial_context awaits and
// should not run under the lock.
let captured_context = sess.build_initial_context(turn_context.as_ref()).await;
let captured_reference_context_item = Some(turn_context.to_turn_context_item());
let mut state = sess.state.lock().await;
// At most one prefix-compaction task at a time (running or ready).
if state.prefix_compact_task.is_some() {
trace!(
turn_id = %turn_context.sub_id,
total_usage_tokens,
prefix_compact_limit,
auto_compact_limit,
"prefix compaction already running or ready"
);
return;
}
let base_history = state.history.raw_items().to_vec();
if base_history.is_empty() {
return;
}
let task = spawn_prefix_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
base_history.clone(),
captured_context,
captured_reference_context_item,
);
state.prefix_compact_task = Some(task);
debug!(
turn_id = %turn_context.sub_id,
model_slug = %turn_context.model_info.slug,
total_usage_tokens,
prefix_compact_limit,
auto_compact_limit,
base_history_len = base_history.len(),
"starting background prefix compaction"
);
}
/// Attempts to apply a *finished* background prefix-compaction candidate.
///
/// Returns `Ok(true)` when a candidate was applied (the caller can skip
/// foreground compaction), `Ok(false)` when there is no finished task, the
/// task failed/panicked, or the candidate is stale — in which case the caller
/// falls back to foreground auto-compaction.
pub(crate) async fn try_apply_ready_prefix_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<bool> {
// Only consumes the task if it has already finished; never blocks a turn
// waiting for the background compaction.
let Some(task) = take_finished_prefix_compact_task(sess).await else {
return Ok(false);
};
let candidate = match task.await {
Ok(Ok(candidate)) => candidate,
// Task ran but the compaction itself errored.
Ok(Err(err)) => {
debug!(
turn_id = %turn_context.sub_id,
"prefix compaction failed: {err:#}"
);
return Ok(false);
}
// Join error (e.g. the task panicked or was aborted).
Err(err) => {
debug!(
turn_id = %turn_context.sub_id,
"prefix compaction task failed: {err:#}"
);
return Ok(false);
}
};
if apply_prefix_compact_candidate(sess, turn_context, candidate).await? {
return Ok(true);
}
debug!(
turn_id = %turn_context.sub_id,
"prefix compaction candidate is stale; running foreground auto-compaction"
);
Ok(false)
}
/// Aborts and discards any in-flight or ready prefix-compaction task for the
/// session (takes the state lock to do so).
pub(crate) async fn abandon_prefix_compact(sess: &Arc<Session>) {
let mut state = sess.state.lock().await;
abort_prefix_compact_task(&mut state.prefix_compact_task);
}
/// Takes the task handle out of the slot (leaving `None`) and aborts it.
/// No-op when the slot is already empty.
pub(crate) fn abort_prefix_compact_task(task: &mut Option<PrefixCompactTask>) {
if let Some(task) = task.take() {
task.abort();
}
}
/// Token threshold at which background prefix compaction may start:
/// 60% of `auto_compact_limit`, clamped into `[1, auto_compact_limit - 1]`.
/// Returns `None` when the limit is effectively unbounded (`i64::MAX`) or too
/// small (<= 1) to leave room between the prefix and foreground thresholds.
pub(super) fn prefix_compact_token_limit(auto_compact_limit: i64) -> Option<i64> {
if auto_compact_limit == i64::MAX || auto_compact_limit <= 1 {
return None;
}
// saturating_mul guards against overflow for very large (non-MAX) limits.
let token_limit = auto_compact_limit.saturating_mul(60) / 100;
let token_limit = token_limit.clamp(1, auto_compact_limit.saturating_sub(1));
Some(token_limit)
}
/// Removes and returns the prefix-compaction task only if it has already
/// completed; a still-running task is left in place so awaiting the returned
/// handle never blocks the caller.
async fn take_finished_prefix_compact_task(sess: &Arc<Session>) -> Option<PrefixCompactTask> {
let mut state = sess.state.lock().await;
if state
.prefix_compact_task
.as_ref()
.is_some_and(|task| task.is_finished())
{
state.prefix_compact_task.take()
} else {
None
}
}
/// Applies a prefix-compaction candidate to the live session history.
///
/// Returns `Ok(false)` (stale) when the current history no longer starts with
/// the candidate's `base_history` snapshot. Otherwise replaces the matched
/// prefix and emits started/completed ContextCompaction turn items.
///
/// New history layout (order matters):
///   replacement_prefix + captured_context + retained suffix (minus ghost
///   snapshots) + all ghost snapshots from the current history, moved to the end.
async fn apply_prefix_compact_candidate(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
candidate: PrefixCompactCandidate,
) -> CodexResult<bool> {
let current_history = sess.clone_history().await;
let current_items = current_history.raw_items();
// Staleness check: the snapshot must still be an exact prefix of the
// current history, element for element.
if current_items.len() < candidate.base_history.len()
|| current_items[..candidate.base_history.len()] != candidate.base_history
{
debug!(
turn_id = %turn_context.sub_id,
current_history_len = current_items.len(),
base_history_len = candidate.base_history.len(),
"prefix compaction candidate no longer matches current history"
);
return Ok(false);
}
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
// Items added after the snapshot are kept verbatim, except ghost snapshots
// which are re-appended at the very end below.
let retained_suffix: Vec<ResponseItem> = current_items[candidate.base_history.len()..]
.iter()
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();
// Prefer the session's current reference context; fall back to the one
// captured when the task was started.
let reference_context_item = sess
.reference_context_item()
.await
.or(candidate.captured_reference_context_item);
let mut new_history = candidate.replacement_prefix;
new_history.extend(candidate.captured_context);
new_history.extend(retained_suffix);
new_history.extend(
current_items
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned(),
);
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history.clone()),
};
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.recompute_token_usage(turn_context).await;
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
debug!(
turn_id = %turn_context.sub_id,
"applied prefix compaction candidate"
);
Ok(true)
}
/// Spawns the tokio task that performs the remote prefix compaction and, on
/// success, packages the result together with the captured snapshot/context
/// into a `PrefixCompactCandidate` for later application.
fn spawn_prefix_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
base_history: Vec<ResponseItem>,
captured_context: Vec<ResponseItem>,
captured_reference_context_item: Option<TurnContextItem>,
) -> PrefixCompactTask {
tokio::spawn(async move {
// Errors propagate through the JoinHandle and are handled (logged,
// then ignored) by try_apply_ready_prefix_compact.
let replacement_prefix = run_remote_prefix_compact_task(
Arc::clone(&sess),
Arc::clone(&turn_context),
base_history.clone(),
)
.await?;
debug!(
turn_id = %turn_context.sub_id,
model_slug = %turn_context.model_info.slug,
base_history_len = base_history.len(),
replacement_prefix_len = replacement_prefix.len(),
captured_context_len = captured_context.len(),
"background prefix compaction ready"
);
Ok(PrefixCompactCandidate {
base_history,
replacement_prefix,
captured_context,
captured_reference_context_item,
})
})
}

View File

@@ -6312,6 +6312,19 @@ async fn unified_exec_rejects_escalated_permissions_when_policy_not_on_request()
pretty_assertions::assert_eq!(output, expected);
}
// 60% of 1000 is 600, which sits inside [1, 999] without clamping.
#[test]
fn prefix_compact_token_limit_defaults_to_sixty_percent_of_auto_compact() {
assert_eq!(
prefix_compact_token_limit(/*auto_compact_limit*/ 1_000),
Some(600)
);
}
// A limit of 1 leaves no room below the auto-compact threshold.
#[test]
fn prefix_compact_token_limit_is_none_for_tiny_auto_compact_limit() {
assert_eq!(prefix_compact_token_limit(/*auto_compact_limit*/ 1), None);
}
#[tokio::test]
async fn session_start_hooks_only_load_from_trusted_project_layers() -> std::io::Result<()> {
let temp = tempfile::tempdir()?;

View File

@@ -38,6 +38,9 @@ use crate::parse_turn_item;
use crate::plugins::build_plugin_injections;
use crate::resolve_skill_dependencies_for_turn;
use crate::session::PreviousTurnSettings;
use crate::session::prefix_compaction::abandon_prefix_compact;
use crate::session::prefix_compaction::maybe_start_prefix_compact;
use crate::session::prefix_compaction::try_apply_ready_prefix_compact;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::stream_events_utils::HandleOutputCtx;
@@ -155,6 +158,18 @@ pub(crate) async fn run_turn(
if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() {
client_session.reset_websocket_session();
}
// This pre-sampling kickoff captures full context before this turn records its
// context diff. If the candidate is later applied, that captured context can
// sit directly before a retained suffix that starts with the same turn's diff.
// This is redundant but intentional: we keep the suffix untouched and rely on
// the normal prefix/current-history match check before applying.
maybe_start_prefix_compact(
&sess,
&turn_context,
sess.get_total_token_usage().await,
auto_compact_limit,
)
.await;
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
@@ -508,6 +523,14 @@ pub(crate) async fn run_turn(
continue;
}
maybe_start_prefix_compact(
&sess,
&turn_context,
total_usage_tokens,
auto_compact_limit,
)
.await;
if !needs_follow_up {
last_agent_message = sampling_request_last_agent_message;
let stop_hook_permission_mode = match turn_context.approval_policy.value() {
@@ -794,6 +817,11 @@ async fn run_auto_compact(
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
if try_apply_ready_prefix_compact(sess, turn_context).await? {
return Ok(());
}
abandon_prefix_compact(sess).await;
if should_use_remote_compact_task(turn_context.provider.info()) {
run_inline_remote_auto_compact_task(
Arc::clone(sess),

View File

@@ -9,6 +9,8 @@ use std::collections::HashSet;
use crate::agent_identity::RegisteredAgentTask;
use crate::context_manager::ContextManager;
use crate::session::PreviousTurnSettings;
use crate::session::prefix_compaction::PrefixCompactTask;
use crate::session::prefix_compaction::abort_prefix_compact_task;
use crate::session::session::SessionConfiguration;
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
use codex_protocol::protocol::RateLimitSnapshot;
@@ -35,6 +37,7 @@ pub(crate) struct SessionState {
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
granted_permissions: Option<PermissionProfile>,
pub(crate) prefix_compact_task: Option<PrefixCompactTask>,
next_turn_is_first: bool,
}
@@ -55,6 +58,7 @@ impl SessionState {
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
granted_permissions: None,
prefix_compact_task: None,
next_turn_is_first: true,
}
}
@@ -100,6 +104,7 @@ impl SessionState {
self.history.replace(items);
self.history
.set_reference_context_item(reference_context_item);
abort_prefix_compact_task(&mut self.prefix_compact_task);
}
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {

View File

@@ -2,9 +2,11 @@
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Result;
use codex_core::compact::SUMMARY_PREFIX;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
@@ -37,9 +39,13 @@ use core_test_support::wait_for_event_match;
use core_test_support::wait_for_event_with_timeout;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::time::Duration;
use wiremock::ResponseTemplate;
/// Test helper: turns on the prefix-compaction feature and turns off
/// ToolSuggest. NOTE(review): disabling ToolSuggest is presumably to keep its
/// background requests out of the mocked request counts — confirm with the
/// feature's behavior.
fn enable_prefix_compaction(config: &mut codex_core::config::Config) {
let _ = config.features.enable(Feature::PrefixCompaction);
let _ = config.features.disable(Feature::ToolSuggest);
}
/// Rough token estimate for `text`: one token per four bytes, rounded up.
/// Saturates at `i64::MAX` if the length somehow cannot convert.
fn approx_token_count(text: &str) -> i64 {
    let estimated_tokens = text.len().saturating_add(3) / 4;
    i64::try_from(estimated_tokens).unwrap_or(i64::MAX)
}
@@ -55,6 +61,15 @@ fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64
.saturating_add(approx_token_count(&request.instructions_text()))
}
/// Index of the first input item in `request` whose rendered form contains
/// `needle`; panics (failing the calling test) when no item matches.
fn first_input_index_containing(request: &responses::ResponsesRequest, needle: &str) -> usize {
    for (index, item) in request.input().iter().enumerate() {
        if item.to_string().contains(needle) {
            return index;
        }
    }
    panic!("expected request input to contain {needle:?}")
}
const PRETURN_CONTEXT_DIFF_CWD_MARKER: &str = "PRETURN_CONTEXT_DIFF_CWD";
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
const DUMMY_FUNCTION_NAME: &str = "test_tool";
const REMOTE_COMPACT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -86,6 +101,19 @@ fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
}]
}
/// Polls `mock_response` until it has recorded at least `count` requests,
/// checking up to 100 times with a 10ms sleep between polls (~1s budget),
/// and panics with the observed count if the target is never reached.
async fn wait_for_response_requests(mock_response: &responses::ResponseMock, count: usize) {
    let mut attempts_left = 100;
    while attempts_left > 0 {
        if mock_response.requests().len() >= count {
            return;
        }
        attempts_left -= 1;
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    panic!(
        "expected at least {count} request(s), got {}",
        mock_response.requests().len()
    );
}
fn remote_realtime_test_codex_builder(
realtime_server: &responses::WebSocketTestServer,
) -> TestCodexBuilder {
@@ -1064,6 +1092,217 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
Ok(())
}
// With auto_compact_token_limit = 800, the prefix threshold is 60% = 480.
// Turn 1 ends at 450 total tokens (below 480): no compact request is made.
// Turn 2 ends at 650 (>= 480 but < 800): exactly one background compact
// request fires, tagged with mode = "prefix".
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn prefix_compact_uses_sixty_percent_of_auto_compact_limit() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(20_000);
config.model_auto_compact_token_limit = Some(800);
enable_prefix_compaction(config);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed_with_tokens("resp-1", /*total_tokens*/ 450),
]),
sse(vec![
responses::ev_assistant_message("m2", "SECOND_REMOTE_REPLY"),
responses::ev_completed_with_tokens("resp-2", /*total_tokens*/ 650),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
json!({ "output": compacted_summary_only_output("PREFIX_THRESHOLD_SUMMARY") }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "FIRST_REMOTE_USER".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Below the 480-token prefix threshold: no compaction yet.
assert_eq!(compact_mock.requests().len(), 0);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "SECOND_REMOTE_USER".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// The prefix compaction runs in the background; poll until its request lands.
wait_for_response_requests(&compact_mock, /*count*/ 1).await;
assert_eq!(compact_mock.requests().len(), 1);
assert_eq!(
compact_mock.single_request().body_json()["mode"],
json!("prefix")
);
Ok(())
}
// End-to-end: turn 1 ends at 650 tokens (above the 480 prefix threshold),
// starting a background prefix compaction. Turn 2 ends at 850 (over the 800
// auto-compact limit), so before turn 3 the pre-turn auto-compaction applies
// the ready prefix candidate instead of issuing a foreground compact request
// (the compact mock must see exactly one request total). Turn 3's request is
// then checked for content and ordering: compacted summary, then captured
// context, then the retained suffix in chronological order, with the context
// diff (from OverrideTurnContext's cwd change) appearing exactly once.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ready_prefix_compact_is_applied_by_pre_turn_auto_compact() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(20_000);
config.model_auto_compact_token_limit = Some(800);
enable_prefix_compaction(config);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed_with_tokens("resp-1", /*total_tokens*/ 650),
]),
sse(vec![
responses::ev_assistant_message("m2", "SECOND_REMOTE_REPLY"),
responses::ev_completed_with_tokens("resp-2", /*total_tokens*/ 850),
]),
sse(vec![
responses::ev_assistant_message("m3", "THIRD_REMOTE_REPLY"),
responses::ev_completed_with_tokens("resp-3", /*total_tokens*/ 200),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
json!({ "output": compacted_summary_only_output("PREFIX_READY_SUMMARY") }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "FIRST_REMOTE_USER".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// The background prefix compaction kicked off by turn 1 (650 >= 480).
wait_for_response_requests(&compact_mock, /*count*/ 1).await;
assert_eq!(
compact_mock.single_request().body_json()["mode"],
json!("prefix")
);
// Change cwd so a context diff is recorded AFTER the compaction snapshot;
// it must survive in the retained suffix.
codex
.submit(Op::OverrideTurnContext {
cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)),
approval_policy: None,
approvals_reviewer: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "SECOND_REMOTE_USER".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "THIRD_REMOTE_USER".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Still only the one background request: the ready candidate was applied
// instead of a second, foreground compact call.
assert_eq!(
compact_mock.requests().len(),
1,
"ready prefix compact should replace the foreground compact request"
);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3);
let third_request = &requests[2];
let third_request_body = third_request.body_json().to_string();
// Compacted summary replaced turn 1's user message; later turns survive.
assert!(third_request_body.contains("PREFIX_READY_SUMMARY"));
assert!(!third_request_body.contains("FIRST_REMOTE_USER"));
assert!(third_request_body.contains("SECOND_REMOTE_USER"));
assert!(third_request_body.contains("THIRD_REMOTE_USER"));
let summary_index = first_input_index_containing(third_request, "PREFIX_READY_SUMMARY");
let captured_context_index =
first_input_index_containing(third_request, PRETURN_CONTEXT_DIFF_CWD_MARKER);
let suffix_user_index = first_input_index_containing(third_request, "SECOND_REMOTE_USER");
let current_user_index = first_input_index_containing(third_request, "THIRD_REMOTE_USER");
assert!(
summary_index < captured_context_index,
"captured context should follow the compacted prefix summary"
);
assert!(
captured_context_index < suffix_user_index,
"captured context should precede retained suffix messages"
);
assert!(
suffix_user_index < current_user_index,
"retained suffix should preserve chronological user message order"
);
let stale_context_count = third_request
.message_input_texts("user")
.iter()
.filter(|text| text.contains(PRETURN_CONTEXT_DIFF_CWD_MARKER))
.count();
assert_eq!(
stale_context_count, 1,
"prefix compaction should preserve context updates in the retained suffix"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -197,6 +197,9 @@ pub enum Feature {
ResponsesWebsockets,
/// Legacy rollout flag for Responses API WebSocket transport v2 experiments.
ResponsesWebsocketsV2,
/// Precompute prefix compaction in the background before the foreground
/// auto-compaction threshold is reached.
PrefixCompaction,
/// Use the agent identity registration flow for ChatGPT-authenticated sessions.
UseAgentIdentity,
/// Enable workspace dependency support.
@@ -980,6 +983,16 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Removed,
default_enabled: false,
},
FeatureSpec {
id: Feature::PrefixCompaction,
key: "prefix_compaction",
stage: Stage::Experimental {
name: "Prefix compaction",
menu_description: "Precompute history compaction in the background before the normal context compaction threshold is reached.",
announcement: "",
},
default_enabled: false,
},
FeatureSpec {
id: Feature::UseAgentIdentity,
key: "use_agent_identity",

View File

@@ -103,6 +103,23 @@ fn guardian_approval_is_experimental_and_user_toggleable() {
assert_eq!(Feature::GuardianApproval.default_enabled(), false);
}
#[test]
fn prefix_compaction_is_experimental_and_user_toggleable() {
    // The prefix-compaction feature must surface as an experimental,
    // user-toggleable menu entry that is disabled by default.
    let stage = Feature::PrefixCompaction.info().stage;
    assert!(matches!(stage, Stage::Experimental { .. }));
    assert_eq!(stage.experimental_menu_name(), Some("Prefix compaction"));
    let expected_description =
        "Precompute history compaction in the background before the normal context compaction threshold is reached.";
    assert_eq!(
        stage.experimental_menu_description(),
        Some(expected_description)
    );
    // The spec registers an empty announcement string, which is surfaced as `None`.
    assert_eq!(stage.experimental_announcement(), None);
    assert!(!Feature::PrefixCompaction.default_enabled());
}
#[test]
fn external_migration_is_experimental_and_disabled_by_default() {
let spec = Feature::ExternalMigration.info();
@@ -417,6 +434,21 @@ usage_hint_enabled = false
);
}
#[test]
fn prefix_compaction_feature_config_deserializes_bool() {
    // A bare boolean under the features table should round-trip into a
    // single map entry keyed by the feature's config name.
    let toml_src = r#"
prefix_compaction = true
"#;
    let features: FeaturesToml =
        toml::from_str(toml_src).expect("features table should deserialize");
    let expected = BTreeMap::from([("prefix_compaction".to_string(), true)]);
    assert_eq!(features.entries(), expected);
}
#[test]
fn unstable_warning_event_only_mentions_enabled_under_development_features() {
let mut configured_features = Table::new();

View File

@@ -2076,7 +2076,7 @@ pub struct ModelRerouteEvent {
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ContextCompactedEvent;
pub struct ContextCompactedEvent {}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct TurnCompleteEvent {

View File

@@ -2206,9 +2206,14 @@ impl ChatWidget {
self.request_redraw();
}
#[cfg(test)]
fn on_agent_message(&mut self, message: String) {
self.finalize_completed_assistant_message(Some(&message));
fn on_context_compacted(&mut self) {
    // Close out any in-flight assistant stream before recording the marker,
    // so the compaction notice lands after the streamed output.
    self.flush_answer_stream_with_separator();
    self.handle_stream_finished();
    let marker_cell = history_cell::new_info_event(
        "Context compacted".to_owned(),
        /*hint*/ None,
    );
    self.add_to_history(marker_cell);
    self.request_redraw();
}
fn on_agent_message_delta(&mut self, delta: String) {
@@ -5985,7 +5990,7 @@ impl ChatWidget {
self.exit_review_mode_after_item();
}
ThreadItem::ContextCompaction { .. } => {
self.add_info_message("Context compacted".to_string(), /*hint*/ None);
self.on_context_compacted();
}
ThreadItem::HookPrompt { .. } => {}
ThreadItem::CollabAgentToolCall {
@@ -6654,7 +6659,7 @@ impl ChatWidget {
// TODO(ccunningham): stop relying on legacy AgentMessage in review mode,
// including thread-snapshot replay, and forward
// ItemCompleted(TurnItem::AgentMessage(_)) instead.
self.on_agent_message(message)
self.finalize_completed_assistant_message(Some(&message))
}
EventMsg::AgentMessage(AgentMessageEvent { message, .. }) => {
if !message.is_empty() {

View File

@@ -799,6 +799,37 @@ async fn replayed_in_progress_turn_marks_task_running() {
assert_eq!(status.header(), "Working");
}
#[tokio::test]
async fn replayed_context_compaction_renders_history_marker() {
    // Replaying a completed turn that contains a context-compaction item
    // should insert the "Context compacted" marker into visible history.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

    let compaction_turn = AppServerTurn {
        id: "turn-1".to_string(),
        items: vec![AppServerThreadItem::ContextCompaction {
            id: "compact-1".to_string(),
        }],
        status: AppServerTurnStatus::Completed,
        error: None,
        started_at: None,
        completed_at: None,
        duration_ms: None,
    };
    chat.replay_thread_turns(vec![compaction_turn], ReplayKind::ResumeInitialMessages);

    // Flatten every inserted history cell into one searchable string.
    let cells = drain_insert_history(&mut rx);
    let mut rendered = Vec::with_capacity(cells.len());
    for cell in &cells {
        rendered.push(lines_to_single_string(cell));
    }
    let text = rendered.join("\n");
    assert!(
        text.contains("Context compacted"),
        "expected replayed compaction marker, got {text:?}"
    );
}
#[tokio::test]
async fn replayed_stream_error_does_not_set_retry_status_or_status_indicator() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;