mirror of
https://github.com/openai/codex.git
synced 2026-04-19 12:14:48 +00:00
Compare commits
25 Commits
bot/update
...
codex/pref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c94487709 | ||
|
|
cca2414f20 | ||
|
|
451a8c055d | ||
|
|
6cf6048964 | ||
|
|
837db68562 | ||
|
|
bf532c99f7 | ||
|
|
8e7b9a95d2 | ||
|
|
38994bb50d | ||
|
|
eb9c293edb | ||
|
|
c9ea4a902b | ||
|
|
56e3a8acbf | ||
|
|
80947c1818 | ||
|
|
bdd4370c21 | ||
|
|
9997539fa1 | ||
|
|
b6b6e3e485 | ||
|
|
9696c3a8aa | ||
|
|
3f96ed7a7f | ||
|
|
a6c594e296 | ||
|
|
2dc3358a5d | ||
|
|
81ecee0ac5 | ||
|
|
a3d87c7793 | ||
|
|
13024ed18f | ||
|
|
c9dac105c0 | ||
|
|
1729fa5fb2 | ||
|
|
ebf13cda36 |
@@ -6798,6 +6798,31 @@ mod tests {
|
||||
absolute_path("readable")
|
||||
}
|
||||
|
||||
/// Round-trips a `contextCompaction` thread item through serde and checks that
/// the wire form consists of exactly the `type` and `id` fields in both
/// directions.
#[test]
fn context_compaction_thread_item_serializes_without_kind() {
    let wire = json!({
        "type": "contextCompaction",
        "id": "compact-1"
    });

    let item: ThreadItem =
        serde_json::from_value(wire.clone()).expect("thread item should deserialize");
    let expected_item = ThreadItem::ContextCompaction {
        id: "compact-1".to_string(),
    };
    assert_eq!(item, expected_item);

    // Serializing the parsed item must reproduce the input payload verbatim.
    let serialized = serde_json::to_value(&item).expect("thread item should serialize");
    assert_eq!(serialized, wire);
}
|
||||
|
||||
#[test]
|
||||
fn collab_agent_state_maps_interrupted_status() {
|
||||
assert_eq!(
|
||||
|
||||
@@ -1349,7 +1349,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.send_server_notification(ServerNotification::PlanDelta(notification))
|
||||
.await;
|
||||
}
|
||||
EventMsg::ContextCompacted(..) => {
|
||||
EventMsg::ContextCompacted(_) => {
|
||||
// Core still fans out this deprecated event for legacy clients;
|
||||
// v2 clients receive the canonical ContextCompaction item instead.
|
||||
if matches!(api_version, ApiVersion::V2) {
|
||||
|
||||
@@ -48,6 +48,7 @@ const SUPPORTED_EXPERIMENTAL_FEATURE_ENABLEMENT: &[&str] = &[
|
||||
"tool_search",
|
||||
"tool_suggest",
|
||||
"tool_call_mcp_elicitation",
|
||||
"prefix_compaction",
|
||||
];
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -93,10 +93,13 @@ async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()>
|
||||
let started = wait_for_context_compaction_started(&mut mcp).await?;
|
||||
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
|
||||
|
||||
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
|
||||
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
|
||||
unreachable!("started item should be context compaction");
|
||||
};
|
||||
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
|
||||
let ThreadItem::ContextCompaction {
|
||||
id: completed_id, ..
|
||||
} = completed.item
|
||||
else {
|
||||
unreachable!("completed item should be context compaction");
|
||||
};
|
||||
|
||||
@@ -174,10 +177,13 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
|
||||
let started = wait_for_context_compaction_started(&mut mcp).await?;
|
||||
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
|
||||
|
||||
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
|
||||
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
|
||||
unreachable!("started item should be context compaction");
|
||||
};
|
||||
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
|
||||
let ThreadItem::ContextCompaction {
|
||||
id: completed_id, ..
|
||||
} = completed.item
|
||||
else {
|
||||
unreachable!("completed item should be context compaction");
|
||||
};
|
||||
|
||||
@@ -237,10 +243,13 @@ async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -
|
||||
let started = wait_for_context_compaction_started(&mut mcp).await?;
|
||||
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
|
||||
|
||||
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
|
||||
let ThreadItem::ContextCompaction { id: started_id, .. } = started.item else {
|
||||
unreachable!("started item should be context compaction");
|
||||
};
|
||||
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
|
||||
let ThreadItem::ContextCompaction {
|
||||
id: completed_id, ..
|
||||
} = completed.item
|
||||
else {
|
||||
unreachable!("completed item should be context compaction");
|
||||
};
|
||||
|
||||
|
||||
@@ -281,9 +281,9 @@ async fn experimental_feature_enablement_set_rejects_non_allowlisted_feature() -
|
||||
error.message
|
||||
);
|
||||
assert!(
|
||||
error
|
||||
.message
|
||||
.contains("apps, plugins, tool_search, tool_suggest, tool_call_mcp_elicitation"),
|
||||
error.message.contains(
|
||||
"apps, plugins, tool_search, tool_suggest, tool_call_mcp_elicitation, prefix_compaction"
|
||||
),
|
||||
"{}",
|
||||
error.message
|
||||
);
|
||||
|
||||
@@ -32,6 +32,8 @@ pub struct CompactionInput<'a> {
|
||||
pub reasoning: Option<Reasoning>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mode: Option<&'a str>,
|
||||
}
|
||||
|
||||
/// Canonical input payload for the memory summarize endpoint.
|
||||
|
||||
@@ -437,6 +437,9 @@
|
||||
"plugins": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"prefix_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -2405,6 +2408,9 @@
|
||||
"plugins": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"prefix_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
"slug": "gpt-5.4",
|
||||
"display_name": "gpt-5.4",
|
||||
"description": "Latest frontier agentic coding model.",
|
||||
"default_reasoning_level": "xhigh",
|
||||
"default_reasoning_level": "medium",
|
||||
"supported_reasoning_levels": [
|
||||
{
|
||||
"effort": "low",
|
||||
|
||||
@@ -410,6 +410,7 @@ impl ModelClient {
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
mode: Option<&'static str>,
|
||||
) -> Result<Vec<ResponseItem>> {
|
||||
if prompt.input.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
@@ -454,6 +455,7 @@ impl ModelClient {
|
||||
parallel_tool_calls: prompt.parallel_tool_calls,
|
||||
reasoning,
|
||||
text,
|
||||
mode,
|
||||
};
|
||||
|
||||
let mut extra_headers = ApiHeaderMap::new();
|
||||
|
||||
@@ -58,8 +58,12 @@ pub(crate) enum InitialContextInjection {
|
||||
DoNotInject,
|
||||
}
|
||||
|
||||
pub(crate) fn provider_supports_inline_remote_compaction(provider: &ModelProviderInfo) -> bool {
|
||||
provider.is_openai() || provider.supports_remote_compaction()
|
||||
}
|
||||
|
||||
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
|
||||
provider.supports_remote_compaction()
|
||||
provider_supports_inline_remote_compaction(provider)
|
||||
}
|
||||
|
||||
pub(crate) async fn run_inline_auto_compact_task(
|
||||
|
||||
@@ -31,6 +31,8 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
|
||||
const PREFIX_COMPACTION_MODE: &str = "prefix";
|
||||
|
||||
pub(crate) async fn run_inline_remote_auto_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
@@ -73,6 +75,71 @@ pub(crate) async fn run_remote_compact_task(
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn run_remote_prefix_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
base_history: Vec<ResponseItem>,
|
||||
) -> CodexResult<Vec<ResponseItem>> {
|
||||
let mut history = ContextManager::new();
|
||||
history.replace(base_history);
|
||||
let base_instructions = sess.get_base_instructions().await;
|
||||
if let Some(context_window) = turn_context.model_context_window()
|
||||
&& let Some(estimated_tokens) =
|
||||
history.estimate_token_count_with_base_instructions(&base_instructions)
|
||||
&& estimated_tokens > context_window
|
||||
{
|
||||
info!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
estimated_tokens,
|
||||
context_window,
|
||||
"prefix compaction snapshot exceeds context window"
|
||||
);
|
||||
return Err(CodexErr::ContextWindowExceeded);
|
||||
}
|
||||
|
||||
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
|
||||
let prompt = build_remote_compact_prompt(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
prompt_input,
|
||||
base_instructions,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_history = sess
|
||||
.services
|
||||
.model_client
|
||||
.compact_conversation_history(
|
||||
&prompt,
|
||||
&turn_context.model_info,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
&turn_context.session_telemetry,
|
||||
Some(PREFIX_COMPACTION_MODE),
|
||||
)
|
||||
.or_else(|err| async {
|
||||
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
|
||||
let compact_request_log_data =
|
||||
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
|
||||
log_remote_compact_failure(
|
||||
turn_context.as_ref(),
|
||||
&compact_request_log_data,
|
||||
total_usage_breakdown,
|
||||
&err,
|
||||
);
|
||||
Err(err)
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(process_compacted_history(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
new_history,
|
||||
InitialContextInjection::DoNotInject,
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn run_remote_compact_task_inner(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -140,23 +207,13 @@ async fn run_remote_compact_task_inner_impl(
|
||||
.collect();
|
||||
|
||||
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
|
||||
let tool_router = built_tools(
|
||||
let prompt = build_remote_compact_prompt(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&prompt_input,
|
||||
&HashSet::new(),
|
||||
/*skills_outcome*/ None,
|
||||
&CancellationToken::new(),
|
||||
prompt_input,
|
||||
base_instructions,
|
||||
)
|
||||
.await?;
|
||||
let prompt = Prompt {
|
||||
input: prompt_input,
|
||||
tools: tool_router.model_visible_specs(),
|
||||
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: None,
|
||||
};
|
||||
|
||||
let mut new_history = sess
|
||||
.services
|
||||
@@ -167,6 +224,7 @@ async fn run_remote_compact_task_inner_impl(
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
&turn_context.session_telemetry,
|
||||
/*mode*/ None,
|
||||
)
|
||||
.or_else(|err| async {
|
||||
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
|
||||
@@ -209,6 +267,31 @@ async fn run_remote_compact_task_inner_impl(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_remote_compact_prompt(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
prompt_input: Vec<ResponseItem>,
|
||||
base_instructions: BaseInstructions,
|
||||
) -> CodexResult<Prompt> {
|
||||
let tool_router = built_tools(
|
||||
sess,
|
||||
turn_context,
|
||||
&prompt_input,
|
||||
&HashSet::new(),
|
||||
/*skills_outcome*/ None,
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await?;
|
||||
Ok(Prompt {
|
||||
input: prompt_input,
|
||||
tools: tool_router.model_visible_specs(),
|
||||
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn process_compacted_history(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::WireApi;
|
||||
use codex_model_provider_info::create_oss_provider_with_base_url;
|
||||
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
@@ -23,6 +24,21 @@ async fn process_compacted_history_with_test_session(
|
||||
(refreshed, initial_context)
|
||||
}
|
||||
|
||||
/// The provider built by `create_openai_provider` must be accepted for inline
/// remote compaction.
#[test]
fn remote_compact_task_supports_openai_provider() {
    let openai = ModelProviderInfo::create_openai_provider(/*base_url*/ None);
    let supported = provider_supports_inline_remote_compaction(&openai);

    assert!(supported);
}
|
||||
|
||||
/// An OSS provider pointed at a local Responses-API endpoint must not be
/// accepted for inline remote compaction.
#[test]
fn remote_compact_task_ignores_generic_openai_compatible_provider() {
    let local_provider =
        create_oss_provider_with_base_url("http://localhost:8082/v1", WireApi::Responses);
    let supported = provider_supports_inline_remote_compaction(&local_provider);

    assert!(!supported);
}
|
||||
|
||||
#[test]
|
||||
fn content_items_to_text_joins_non_empty_segments() {
|
||||
let items = vec![
|
||||
@@ -190,7 +206,7 @@ fn build_token_limited_compacted_history_appends_summary_message() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_use_remote_compact_task_for_azure_provider() {
|
||||
fn provider_supports_inline_remote_compaction_for_azure_provider() {
|
||||
let provider = ModelProviderInfo {
|
||||
name: "Azure".into(),
|
||||
base_url: Some("https://example.com/openai".into()),
|
||||
@@ -210,7 +226,7 @@ fn should_use_remote_compact_task_for_azure_provider() {
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
assert!(should_use_remote_compact_task(&provider));
|
||||
assert!(provider_supports_inline_remote_compaction(&provider));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -6609,6 +6609,27 @@ hide_spawn_agent_metadata = false
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn prefix_compaction_bool_feature_enables() -> std::io::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join(CONFIG_TOML_FILE),
|
||||
r#"[features]
|
||||
prefix_compaction = true
|
||||
"#,
|
||||
)?;
|
||||
|
||||
let config = ConfigBuilder::without_managed_config_for_tests()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.fallback_cwd(Some(codex_home.path().to_path_buf()))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert!(config.features.enabled(Feature::PrefixCompaction));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feature_requirements_normalize_runtime_feature_mutations() -> std::io::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
@@ -167,6 +167,7 @@ use codex_protocol::exec_output::StreamOutput;
|
||||
|
||||
mod handlers;
|
||||
mod mcp;
|
||||
pub(crate) mod prefix_compaction;
|
||||
mod review;
|
||||
mod rollout_reconstruction;
|
||||
#[allow(clippy::module_inception)]
|
||||
@@ -176,6 +177,8 @@ pub(crate) mod turn_context;
|
||||
#[cfg(test)]
|
||||
use self::handlers::submission_dispatch_span;
|
||||
use self::handlers::submission_loop;
|
||||
#[cfg(test)]
|
||||
use self::prefix_compaction::prefix_compact_token_limit;
|
||||
use self::review::spawn_review_thread;
|
||||
use self::session::AppServerClientMetadata;
|
||||
use self::session::Session;
|
||||
|
||||
248
codex-rs/core/src/session/prefix_compaction.rs
Normal file
248
codex-rs/core/src/session/prefix_compaction.rs
Normal file
@@ -0,0 +1,248 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::compact::provider_supports_inline_remote_compaction;
|
||||
use crate::compact_remote::run_remote_prefix_compact_task;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::debug;
|
||||
use tracing::trace;
|
||||
|
||||
/// Join handle of a spawned Tokio task whose output is a
/// `CodexResult<PrefixCompactCandidate>`.
pub(crate) type PrefixCompactTask = JoinHandle<CodexResult<PrefixCompactCandidate>>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PrefixCompactCandidate {
|
||||
base_history: Vec<ResponseItem>,
|
||||
replacement_prefix: Vec<ResponseItem>,
|
||||
captured_context: Vec<ResponseItem>,
|
||||
captured_reference_context_item: Option<TurnContextItem>,
|
||||
}
|
||||
|
||||
pub(crate) async fn maybe_start_prefix_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
total_usage_tokens: i64,
|
||||
auto_compact_limit: i64,
|
||||
) {
|
||||
if !turn_context.features.enabled(Feature::PrefixCompaction) {
|
||||
return;
|
||||
}
|
||||
|
||||
if !provider_supports_inline_remote_compaction(turn_context.provider.info()) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(prefix_compact_limit) = prefix_compact_token_limit(auto_compact_limit) else {
|
||||
return;
|
||||
};
|
||||
if total_usage_tokens < prefix_compact_limit || total_usage_tokens >= auto_compact_limit {
|
||||
return;
|
||||
}
|
||||
|
||||
let captured_context = sess.build_initial_context(turn_context.as_ref()).await;
|
||||
let captured_reference_context_item = Some(turn_context.to_turn_context_item());
|
||||
|
||||
let mut state = sess.state.lock().await;
|
||||
if state.prefix_compact_task.is_some() {
|
||||
trace!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
total_usage_tokens,
|
||||
prefix_compact_limit,
|
||||
auto_compact_limit,
|
||||
"prefix compaction already running or ready"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let base_history = state.history.raw_items().to_vec();
|
||||
if base_history.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let task = spawn_prefix_compact_task(
|
||||
Arc::clone(sess),
|
||||
Arc::clone(turn_context),
|
||||
base_history.clone(),
|
||||
captured_context,
|
||||
captured_reference_context_item,
|
||||
);
|
||||
state.prefix_compact_task = Some(task);
|
||||
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
model_slug = %turn_context.model_info.slug,
|
||||
total_usage_tokens,
|
||||
prefix_compact_limit,
|
||||
auto_compact_limit,
|
||||
base_history_len = base_history.len(),
|
||||
"starting background prefix compaction"
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) async fn try_apply_ready_prefix_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
) -> CodexResult<bool> {
|
||||
let Some(task) = take_finished_prefix_compact_task(sess).await else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
let candidate = match task.await {
|
||||
Ok(Ok(candidate)) => candidate,
|
||||
Ok(Err(err)) => {
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"prefix compaction failed: {err:#}"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"prefix compaction task failed: {err:#}"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
if apply_prefix_compact_candidate(sess, turn_context, candidate).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"prefix compaction candidate is stale; running foreground auto-compaction"
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub(crate) async fn abandon_prefix_compact(sess: &Arc<Session>) {
|
||||
let mut state = sess.state.lock().await;
|
||||
abort_prefix_compact_task(&mut state.prefix_compact_task);
|
||||
}
|
||||
|
||||
pub(crate) fn abort_prefix_compact_task(task: &mut Option<PrefixCompactTask>) {
|
||||
if let Some(task) = task.take() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn prefix_compact_token_limit(auto_compact_limit: i64) -> Option<i64> {
|
||||
if auto_compact_limit == i64::MAX || auto_compact_limit <= 1 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let token_limit = auto_compact_limit.saturating_mul(60) / 100;
|
||||
let token_limit = token_limit.clamp(1, auto_compact_limit.saturating_sub(1));
|
||||
Some(token_limit)
|
||||
}
|
||||
|
||||
/// Removes and returns the stored prefix compaction task, but only when it has
/// already finished. A task that is still running stays in the session state.
async fn take_finished_prefix_compact_task(sess: &Arc<Session>) -> Option<PrefixCompactTask> {
    let mut guard = sess.state.lock().await;
    let finished = guard
        .prefix_compact_task
        .as_ref()
        .is_some_and(|task| task.is_finished());
    if !finished {
        return None;
    }
    guard.prefix_compact_task.take()
}
|
||||
|
||||
async fn apply_prefix_compact_candidate(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
candidate: PrefixCompactCandidate,
|
||||
) -> CodexResult<bool> {
|
||||
let current_history = sess.clone_history().await;
|
||||
let current_items = current_history.raw_items();
|
||||
if current_items.len() < candidate.base_history.len()
|
||||
|| current_items[..candidate.base_history.len()] != candidate.base_history
|
||||
{
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
current_history_len = current_items.len(),
|
||||
base_history_len = candidate.base_history.len(),
|
||||
"prefix compaction candidate no longer matches current history"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
|
||||
sess.emit_turn_item_started(turn_context, &compaction_item)
|
||||
.await;
|
||||
|
||||
let retained_suffix: Vec<ResponseItem> = current_items[candidate.base_history.len()..]
|
||||
.iter()
|
||||
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned()
|
||||
.collect();
|
||||
let reference_context_item = sess
|
||||
.reference_context_item()
|
||||
.await
|
||||
.or(candidate.captured_reference_context_item);
|
||||
let mut new_history = candidate.replacement_prefix;
|
||||
new_history.extend(candidate.captured_context);
|
||||
new_history.extend(retained_suffix);
|
||||
|
||||
new_history.extend(
|
||||
current_items
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
let compacted_item = CompactedItem {
|
||||
message: String::new(),
|
||||
replacement_history: Some(new_history.clone()),
|
||||
};
|
||||
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
|
||||
.await;
|
||||
sess.recompute_token_usage(turn_context).await;
|
||||
sess.emit_turn_item_completed(turn_context, compaction_item)
|
||||
.await;
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"applied prefix compaction candidate"
|
||||
);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn spawn_prefix_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
base_history: Vec<ResponseItem>,
|
||||
captured_context: Vec<ResponseItem>,
|
||||
captured_reference_context_item: Option<TurnContextItem>,
|
||||
) -> PrefixCompactTask {
|
||||
tokio::spawn(async move {
|
||||
let replacement_prefix = run_remote_prefix_compact_task(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
base_history.clone(),
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
model_slug = %turn_context.model_info.slug,
|
||||
base_history_len = base_history.len(),
|
||||
replacement_prefix_len = replacement_prefix.len(),
|
||||
captured_context_len = captured_context.len(),
|
||||
"background prefix compaction ready"
|
||||
);
|
||||
Ok(PrefixCompactCandidate {
|
||||
base_history,
|
||||
replacement_prefix,
|
||||
captured_context,
|
||||
captured_reference_context_item,
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -6312,6 +6312,19 @@ async fn unified_exec_rejects_escalated_permissions_when_policy_not_on_request()
|
||||
pretty_assertions::assert_eq!(output, expected);
|
||||
}
|
||||
|
||||
/// An auto-compact limit of 1000 tokens gives a prefix limit of 600.
#[test]
fn prefix_compact_token_limit_defaults_to_sixty_percent_of_auto_compact() {
    let limit = prefix_compact_token_limit(/*auto_compact_limit*/ 1_000);

    assert_eq!(limit, Some(600));
}
|
||||
|
||||
/// An auto-compact limit of 1 leaves no room for a prefix limit below it.
#[test]
fn prefix_compact_token_limit_is_none_for_tiny_auto_compact_limit() {
    let limit = prefix_compact_token_limit(/*auto_compact_limit*/ 1);

    assert_eq!(limit, None);
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn session_start_hooks_only_load_from_trusted_project_layers() -> std::io::Result<()> {
|
||||
let temp = tempfile::tempdir()?;
|
||||
|
||||
@@ -38,6 +38,9 @@ use crate::parse_turn_item;
|
||||
use crate::plugins::build_plugin_injections;
|
||||
use crate::resolve_skill_dependencies_for_turn;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::prefix_compaction::abandon_prefix_compact;
|
||||
use crate::session::prefix_compaction::maybe_start_prefix_compact;
|
||||
use crate::session::prefix_compaction::try_apply_ready_prefix_compact;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::stream_events_utils::HandleOutputCtx;
|
||||
@@ -155,6 +158,18 @@ pub(crate) async fn run_turn(
|
||||
if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() {
|
||||
client_session.reset_websocket_session();
|
||||
}
|
||||
// This pre-sampling kickoff captures full context before this turn records its
|
||||
// context diff. If the candidate is later applied, that captured context can
|
||||
// sit directly before a retained suffix that starts with the same turn's diff.
|
||||
// This is redundant but intentional: we keep the suffix untouched and rely on
|
||||
// the normal prefix/current-history match check before applying.
|
||||
maybe_start_prefix_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
sess.get_total_token_usage().await,
|
||||
auto_compact_limit,
|
||||
)
|
||||
.await;
|
||||
|
||||
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
|
||||
|
||||
@@ -508,6 +523,14 @@ pub(crate) async fn run_turn(
|
||||
continue;
|
||||
}
|
||||
|
||||
maybe_start_prefix_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
total_usage_tokens,
|
||||
auto_compact_limit,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !needs_follow_up {
|
||||
last_agent_message = sampling_request_last_agent_message;
|
||||
let stop_hook_permission_mode = match turn_context.approval_policy.value() {
|
||||
@@ -794,6 +817,11 @@ async fn run_auto_compact(
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
if try_apply_ready_prefix_compact(sess, turn_context).await? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
abandon_prefix_compact(sess).await;
|
||||
if should_use_remote_compact_task(turn_context.provider.info()) {
|
||||
run_inline_remote_auto_compact_task(
|
||||
Arc::clone(sess),
|
||||
|
||||
@@ -9,6 +9,8 @@ use std::collections::HashSet;
|
||||
use crate::agent_identity::RegisteredAgentTask;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::prefix_compaction::PrefixCompactTask;
|
||||
use crate::session::prefix_compaction::abort_prefix_compact_task;
|
||||
use crate::session::session::SessionConfiguration;
|
||||
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
@@ -35,6 +37,7 @@ pub(crate) struct SessionState {
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
pub(crate) prefix_compact_task: Option<PrefixCompactTask>,
|
||||
next_turn_is_first: bool,
|
||||
}
|
||||
|
||||
@@ -55,6 +58,7 @@ impl SessionState {
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
granted_permissions: None,
|
||||
prefix_compact_task: None,
|
||||
next_turn_is_first: true,
|
||||
}
|
||||
}
|
||||
@@ -100,6 +104,7 @@ impl SessionState {
|
||||
self.history.replace(items);
|
||||
self.history
|
||||
.set_reference_context_item(reference_context_item);
|
||||
abort_prefix_compact_task(&mut self.prefix_compact_task);
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
|
||||
|
||||
@@ -2,9 +2,11 @@
|
||||
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
@@ -37,9 +39,13 @@ use core_test_support::wait_for_event_match;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tokio::time::Duration;
|
||||
use wiremock::ResponseTemplate;
|
||||
|
||||
fn enable_prefix_compaction(config: &mut codex_core::config::Config) {
|
||||
let _ = config.features.enable(Feature::PrefixCompaction);
|
||||
let _ = config.features.disable(Feature::ToolSuggest);
|
||||
}
|
||||
|
||||
/// Rough token estimate for `text`: its byte length divided by four, rounded
/// up. Saturates at `i64::MAX` if the quotient does not fit in an `i64`.
fn approx_token_count(text: &str) -> i64 {
    let padded_len = text.len().saturating_add(3);
    let quarters = padded_len / 4;
    match i64::try_from(quarters) {
        Ok(count) => count,
        Err(_) => i64::MAX,
    }
}
|
||||
@@ -55,6 +61,15 @@ fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64
|
||||
.saturating_add(approx_token_count(&request.instructions_text()))
|
||||
}
|
||||
|
||||
fn first_input_index_containing(request: &responses::ResponsesRequest, needle: &str) -> usize {
|
||||
request
|
||||
.input()
|
||||
.iter()
|
||||
.position(|item| item.to_string().contains(needle))
|
||||
.unwrap_or_else(|| panic!("expected request input to contain {needle:?}"))
|
||||
}
|
||||
|
||||
const PRETURN_CONTEXT_DIFF_CWD_MARKER: &str = "PRETURN_CONTEXT_DIFF_CWD";
|
||||
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
|
||||
const DUMMY_FUNCTION_NAME: &str = "test_tool";
|
||||
const REMOTE_COMPACT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
@@ -86,6 +101,19 @@ fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
|
||||
}]
|
||||
}
|
||||
|
||||
async fn wait_for_response_requests(mock_response: &responses::ResponseMock, count: usize) {
|
||||
for _ in 0..100 {
|
||||
if mock_response.requests().len() >= count {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
panic!(
|
||||
"expected at least {count} request(s), got {}",
|
||||
mock_response.requests().len()
|
||||
);
|
||||
}
|
||||
|
||||
fn remote_realtime_test_codex_builder(
|
||||
realtime_server: &responses::WebSocketTestServer,
|
||||
) -> TestCodexBuilder {
|
||||
@@ -1064,6 +1092,217 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn prefix_compact_uses_sixty_percent_of_auto_compact_limit() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.model_context_window = Some(20_000);
|
||||
config.model_auto_compact_token_limit = Some(800);
|
||||
enable_prefix_compaction(config);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
responses::mount_sse_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", /*total_tokens*/ 450),
|
||||
]),
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m2", "SECOND_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", /*total_tokens*/ 650),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let compact_mock = responses::mount_compact_json_once(
|
||||
harness.server(),
|
||||
json!({ "output": compacted_summary_only_output("PREFIX_THRESHOLD_SUMMARY") }),
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "FIRST_REMOTE_USER".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
assert_eq!(compact_mock.requests().len(), 0);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "SECOND_REMOTE_USER".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
wait_for_response_requests(&compact_mock, /*count*/ 1).await;
|
||||
|
||||
assert_eq!(compact_mock.requests().len(), 1);
|
||||
assert_eq!(
|
||||
compact_mock.single_request().body_json()["mode"],
|
||||
json!("prefix")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn ready_prefix_compact_is_applied_by_pre_turn_auto_compact() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.model_context_window = Some(20_000);
|
||||
config.model_auto_compact_token_limit = Some(800);
|
||||
enable_prefix_compaction(config);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_sse_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", /*total_tokens*/ 650),
|
||||
]),
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m2", "SECOND_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", /*total_tokens*/ 850),
|
||||
]),
|
||||
sse(vec![
|
||||
responses::ev_assistant_message("m3", "THIRD_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-3", /*total_tokens*/ 200),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let compact_mock = responses::mount_compact_json_once(
|
||||
harness.server(),
|
||||
json!({ "output": compacted_summary_only_output("PREFIX_READY_SUMMARY") }),
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "FIRST_REMOTE_USER".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
wait_for_response_requests(&compact_mock, /*count*/ 1).await;
|
||||
assert_eq!(
|
||||
compact_mock.single_request().body_json()["mode"],
|
||||
json!("prefix")
|
||||
);
|
||||
|
||||
codex
|
||||
.submit(Op::OverrideTurnContext {
|
||||
cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)),
|
||||
approval_policy: None,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "SECOND_REMOTE_USER".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "THIRD_REMOTE_USER".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
assert_eq!(
|
||||
compact_mock.requests().len(),
|
||||
1,
|
||||
"ready prefix compact should replace the foreground compact request"
|
||||
);
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(requests.len(), 3);
|
||||
let third_request = &requests[2];
|
||||
let third_request_body = third_request.body_json().to_string();
|
||||
assert!(third_request_body.contains("PREFIX_READY_SUMMARY"));
|
||||
assert!(!third_request_body.contains("FIRST_REMOTE_USER"));
|
||||
assert!(third_request_body.contains("SECOND_REMOTE_USER"));
|
||||
assert!(third_request_body.contains("THIRD_REMOTE_USER"));
|
||||
let summary_index = first_input_index_containing(third_request, "PREFIX_READY_SUMMARY");
|
||||
let captured_context_index =
|
||||
first_input_index_containing(third_request, PRETURN_CONTEXT_DIFF_CWD_MARKER);
|
||||
let suffix_user_index = first_input_index_containing(third_request, "SECOND_REMOTE_USER");
|
||||
let current_user_index = first_input_index_containing(third_request, "THIRD_REMOTE_USER");
|
||||
assert!(
|
||||
summary_index < captured_context_index,
|
||||
"captured context should follow the compacted prefix summary"
|
||||
);
|
||||
assert!(
|
||||
captured_context_index < suffix_user_index,
|
||||
"captured context should precede retained suffix messages"
|
||||
);
|
||||
assert!(
|
||||
suffix_user_index < current_user_index,
|
||||
"retained suffix should preserve chronological user message order"
|
||||
);
|
||||
let stale_context_count = third_request
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.filter(|text| text.contains(PRETURN_CONTEXT_DIFF_CWD_MARKER))
|
||||
.count();
|
||||
assert_eq!(
|
||||
stale_context_count, 1,
|
||||
"prefix compaction should preserve context updates in the retained suffix"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -197,6 +197,9 @@ pub enum Feature {
|
||||
ResponsesWebsockets,
|
||||
/// Legacy rollout flag for Responses API WebSocket transport v2 experiments.
|
||||
ResponsesWebsocketsV2,
|
||||
/// Precompute prefix compaction in the background before the foreground
|
||||
/// auto-compaction threshold is reached.
|
||||
PrefixCompaction,
|
||||
/// Use the agent identity registration flow for ChatGPT-authenticated sessions.
|
||||
UseAgentIdentity,
|
||||
/// Enable workspace dependency support.
|
||||
@@ -980,6 +983,16 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Removed,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::PrefixCompaction,
|
||||
key: "prefix_compaction",
|
||||
stage: Stage::Experimental {
|
||||
name: "Prefix compaction",
|
||||
menu_description: "Precompute history compaction in the background before the normal context compaction threshold is reached.",
|
||||
announcement: "",
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::UseAgentIdentity,
|
||||
key: "use_agent_identity",
|
||||
|
||||
@@ -103,6 +103,23 @@ fn guardian_approval_is_experimental_and_user_toggleable() {
|
||||
assert_eq!(Feature::GuardianApproval.default_enabled(), false);
|
||||
}
|
||||
|
||||
/// Checks the registered metadata for `Feature::PrefixCompaction`: it is in
/// the experimental stage, exposes the expected menu name and description,
/// has no announcement, and is disabled by default.
#[test]
fn prefix_compaction_is_experimental_and_user_toggleable() {
    const EXPECTED_MENU_DESCRIPTION: &str = "Precompute history compaction in the background before the normal context compaction threshold is reached.";

    let stage = Feature::PrefixCompaction.info().stage;

    assert!(matches!(stage, Stage::Experimental { .. }));
    assert_eq!(stage.experimental_menu_name(), Some("Prefix compaction"));
    assert_eq!(
        stage.experimental_menu_description(),
        Some(EXPECTED_MENU_DESCRIPTION)
    );
    assert_eq!(stage.experimental_announcement(), None);
    assert!(!Feature::PrefixCompaction.default_enabled());
}
|
||||
|
||||
#[test]
|
||||
fn external_migration_is_experimental_and_disabled_by_default() {
|
||||
let spec = Feature::ExternalMigration.info();
|
||||
@@ -417,6 +434,21 @@ usage_hint_enabled = false
|
||||
);
|
||||
}
|
||||
|
||||
/// Checks that a `prefix_compaction = true` TOML entry deserializes into
/// `FeaturesToml` and is reported by `entries()` as a single boolean entry.
#[test]
fn prefix_compaction_feature_config_deserializes_bool() {
    let raw_features = r#"
prefix_compaction = true
"#;

    let features = toml::from_str::<FeaturesToml>(raw_features)
        .expect("features table should deserialize");

    let expected = BTreeMap::from([("prefix_compaction".to_string(), true)]);
    assert_eq!(features.entries(), expected);
}
|
||||
|
||||
#[test]
|
||||
fn unstable_warning_event_only_mentions_enabled_under_development_features() {
|
||||
let mut configured_features = Table::new();
|
||||
|
||||
@@ -2076,7 +2076,7 @@ pub struct ModelRerouteEvent {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct ContextCompactedEvent;
|
||||
pub struct ContextCompactedEvent {}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct TurnCompleteEvent {
|
||||
|
||||
@@ -2206,9 +2206,14 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn on_agent_message(&mut self, message: String) {
|
||||
self.finalize_completed_assistant_message(Some(&message));
|
||||
fn on_context_compacted(&mut self) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
self.handle_stream_finished();
|
||||
self.add_to_history(history_cell::new_info_event(
|
||||
"Context compacted".to_owned(),
|
||||
/*hint*/ None,
|
||||
));
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn on_agent_message_delta(&mut self, delta: String) {
|
||||
@@ -5985,7 +5990,7 @@ impl ChatWidget {
|
||||
self.exit_review_mode_after_item();
|
||||
}
|
||||
ThreadItem::ContextCompaction { .. } => {
|
||||
self.add_info_message("Context compacted".to_string(), /*hint*/ None);
|
||||
self.on_context_compacted();
|
||||
}
|
||||
ThreadItem::HookPrompt { .. } => {}
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
@@ -6654,7 +6659,7 @@ impl ChatWidget {
|
||||
// TODO(ccunningham): stop relying on legacy AgentMessage in review mode,
|
||||
// including thread-snapshot replay, and forward
|
||||
// ItemCompleted(TurnItem::AgentMessage(_)) instead.
|
||||
self.on_agent_message(message)
|
||||
self.finalize_completed_assistant_message(Some(&message))
|
||||
}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message, .. }) => {
|
||||
if !message.is_empty() {
|
||||
|
||||
@@ -799,6 +799,37 @@ async fn replayed_in_progress_turn_marks_task_running() {
|
||||
assert_eq!(status.header(), "Working");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replayed_context_compaction_renders_history_marker() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
chat.replay_thread_turns(
|
||||
vec![AppServerTurn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![AppServerThreadItem::ContextCompaction {
|
||||
id: "compact-1".to_string(),
|
||||
}],
|
||||
status: AppServerTurnStatus::Completed,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
}],
|
||||
ReplayKind::ResumeInitialMessages,
|
||||
);
|
||||
|
||||
let cells = drain_insert_history(&mut rx);
|
||||
let text = cells
|
||||
.iter()
|
||||
.map(|cell| lines_to_single_string(cell))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert!(
|
||||
text.contains("Context compacted"),
|
||||
"expected replayed compaction marker, got {text:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replayed_stream_error_does_not_set_retry_status_or_status_indicator() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
Reference in New Issue
Block a user