Compare commits

...

18 Commits

Author SHA1 Message Date
Cooper Gamble
cbc6de287b [codex-core] Trim inline compaction coverage [ci changed_files]
- reduce the server-side compaction test matrix to the highest-signal cases
- add comments around the deferred checkpoint rewrite and inline/preflight split

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 21:08:09 -07:00
Cooper Gamble
6c851b25ba [codex-core] Prefer full inline compaction prefix [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 03:32:40 +00:00
Cooper Gamble
81a2737b44 [codex-core] Avoid duplicate inline compaction context [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 03:09:02 +00:00
Cooper Gamble
7ae01b72a1 [codex-core] Fix inline compaction commit semantics [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 02:51:09 +00:00
Cooper Gamble
6d0dd1e4c8 [codex-core] Preserve raw inline compaction event order [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 02:15:55 +00:00
Cooper Gamble
1efe88de5d [codex-core] Fix inline compaction event ordering and token accounting [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 02:03:06 +00:00
Cooper Gamble
f91477b576 [codex-core] Preserve inline compaction checkpoint ordering [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 01:35:14 +00:00
Cooper Gamble
42b73019ee [codex-core] Preserve inline compaction turn prompt state [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 01:12:22 +00:00
Cooper Gamble
3ed2b5e596 [codex-core] Fix inline compaction checkpoint history layout [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 00:53:34 +00:00
Cooper Gamble
da5193c1a0 [codex-core] Fix inline compaction retry and model-switch preflight [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-09 00:18:25 +00:00
Cooper Gamble
17bd5fa967 [core] Fix empty-history inline summary replacement [ci changed_files]
Handle repeated inline compactions on turns that started from empty history by stripping leading compaction items after prefix calculation, and add regression coverage for the fresh-session case.

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 23:36:28 +00:00
Cooper Gamble
54374b23c5 [core] Always inline OpenAI auto-compaction [ci changed_files]
Ignore compact_prompt for OpenAI inline auto-compaction, remove the legacy compat downgrade path, and keep /compact on the point-in-time endpoint. Also skip previous-model preflight remote compaction when inline server-side compaction is available.

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 23:25:57 +00:00
Cooper Gamble
1feeabed7a [codex-core] Remove redundant ThreadId clone in compaction test [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-08 16:05:04 -07:00
Cooper Gamble
f0f7c602b5 [core] Preserve inline compaction turn state [ci changed_files]
Preserve current-turn history when inline compaction downgrades fail and replace prior same-turn compaction checkpoints instead of stacking them.

Tests:
- cargo test -p codex-core codex::tests::build_server_side_compaction_replacement_history_keeps_current_turn_inputs -- --exact
- cargo test -p codex-core codex::tests::build_server_side_compaction_replacement_history_replaces_prior_same_turn_summary -- --exact
- cargo test -p codex-core codex::tests::downgrade_known_inline_compaction_error_restores_current_turn_when_fallback_fails -- --exact

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 20:37:22 +00:00
Cooper Gamble
af505f3c6d [codex-core] Refresh config schema for server-side compaction [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
2026-03-08 11:40:17 -07:00
Cooper Gamble
6f8f0f5a48 [codex-core] Harden inline compaction checkpoint handling [ci changed_files]
Keep current-turn inputs in local inline compaction checkpoints and remember known backend incompatibilities after a compat downgrade so later turns skip the failed inline request path.

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 18:17:42 +00:00
Cooper Gamble
26d84f1895 [codex-core] Preserve ghost snapshots in compaction fallback [ci changed_files]
Keep same-turn ghost snapshots when pre-turn inline compaction downgrades to the legacy client-side path so undo state survives compatibility fallback.

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 16:32:52 +00:00
Cooper Gamble
7675a87533 [codex-core] Add feature-flagged server-side compaction [ci changed_files]
Move normal auto-compaction onto inline Responses API compaction behind a feature flag, keep the legacy path for manual and compatibility cases, and add observability plus integration coverage.

Co-authored-by: Codex <noreply@openai.com>
2026-03-08 16:11:53 +00:00
13 changed files with 992 additions and 24 deletions

View File

@@ -140,6 +140,23 @@ impl From<VerbosityConfig> for OpenAiVerbosity {
}
}
/// One entry of the Responses API `context_management` request array, which asks
/// the server to manage conversation context (currently: inline compaction).
#[derive(Debug, Serialize, Clone, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct ContextManagement {
/// Wire-format strategy discriminator (the only value built here is "compaction").
pub r#type: String,
/// Token threshold at which the server should compact; omitted from the
/// serialized request when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub compact_threshold: Option<i64>,
}
impl ContextManagement {
/// Builds a `compaction` context-management entry with the given token threshold.
pub fn compaction(compact_threshold: i64) -> Self {
Self {
r#type: "compaction".to_string(),
compact_threshold: Some(compact_threshold),
}
}
}
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ResponsesApiRequest {
pub model: String,
@@ -158,6 +175,8 @@ pub struct ResponsesApiRequest {
pub prompt_cache_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<TextControls>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context_management: Option<Vec<ContextManagement>>,
}
impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
@@ -177,6 +196,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
service_tier: request.service_tier.clone(),
prompt_cache_key: request.prompt_cache_key.clone(),
text: request.text.clone(),
context_management: request.context_management.clone(),
generate: None,
client_metadata: None,
}
@@ -204,6 +224,8 @@ pub struct ResponseCreateWsRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<TextControls>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context_management: Option<Vec<ContextManagement>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub generate: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub client_metadata: Option<HashMap<String, String>>,

View File

@@ -268,6 +268,7 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
service_tier: None,
prompt_cache_key: None,
text: None,
context_management: None,
};
let client = ResponsesClient::new(transport.clone(), provider, NoAuth);
@@ -310,6 +311,7 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
service_tier: None,
prompt_cache_key: None,
text: None,
context_management: None,
};
let mut extra_headers = HeaderMap::new();

View File

@@ -425,6 +425,9 @@
"search_tool": {
"type": "boolean"
},
"server_side_compaction": {
"type": "boolean"
},
"shell_snapshot": {
"type": "boolean"
},
@@ -1897,6 +1900,9 @@
"search_tool": {
"type": "boolean"
},
"server_side_compaction": {
"type": "boolean"
},
"shell_snapshot": {
"type": "boolean"
},

View File

@@ -52,6 +52,7 @@ use codex_api::SseTelemetry;
use codex_api::TransportError;
use codex_api::WebsocketTelemetry;
use codex_api::build_conversation_headers;
use codex_api::common::ContextManagement as ApiContextManagement;
use codex_api::common::Reasoning;
use codex_api::common::ResponsesWsRequest;
use codex_api::create_text_param_for_request;
@@ -533,6 +534,17 @@ impl ModelClientSession {
};
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
let context_management = prompt
.inline_compaction_threshold
.map(ApiContextManagement::compaction)
.map(|entry| vec![entry]);
if let Some(threshold) = prompt.inline_compaction_threshold {
trace!(
model = %model_info.slug,
compact_threshold = threshold,
"attaching inline server-side compaction to responses request"
);
}
let request = ResponsesApiRequest {
model: model_info.slug.clone(),
instructions: instructions.clone(),
@@ -551,6 +563,7 @@ impl ModelClientSession {
},
prompt_cache_key,
text,
context_management,
};
Ok(request)
}

View File

@@ -42,6 +42,9 @@ pub struct Prompt {
/// Optional the output schema for the model's response.
pub output_schema: Option<Value>,
/// Optional inline server-side compaction threshold for the next Responses request.
pub inline_compaction_threshold: Option<i64>,
}
impl Prompt {
@@ -343,6 +346,7 @@ mod tests {
verbosity: Some(OpenAiVerbosity::Low),
format: None,
}),
context_management: None,
};
let v = serde_json::to_value(&req).expect("json");
@@ -382,6 +386,7 @@ mod tests {
prompt_cache_key: None,
service_tier: None,
text: Some(text_controls),
context_management: None,
};
let v = serde_json::to_value(&req).expect("json");
@@ -419,6 +424,7 @@ mod tests {
prompt_cache_key: None,
service_tier: None,
text: None,
context_management: None,
};
let v = serde_json::to_value(&req).expect("json");
@@ -441,6 +447,7 @@ mod tests {
prompt_cache_key: None,
service_tier: Some(ServiceTier::Flex.to_string()),
text: None,
context_management: None,
};
let v = serde_json::to_value(&req).expect("json");

View File

@@ -20,6 +20,7 @@ use crate::apps::render_apps_section;
use crate::commit_attribution::commit_message_trailer_instruction;
use crate::compact;
use crate::compact::InitialContextInjection;
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
@@ -41,6 +42,7 @@ use crate::realtime_conversation::handle_start as handle_realtime_conversation_s
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use crate::rollout::session_index;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::PendingServerSideCompactionCheckpoint;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
@@ -2323,6 +2325,7 @@ impl Session {
BaseInstructions {
text: base_instructions,
},
None,
);
let startup_turn_metadata_header = startup_turn_context
.turn_metadata_state
@@ -3048,6 +3051,46 @@ impl Session {
state.record_items(items.iter(), turn_context.truncation_policy);
}
/// Rewrites local session history around a server-emitted compaction item.
///
/// Called once the streamed response has completed (see comment below), this
/// builds a replacement history via
/// `build_server_side_compaction_replacement_history`, installs it with
/// `replace_compacted_history`, and records a telemetry counter.
///
/// * `item` — the `ResponseItem::Compaction` checkpoint streamed by the server.
/// * `compaction_initial_context` / `turn_start_context_items` — candidate
///   context prefixes to strip from the turn-local items.
/// * `history_before_turn` — history snapshot taken before this turn started.
/// * `history_at_checkpoint` — history snapshot taken when the compaction item
///   arrived mid-stream.
pub(crate) async fn apply_server_side_compaction_checkpoint(
&self,
turn_context: &TurnContext,
item: ResponseItem,
compaction_initial_context: &[ResponseItem],
turn_start_context_items: &[ResponseItem],
history_before_turn: &[ResponseItem],
history_at_checkpoint: &[ResponseItem],
) {
// The server emits compaction as a streamed item before the response is fully complete.
// Wait until `response.completed` to rewrite local history so later streamed items from the
// same turn can still be appended in wire order before we collapse the checkpoint.
let current_history = self.clone_history().await;
let replacement_history = build_server_side_compaction_replacement_history(
item.clone(),
compaction_initial_context,
turn_start_context_items,
history_before_turn,
history_at_checkpoint,
current_history.raw_items(),
);
// The compacted item carries no summary message of its own; the rewritten
// history is the payload.
let reference_context_item = Some(turn_context.to_turn_context_item());
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(replacement_history.clone()),
};
self.replace_compacted_history(replacement_history, reference_context_item, compacted_item)
.await;
debug!(
turn_id = %turn_context.sub_id,
"applied local server-side compaction checkpoint"
);
self.services.session_telemetry.counter(
"codex.compaction_checkpoint_applied",
1,
&[("mode", "server_side")],
);
}
pub(crate) async fn record_model_warning(&self, message: impl Into<String>, ctx: &TurnContext) {
self.services
.session_telemetry
@@ -3148,7 +3191,6 @@ impl Session {
pub(crate) fn features(&self) -> ManagedFeatures {
self.features.clone()
}
pub(crate) async fn collaboration_mode(&self) -> CollaborationMode {
let state = self.state.lock().await;
state.session_configuration.collaboration_mode.clone()
@@ -3324,7 +3366,7 @@ impl Session {
pub(crate) async fn record_context_updates_and_set_reference_context_item(
&self,
turn_context: &TurnContext,
) {
) -> Vec<ResponseItem> {
let reference_context_item = {
let state = self.state.lock().await;
state.reference_context_item()
@@ -3351,6 +3393,7 @@ impl Session {
// context items. This keeps later runtime diffing aligned with the current turn state.
let mut state = self.state.lock().await;
state.set_reference_context_item(Some(turn_context_item));
context_items
}
pub(crate) async fn update_token_usage_info(
@@ -5142,17 +5185,21 @@ pub(crate) async fn run_turn(
// new user message are recorded. Estimate pending incoming items (context
// diffs/full reinjection + user input) and trigger compaction preemptively
// when they would push the thread over the compaction threshold.
if run_pre_sampling_compact(&sess, &turn_context)
.await
.is_err()
{
error!("Failed to run pre-sampling compact");
return None;
}
let mut pending_server_side_compaction =
match run_pre_sampling_compact(&sess, &turn_context).await {
Ok(pending) => pending,
Err(err) => {
error!("Failed to run pre-sampling compact: {err}");
return None;
}
};
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
let history_before_turn = sess.clone_history().await.raw_items().to_vec();
let compaction_initial_context = sess.build_initial_context(turn_context.as_ref()).await;
let turn_start_context_items = sess
.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let loaded_plugins = sess
@@ -5292,8 +5339,12 @@ pub(crate) async fn run_turn(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
sess.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
&input,
response_item.clone(),
)
.await;
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
// model/realtime injections.
@@ -5372,6 +5423,7 @@ pub(crate) async fn run_turn(
.map(|user_message| user_message.message())
.collect::<Vec<String>>();
let turn_metadata_header = turn_context.turn_metadata_state.current_header_value();
let inline_compaction_for_request = pending_server_side_compaction;
match run_sampling_request(
Arc::clone(&sess),
Arc::clone(&turn_context),
@@ -5379,6 +5431,10 @@ pub(crate) async fn run_turn(
&mut client_session,
turn_metadata_header.as_deref(),
sampling_request_input,
&compaction_initial_context,
&turn_start_context_items,
&history_before_turn,
inline_compaction_for_request.map(|pending| pending.threshold),
&turn_enabled_connectors,
skills_outcome,
&mut server_model_warning_emitted_for_turn,
@@ -5387,9 +5443,30 @@ pub(crate) async fn run_turn(
.await
{
Ok(sampling_request_output) => {
if let Some(pending_compaction) = inline_compaction_for_request {
let result = if sampling_request_output.observed_server_side_compaction {
"applied"
} else {
"skipped"
};
let extra_tags = if sampling_request_output.observed_server_side_compaction {
Vec::new()
} else {
vec![("reason", "threshold_not_reached")]
};
record_compaction_metric(
&sess,
"server_side",
pending_compaction.trigger,
result,
&extra_tags,
);
}
pending_server_side_compaction = None;
let SamplingRequestResult {
needs_follow_up,
last_agent_message: sampling_request_last_agent_message,
observed_server_side_compaction: _,
} = sampling_request_output;
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
@@ -5409,15 +5486,37 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if run_auto_compact(
&sess,
&turn_context,
InitialContextInjection::BeforeLastUserMessage,
)
.await
.is_err()
if let Some(threshold) =
inline_server_side_compaction_threshold(&sess, &turn_context)
{
return None;
record_compaction_metric(
&sess,
"server_side",
AutoCompactTrigger::AutoFollowUp,
"requested",
&[],
);
pending_server_side_compaction = Some(PendingServerSideCompaction {
threshold,
trigger: AutoCompactTrigger::AutoFollowUp,
});
} else {
record_inline_compaction_skip(
&sess,
&turn_context,
AutoCompactTrigger::AutoFollowUp,
);
if run_auto_compact(
&sess,
&turn_context,
InitialContextInjection::BeforeLastUserMessage,
AutoCompactTrigger::AutoFollowUp,
)
.await
.is_err()
{
return None;
}
}
continue;
}
@@ -5519,10 +5618,159 @@ pub(crate) async fn run_turn(
last_agent_message
}
/// Why an auto-compaction was initiated; used as a stable telemetry tag value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AutoCompactTrigger {
/// Token limit reached before sampling the next turn.
AutoPreTurn,
/// Token limit reached after a sampling round that still needs a follow-up.
AutoFollowUp,
/// Preflight compaction run against the previous model when switching models.
PreviousModelPreflight,
}
impl AutoCompactTrigger {
/// Snake_case tag value for telemetry counters; keep these strings stable.
fn as_str(self) -> &'static str {
match self {
AutoCompactTrigger::AutoPreTurn => "auto_preturn",
AutoCompactTrigger::AutoFollowUp => "auto_followup",
AutoCompactTrigger::PreviousModelPreflight => "previous_model_preflight",
}
}
}
/// A decision to request inline server-side compaction on the next Responses
/// request, remembering the token threshold to send and why it was requested.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PendingServerSideCompaction {
// Token threshold forwarded as the `context_management` compact_threshold.
threshold: i64,
// Trigger recorded in compaction telemetry when the request resolves.
trigger: AutoCompactTrigger,
}
/// Builds the replacement history that collapses a server-side compaction
/// checkpoint into local state.
///
/// Inputs are successive snapshots of history (`history_before_turn` →
/// `history_at_checkpoint` → `current_history`) plus the two candidate context
/// prefixes injected at turn start; the function works by slice prefix-stripping,
/// so each later snapshot is expected to extend the earlier one (mismatches fall
/// back to using the snapshot unstripped / empty).
fn build_server_side_compaction_replacement_history(
compaction_item: ResponseItem,
compaction_initial_context: &[ResponseItem],
turn_start_context_items: &[ResponseItem],
history_before_turn: &[ResponseItem],
history_at_checkpoint: &[ResponseItem],
current_history: &[ResponseItem],
) -> Vec<ResponseItem> {
// Rebuild the active turn around the compaction checkpoint:
// 1. keep the turn-local items that existed when compaction fired
// 2. replace any prior same-turn compaction summary with the newest one
// 3. re-append items that arrived later in the same streamed response
// 4. reattach ghost snapshots at the end so undo state survives the rewrite
let checkpoint_turn_items = history_at_checkpoint
.strip_prefix(history_before_turn)
.unwrap_or(history_at_checkpoint);
// Either context prefix may have been injected at turn start; strip whichever
// matches, preferring the longer one when both do.
let stripped_compaction_initial_context =
checkpoint_turn_items.strip_prefix(compaction_initial_context);
let stripped_turn_start_context_items =
checkpoint_turn_items.strip_prefix(turn_start_context_items);
let checkpoint_turn_items = match (
stripped_compaction_initial_context,
stripped_turn_start_context_items,
) {
(Some(after_compaction_initial_context), Some(after_turn_start_context_items)) => {
if compaction_initial_context.len() >= turn_start_context_items.len() {
after_compaction_initial_context
} else {
after_turn_start_context_items
}
}
(Some(after_compaction_initial_context), None) => after_compaction_initial_context,
(None, Some(after_turn_start_context_items)) => after_turn_start_context_items,
(None, None) => checkpoint_turn_items,
};
// Items streamed after the checkpoint in the same response (empty if the
// current history no longer extends the checkpoint snapshot).
let post_checkpoint_turn_items = current_history
.strip_prefix(history_at_checkpoint)
.unwrap_or_default();
// Drop ghost snapshots (re-added at the end) and prior compaction summaries
// (superseded by `compaction_item`).
let mut replacement_history: Vec<ResponseItem> = checkpoint_turn_items
.iter()
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
.filter(|item| !matches!(item, ResponseItem::Compaction { .. }))
.cloned()
.collect();
replacement_history.push(compaction_item);
replacement_history.extend(
post_checkpoint_turn_items
.iter()
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
.filter(|item| !matches!(item, ResponseItem::Compaction { .. }))
.cloned(),
);
let mut replacement_history = insert_initial_context_before_last_real_user_or_summary(
replacement_history,
compaction_initial_context.to_vec(),
);
// Step 4: every ghost snapshot still present anywhere in current history is
// re-appended so undo state survives the rewrite.
replacement_history.extend(
current_history
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned(),
);
replacement_history
}
/// Emits one `codex.compaction` telemetry count tagged with the compaction
/// `mode`, the `trigger` that initiated it, and the `result`; any
/// `extra_tags` (e.g. a skip reason) are appended after the standard three.
fn record_compaction_metric(
sess: &Session,
mode: &'static str,
trigger: AutoCompactTrigger,
result: &'static str,
extra_tags: &[(&'static str, &str)],
) {
let standard_tags = [
("mode", mode),
("trigger", trigger.as_str()),
("result", result),
];
let tags: Vec<(&'static str, &str)> = standard_tags
.into_iter()
.chain(extra_tags.iter().copied())
.collect();
sess.services
.session_telemetry
.counter("codex.compaction", 1, &tags);
}
/// Returns the token threshold for inline server-side compaction when the
/// current turn is eligible (feature flag on and an OpenAI remote-compaction
/// provider), or `None` when the legacy client-side path must be used. Even
/// when eligible, the model may expose no auto-compact limit, yielding `None`.
fn inline_server_side_compaction_threshold(
sess: &Session,
turn_context: &TurnContext,
) -> Option<i64> {
// Short-circuit: the provider check only runs when the feature is enabled.
let eligible = sess.enabled(Feature::ServerSideCompaction)
&& should_use_remote_compact_task(&turn_context.provider);
if !eligible {
return None;
}
// OpenAI inline auto-compaction uses Responses `context_management`, which has no
// compaction-prompt field. Auto-compaction therefore ignores `compact_prompt`, while manual
// `/compact` still uses the point-in-time compact endpoint.
turn_context.model_info.auto_compact_token_limit()
}
/// Logs and counts a decision to skip inline server-side compaction, tagging
/// the first failing eligibility condition as the reason: feature flag off,
/// non-OpenAI provider, or (otherwise) the model exposing no auto-compact
/// token limit.
fn record_inline_compaction_skip(
sess: &Session,
turn_context: &TurnContext,
trigger: AutoCompactTrigger,
) {
// Guards are evaluated top-to-bottom, so only the first failing check
// determines the reason and later checks are not run.
let reason = match () {
_ if !sess.enabled(Feature::ServerSideCompaction) => "flag_off",
_ if !should_use_remote_compact_task(&turn_context.provider) => "non_openai",
_ => "not_eligible",
};
debug!(
turn_id = %turn_context.sub_id,
trigger = trigger.as_str(),
reason,
"skipping inline server-side compaction"
);
record_compaction_metric(
sess,
"server_side",
trigger,
"skipped",
&[("reason", reason)],
);
}
async fn run_pre_sampling_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
) -> CodexResult<Option<PendingServerSideCompaction>> {
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
maybe_run_previous_model_inline_compact(
sess,
@@ -5537,9 +5785,29 @@ async fn run_pre_sampling_compact(
.unwrap_or(i64::MAX);
// Compact if the total usage tokens are greater than the auto compact limit
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
if let Some(threshold) = inline_server_side_compaction_threshold(sess, turn_context) {
record_compaction_metric(
sess,
"server_side",
AutoCompactTrigger::AutoPreTurn,
"requested",
&[],
);
return Ok(Some(PendingServerSideCompaction {
threshold,
trigger: AutoCompactTrigger::AutoPreTurn,
}));
}
record_inline_compaction_skip(sess, turn_context, AutoCompactTrigger::AutoPreTurn);
run_auto_compact(
sess,
turn_context,
InitialContextInjection::DoNotInject,
AutoCompactTrigger::AutoPreTurn,
)
.await?;
}
Ok(())
Ok(None)
}
/// Runs pre-sampling compaction against the previous model when switching to a smaller
@@ -5553,6 +5821,13 @@ async fn maybe_run_previous_model_inline_compact(
turn_context: &Arc<TurnContext>,
total_usage_tokens: i64,
) -> CodexResult<bool> {
// Keep OpenAI auto-compaction on one path. If inline server-side compaction is eligible for
// the current turn, let the normal pre-turn inline request handle it instead of running the
// older previous-model client-side preflight flow first.
if inline_server_side_compaction_threshold(sess, turn_context).is_some() {
return Ok(false);
}
let Some(previous_turn_settings) = sess.previous_turn_settings().await else {
return Ok(false);
};
@@ -5576,10 +5851,18 @@ async fn maybe_run_previous_model_inline_compact(
&& previous_model_turn_context.model_info.slug != turn_context.model_info.slug
&& old_context_window > new_context_window;
if should_run {
record_compaction_metric(
sess,
"client_side",
AutoCompactTrigger::PreviousModelPreflight,
"requested",
&[],
);
run_auto_compact(
sess,
&previous_model_turn_context,
InitialContextInjection::DoNotInject,
AutoCompactTrigger::PreviousModelPreflight,
)
.await?;
return Ok(true);
@@ -5591,6 +5874,7 @@ async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
trigger: AutoCompactTrigger,
) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(
@@ -5607,6 +5891,7 @@ async fn run_auto_compact(
)
.await?;
}
record_compaction_metric(sess, "client_side", trigger, "applied", &[]);
Ok(())
}
@@ -5771,6 +6056,7 @@ fn build_prompt(
router: &ToolRouter,
turn_context: &TurnContext,
base_instructions: BaseInstructions,
inline_compaction_threshold: Option<i64>,
) -> Prompt {
Prompt {
input,
@@ -5779,6 +6065,7 @@ fn build_prompt(
base_instructions,
personality: turn_context.personality,
output_schema: turn_context.final_output_json_schema.clone(),
inline_compaction_threshold,
}
}
#[allow(clippy::too_many_arguments)]
@@ -5797,6 +6084,10 @@ async fn run_sampling_request(
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
input: Vec<ResponseItem>,
compaction_initial_context: &[ResponseItem],
turn_start_context_items: &[ResponseItem],
history_before_turn: &[ResponseItem],
inline_compaction_threshold: Option<i64>,
explicitly_enabled_connectors: &HashSet<String>,
skills_outcome: Option<&SkillLoadOutcome>,
server_model_warning_emitted_for_turn: &mut bool,
@@ -5819,6 +6110,7 @@ async fn run_sampling_request(
router.as_ref(),
turn_context.as_ref(),
base_instructions,
inline_compaction_threshold,
);
let mut retries = 0;
loop {
@@ -5831,6 +6123,9 @@ async fn run_sampling_request(
Arc::clone(&turn_diff_tracker),
server_model_warning_emitted_for_turn,
&prompt,
compaction_initial_context,
turn_start_context_items,
history_before_turn,
cancellation_token.child_token(),
)
.await
@@ -5998,6 +6293,7 @@ async fn built_tools(
struct SamplingRequestResult {
needs_follow_up: bool,
last_agent_message: Option<String>,
observed_server_side_compaction: bool,
}
/// Ephemeral per-response state for streaming a single proposed plan.
@@ -6551,6 +6847,9 @@ async fn try_run_sampling_request(
turn_diff_tracker: SharedTurnDiffTracker,
server_model_warning_emitted_for_turn: &mut bool,
prompt: &Prompt,
compaction_initial_context: &[ResponseItem],
turn_start_context_items: &[ResponseItem],
history_before_turn: &[ResponseItem],
cancellation_token: CancellationToken,
) -> CodexResult<SamplingRequestResult> {
feedback_tags!(
@@ -6585,11 +6884,16 @@ async fn try_run_sampling_request(
FuturesOrdered::new();
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut observed_server_side_compaction = false;
let mut pending_server_side_compaction_checkpoint: Option<
PendingServerSideCompactionCheckpoint,
> = None;
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let history_before_turn = Arc::new(history_before_turn.to_vec());
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
@@ -6628,6 +6932,7 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let saw_server_side_compaction = matches!(item, ResponseItem::Compaction { .. });
let previously_active_item = active_item.take();
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
@@ -6666,6 +6971,10 @@ async fn try_run_sampling_request(
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
observed_server_side_compaction |= saw_server_side_compaction;
if let Some(pending_compaction) = output_result.pending_server_side_compaction {
pending_server_side_compaction_checkpoint = Some(pending_compaction);
}
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
@@ -6679,6 +6988,10 @@ async fn try_run_sampling_request(
handle_non_tool_response_item(&item, plan_mode, Some(&turn_context.cwd)).await
{
let mut turn_item = turn_item;
if matches!(turn_item, TurnItem::ContextCompaction(_)) {
active_item = Some(turn_item);
continue;
}
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
@@ -6759,8 +7072,39 @@ async fn try_run_sampling_request(
&mut assistant_message_stream_parsers,
)
.await;
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
let mut applied_server_side_compaction_checkpoint = false;
if let Some(PendingServerSideCompactionCheckpoint {
history_at_checkpoint,
item,
turn_item,
}) = pending_server_side_compaction_checkpoint.take()
{
sess.apply_server_side_compaction_checkpoint(
turn_context.as_ref(),
item,
compaction_initial_context,
turn_start_context_items,
history_before_turn.as_slice(),
history_at_checkpoint.as_slice(),
)
.await;
sess.emit_turn_item_started(&turn_context, &turn_item).await;
sess.emit_turn_item_completed(&turn_context, turn_item)
.await;
if let Some(token_usage) = token_usage.as_ref() {
let mut state = sess.state.lock().await;
state.update_token_info_from_usage(
token_usage,
turn_context.model_context_window(),
);
}
sess.recompute_token_usage(&turn_context).await;
applied_server_side_compaction_checkpoint = true;
}
if !applied_server_side_compaction_checkpoint {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
}
should_emit_turn_diff = true;
needs_follow_up |= sess.has_pending_input().await;
@@ -6768,6 +7112,7 @@ async fn try_run_sampling_request(
break Ok(SamplingRequestResult {
needs_follow_up,
last_agent_message,
observed_server_side_compaction,
});
}
ResponseEvent::OutputTextDelta(delta) => {

View File

@@ -47,6 +47,7 @@ use crate::tools::registry::ToolHandler;
use crate::tools::router::ToolCallSource;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_app_server_protocol::AppInfo;
use codex_git::GhostCommit;
use codex_otel::TelemetryAuthMode;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
@@ -98,6 +99,30 @@ fn user_message(text: &str) -> ResponseItem {
}
}
/// Test helper: builds a developer-role message containing a single
/// input-text content item with the given text.
fn developer_message(text: &str) -> ResponseItem {
let content = vec![ContentItem::InputText {
text: text.to_owned(),
}];
ResponseItem::Message {
id: None,
role: String::from("developer"),
content,
end_turn: None,
phase: None,
}
}
/// Test helper: builds a user-role message whose text wraps `cwd` in an
/// `<environment_context>` XML-ish envelope, matching the runtime's format.
fn environment_context_message(cwd: &str) -> ResponseItem {
let text = format!("<environment_context>\n<cwd>{cwd}</cwd>\n</environment_context>");
ResponseItem::Message {
id: None,
role: String::from("user"),
content: vec![ContentItem::InputText { text }],
end_turn: None,
phase: None,
}
}
fn assistant_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -122,6 +147,12 @@ fn skill_message(text: &str) -> ResponseItem {
}
}
/// Test helper: builds a ghost-snapshot history item from a `GhostCommit`
/// constructed with the given id and empty remaining fields.
// NOTE(review): the `None` and empty-vec arguments mirror `GhostCommit::new`'s
// optional metadata — confirm against its signature if it changes.
fn ghost_snapshot(id: &str) -> ResponseItem {
ResponseItem::GhostSnapshot {
ghost_commit: GhostCommit::new(id.to_string(), None, Vec::new(), Vec::new()),
}
}
fn developer_input_texts(items: &[ResponseItem]) -> Vec<&str> {
items
.iter()
@@ -219,6 +250,118 @@ fn assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boun
assert!(tail.plan_segments.is_empty());
}
#[test]
fn build_server_side_compaction_replacement_history_keeps_current_turn_inputs() {
// Scenario: a turn starts from pre-existing history (with a ghost snapshot),
// injects turn-start context, receives user input plus skill/plugin
// injections and a tool output, then a server-side compaction item fires.
let prior_snapshot = ghost_snapshot("ghost-before");
let same_turn_snapshot = ghost_snapshot("ghost-during");
let history_before_turn = vec![user_message("earlier"), prior_snapshot.clone()];
let turn_start_context_items = vec![
developer_message("<model_switch>\nuse the new model"),
environment_context_message("/fresh"),
];
let current_turn_user = user_message("current turn");
let skill_injection = skill_message(
"<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nbody\n</skill>",
);
let plugin_injection = developer_message("PLUGIN_HINT");
let current_turn_tool_output = ResponseItem::FunctionCallOutput {
call_id: "call-1".to_string(),
output: FunctionCallOutputPayload::from_text("tool result".to_string()),
};
// Full history as of the compaction checkpoint (checkpoint == current here,
// so no post-checkpoint items are re-appended).
let current_history = vec![
user_message("earlier"),
prior_snapshot.clone(),
turn_start_context_items[0].clone(),
turn_start_context_items[1].clone(),
current_turn_user.clone(),
skill_injection.clone(),
plugin_injection.clone(),
current_turn_tool_output.clone(),
same_turn_snapshot.clone(),
];
let compaction_item = ResponseItem::Compaction {
encrypted_content: "INLINE_SUMMARY".to_string(),
};
// Initial context and turn-start context are the same slice in this case.
let replacement_history = build_server_side_compaction_replacement_history(
compaction_item.clone(),
&turn_start_context_items,
&turn_start_context_items,
&history_before_turn,
&current_history,
&current_history,
);
// Expect: turn-local inputs preserved in order, the compaction item appended
// after them, and both ghost snapshots reattached at the tail for undo.
assert_eq!(
replacement_history,
vec![
turn_start_context_items[0].clone(),
turn_start_context_items[1].clone(),
current_turn_user,
skill_injection,
plugin_injection,
current_turn_tool_output,
compaction_item,
prior_snapshot,
same_turn_snapshot,
]
);
}
#[test]
fn build_server_side_compaction_replacement_history_prefers_longer_initial_context_prefix() {
    let history_before_turn = vec![user_message("earlier")];
    let turn_start_context_items = vec![developer_message("<model_switch>\nuse the new model")];
    // The prior compaction's own initial-context prefix is a superset of the
    // turn-start context: it also carries the environment context item.
    let compaction_initial_context = vec![
        turn_start_context_items[0].clone(),
        environment_context_message("/fresh"),
    ];
    let current_turn_user = user_message("current turn");
    let prior_compaction = ResponseItem::Compaction {
        encrypted_content: "INLINE_SUMMARY_1".to_string(),
    };
    let new_compaction = ResponseItem::Compaction {
        encrypted_content: "INLINE_SUMMARY_2".to_string(),
    };
    let current_turn_tool_output = ResponseItem::FunctionCallOutput {
        call_id: "call-1".to_string(),
        output: FunctionCallOutputPayload::from_text("tool result".to_string()),
    };
    // History captured when the prior compaction checkpoint was taken.
    let history_at_checkpoint = vec![
        compaction_initial_context[0].clone(),
        compaction_initial_context[1].clone(),
        current_turn_user.clone(),
        prior_compaction,
    ];
    // Live history extends the checkpoint with one more tool output streamed
    // after the checkpoint.
    let current_history = vec![
        history_at_checkpoint[0].clone(),
        history_at_checkpoint[1].clone(),
        history_at_checkpoint[2].clone(),
        history_at_checkpoint[3].clone(),
        current_turn_tool_output.clone(),
    ];
    let replacement_history = build_server_side_compaction_replacement_history(
        new_compaction.clone(),
        &compaction_initial_context,
        &turn_start_context_items,
        &history_before_turn,
        &history_at_checkpoint,
        &current_history,
    );
    // The longer compaction prefix wins over the shorter turn-start context,
    // the new compaction replaces the prior one, and the post-checkpoint tool
    // output is re-appended after it.
    assert_eq!(
        replacement_history,
        vec![
            compaction_initial_context[0].clone(),
            compaction_initial_context[1].clone(),
            current_turn_user,
            new_compaction,
            current_turn_tool_output,
        ]
    );
}
fn make_mcp_tool(
server_name: &str,
tool_name: &str,
@@ -3382,6 +3525,62 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inline_compaction_output_item_buffers_checkpoint_without_committing_turn_item() {
    let (sess, tc, rx) = make_session_and_context_with_rx().await;
    // Drain any startup events so the assertions below only see what the
    // handler under test emits.
    while rx.try_recv().is_ok() {}
    let router = Arc::new(ToolRouter::from_config(
        &tc.tools_config,
        None,
        None,
        tc.dynamic_tools.as_slice(),
    ));
    let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
    let mut ctx = crate::stream_events_utils::HandleOutputCtx {
        sess: Arc::clone(&sess),
        turn_context: Arc::clone(&tc),
        tool_runtime: crate::tools::parallel::ToolCallRuntime::new(
            router,
            Arc::clone(&sess),
            Arc::clone(&tc),
            tracker,
        ),
        cancellation_token: tokio_util::sync::CancellationToken::new(),
    };
    let item = ResponseItem::Compaction {
        encrypted_content: "INLINE_SERVER_SUMMARY".to_string(),
    };
    // Feed a streamed server-side compaction item through the output handler
    // with a previously-active compaction turn item.
    let output = crate::stream_events_utils::handle_output_item_done(
        &mut ctx,
        item.clone(),
        Some(TurnItem::ContextCompaction(
            codex_protocol::items::ContextCompactionItem::new(),
        )),
    )
    .await
    .expect("handle output item");
    // The compaction must be buffered as a pending checkpoint — not treated as
    // a tool call, a follow-up trigger, or an agent message.
    assert!(output.pending_server_side_compaction.is_some());
    assert!(output.tool_future.is_none());
    assert!(!output.needs_follow_up);
    assert!(output.last_agent_message.is_none());
    // Only the raw wire event is emitted immediately.
    let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
        .await
        .expect("expected raw response item event")
        .expect("channel open");
    assert!(matches!(
        first.msg,
        EventMsg::RawResponseItem(raw) if raw.item == item
    ));
    // The committed turn-item lifecycle is deferred until response.completed.
    assert!(
        rx.try_recv().is_err(),
        "expected no committed compaction item lifecycle before response.completed"
    );
}
#[tokio::test]
async fn steer_input_requires_active_turn() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;

View File

@@ -99,6 +99,7 @@ async fn run_remote_compact_task_inner_impl(
base_instructions,
personality: turn_context.personality,
output_schema: None,
inline_compaction_threshold: None,
};
let mut new_history = sess

View File

@@ -160,6 +160,8 @@ pub enum Feature {
Artifact,
/// Enable Fast mode selection in the TUI and request layer.
FastMode,
/// Enable inline Responses API server-side compaction for auto-compaction flows.
ServerSideCompaction,
/// Enable voice transcription in the TUI composer.
VoiceTranscription,
/// Enable experimental realtime voice conversation mode in the TUI.
@@ -738,6 +740,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Stable,
default_enabled: true,
},
FeatureSpec {
id: Feature::ServerSideCompaction,
key: "server_side_compaction",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::VoiceTranscription,
key: "voice_transcription",

View File

@@ -340,6 +340,7 @@ mod job {
},
personality: None,
output_schema: Some(output_schema()),
inline_compaction_threshold: None,
};
let mut client_session = session.services.model_client.new_session();

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_protocol::config_types::ModeKind;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_utils_stream_parser::strip_citations;
use tokio_util::sync::CancellationToken;
@@ -24,6 +25,8 @@ use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RawResponseItemEvent;
use codex_utils_stream_parser::strip_proposed_plan_blocks;
use futures::Future;
use tracing::debug;
@@ -144,9 +147,16 @@ pub(crate) type InFlightFuture<'f> =
pub(crate) struct OutputItemResult {
    /// Last agent message text produced by this output item, if any.
    pub last_agent_message: Option<String>,
    /// Whether handling this item requires another model round-trip.
    pub needs_follow_up: bool,
    /// Server-side compaction checkpoint buffered here instead of being
    /// committed inline; committed later so subsequent streamed output from
    /// the same response is not reordered around the history rewrite.
    pub pending_server_side_compaction: Option<PendingServerSideCompactionCheckpoint>,
    /// Queued tool-execution future when the item was a tool call.
    pub tool_future: Option<InFlightFuture<'static>>,
}
/// Server-side compaction checkpoint buffered until `response.completed`.
pub(crate) struct PendingServerSideCompactionCheckpoint {
    /// Snapshot of the session's raw history taken when the compaction item
    /// arrived (via `clone_history().raw_items()`).
    pub history_at_checkpoint: Vec<ResponseItem>,
    /// The raw `ResponseItem::Compaction` streamed by the server.
    pub item: ResponseItem,
    /// The `ContextCompaction` turn item to commit once the checkpoint is
    /// finalized.
    pub turn_item: TurnItem,
}
pub(crate) struct HandleOutputCtx {
pub sess: Arc<Session>,
pub turn_context: Arc<TurnContext>,
@@ -163,6 +173,32 @@ pub(crate) async fn handle_output_item_done(
let mut output = OutputItemResult::default();
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
if matches!(item, ResponseItem::Compaction { .. }) {
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
Some(TurnItem::ContextCompaction(item)) => item,
_ => ContextCompactionItem::new(),
});
// Preserve the raw wire event immediately, but defer the committed turn-item lifecycle
// until `response.completed` so later streamed output from the same response is not
// reordered around the local checkpoint rewrite.
debug!(
turn_id = %ctx.turn_context.sub_id,
"emitting streamed server-side raw compaction item and buffering committed checkpoint until response.completed"
);
ctx.sess
.send_event(
&ctx.turn_context,
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
)
.await;
output.pending_server_side_compaction = Some(PendingServerSideCompactionCheckpoint {
history_at_checkpoint: ctx.sess.clone_history().await.raw_items().to_vec(),
item,
turn_item,
});
return Ok(output);
}
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
Ok(Some(call)) => {
@@ -320,6 +356,9 @@ pub(crate) async fn handle_non_tool_response_item(
}
Some(turn_item)
}
ResponseItem::Compaction { .. } => {
Some(TurnItem::ContextCompaction(ContextCompactionItem::new()))
}
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected tool output from stream");
None

View File

@@ -621,6 +621,16 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value {
})
}
pub fn ev_compaction(encrypted_content: &str) -> Value {
    // Synthesize a `response.output_item.done` SSE event wrapping a
    // server-side compaction item with the given encrypted payload.
    let item = serde_json::json!({
        "type": "compaction",
        "encrypted_content": encrypted_content,
    });
    serde_json::json!({
        "type": "response.output_item.done",
        "item": item,
    })
}
pub fn user_message_item(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,

View File

@@ -5,7 +5,10 @@ use std::path::PathBuf;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::ModelProviderInfo;
use codex_core::WireApi;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::features::Feature;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
@@ -173,6 +176,20 @@ fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
);
}
async fn submit_text_turn_and_wait(codex: &codex_core::CodexThread, text: &str) -> Result<()> {
    // Submit a single plain-text user turn, then block until the turn
    // completes.
    let input = UserInput::Text {
        text: text.to_string(),
        text_elements: Vec::new(),
    };
    codex
        .submit(Op::UserInput {
            items: vec![input],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -362,6 +379,304 @@ async fn remote_compact_runs_automatically() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_server_side_compaction_uses_inline_context_management() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let compact_threshold = 120;
    let first_turn_text = "inline compact turn one";
    let third_turn_text = "inline compact turn three";
    let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
    // Enable the feature flag and a low auto-compact limit so the second turn
    // triggers inline compaction.
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(move |config| {
                config
                    .features
                    .enable(Feature::ServerSideCompaction)
                    .expect("enable server-side compaction");
                config.model_auto_compact_token_limit = Some(compact_threshold);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();
    // Turn one exceeds the token limit (500 > 120); turn two streams the
    // inline compaction plus a reply; turn three just completes.
    let responses_mock = responses::mount_response_sequence(
        harness.server(),
        vec![
            responses::sse_response(sse(vec![
                responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
                responses::ev_completed_with_tokens("resp-1", 500),
            ])),
            responses::sse_response(sse(vec![
                responses::ev_compaction(&inline_summary),
                responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"),
                responses::ev_completed_with_tokens("resp-2", 80),
            ])),
            responses::sse_response(sse(vec![responses::ev_completed("resp-3")])),
        ],
    )
    .await;
    submit_text_turn_and_wait(&codex, first_turn_text).await?;
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "inline compact turn two".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    // Turn two must surface a ContextCompacted event before completing.
    let compacted = wait_for_event_match(&codex, |event| match event {
        EventMsg::ContextCompacted(_) => Some(true),
        _ => None,
    })
    .await;
    wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
    assert!(compacted, "expected inline compaction event");
    submit_text_turn_and_wait(&codex, third_turn_text).await?;
    let requests = responses_mock.requests();
    assert_eq!(
        requests.len(),
        3,
        "expected three inline /responses requests"
    );
    // The over-limit turn's request must ask the server for inline compaction
    // via context_management.
    let inline_request = requests[1].body_json();
    assert_eq!(
        inline_request.get("context_management"),
        Some(&json!([{
            "type": "compaction",
            "compact_threshold": compact_threshold,
        }])),
    );
    // Turn three's request reflects the rewritten history: summary kept,
    // canonical context preserved, pre-compaction transcript dropped.
    let post_inline_request = &requests[2];
    let post_inline_body = post_inline_request.body_json().to_string();
    assert!(
        post_inline_request.body_contains_text(&inline_summary),
        "expected subsequent request to reuse inline compaction item"
    );
    assert!(
        post_inline_request.body_contains_text("<permissions instructions>"),
        "expected subsequent request to preserve canonical context after inline compaction"
    );
    assert!(
        !post_inline_request.body_contains_text(first_turn_text),
        "expected pre-compaction user history to be dropped after inline compaction"
    );
    assert!(
        !post_inline_request.body_contains_text("FIRST_REMOTE_REPLY"),
        "expected pre-compaction assistant history to be dropped after inline compaction"
    );
    assert!(
        post_inline_request.body_contains_text(third_turn_text),
        "expected next turn to append normally after inline compaction"
    );
    // Post-compaction transcript items must come after the compaction item.
    assert!(
        post_inline_body
            .find("INLINE_SERVER_SUMMARY")
            .expect("inline compaction marker in subsequent request")
            < post_inline_body
                .find("AFTER_INLINE_REPLY")
                .expect("post-compaction assistant reply in subsequent request"),
        "expected post-compaction transcript items to remain after the inline compaction item"
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_server_side_compaction_keeps_current_turn_inputs_for_follow_ups() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let compact_threshold = 120;
    let first_turn_text = "inline compact turn one";
    let second_turn_text = "inline compact turn two";
    let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(move |config| {
                config
                    .features
                    .enable(Feature::ServerSideCompaction)
                    .expect("enable server-side compaction");
                config.model_auto_compact_token_limit = Some(compact_threshold);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();
    // Turn one exceeds the limit; turn two streams an inline compaction plus a
    // mid-turn tool call, forcing a same-turn follow-up request (turn three of
    // the sequence) after the tool runs.
    let responses_mock = responses::mount_sse_sequence(
        harness.server(),
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
                responses::ev_completed_with_tokens("resp-1", 500),
            ]),
            responses::sse(vec![
                responses::ev_compaction(&inline_summary),
                responses::ev_function_call("call-inline-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
                responses::ev_completed_with_tokens("resp-2", 80),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m3", "AFTER_INLINE_TOOL_REPLY"),
                responses::ev_completed("resp-3"),
            ]),
        ],
    )
    .await;
    submit_text_turn_and_wait(&codex, first_turn_text).await?;
    submit_text_turn_and_wait(&codex, second_turn_text).await?;
    let requests = responses_mock.requests();
    assert_eq!(
        requests.len(),
        3,
        "expected initial request, inline-compacted tool call, and same-turn follow-up"
    );
    // The same-turn follow-up must carry the compacted history: summary and
    // canonical context kept, current-turn input retained, old history gone.
    let follow_up_request = &requests[2];
    let follow_up_body = follow_up_request.body_json().to_string();
    assert!(
        follow_up_request.body_contains_text(&inline_summary),
        "expected same-turn follow-up to include the inline compaction item"
    );
    assert!(
        follow_up_request.body_contains_text("<permissions instructions>"),
        "expected same-turn follow-up to preserve canonical context after inline compaction"
    );
    assert!(
        follow_up_request.body_contains_text(second_turn_text),
        "expected same-turn follow-up to retain the current turn user input"
    );
    assert!(
        !follow_up_request.body_contains_text(first_turn_text),
        "expected same-turn follow-up to drop pre-compaction history"
    );
    // Ordering: current-turn user input before the compaction item.
    assert!(
        follow_up_body
            .find(second_turn_text)
            .expect("current turn text in follow-up request")
            < follow_up_body
                .find("INLINE_SERVER_SUMMARY")
                .expect("inline compaction marker in follow-up request"),
        "expected current-turn items to remain ahead of the inline compaction item"
    );
    assert!(
        follow_up_request
            .function_call_output_text("call-inline-mid-turn")
            .is_some(),
        "expected same-turn follow-up to include the tool output"
    );
    // Ordering: the compaction item before the post-compaction tool call.
    assert!(
        follow_up_body
            .find("INLINE_SERVER_SUMMARY")
            .expect("inline compaction marker in follow-up request")
            < follow_up_body
                .find("call-inline-mid-turn")
                .expect("post-compaction tool call in follow-up request"),
        "expected the inline compaction item to remain ahead of post-compaction tool calls"
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_server_side_compaction_retries_without_committing_incomplete_checkpoint() -> Result<()>
{
    skip_if_no_network!(Ok(()));
    let compact_threshold = 120;
    let first_turn_text = "inline compact retry turn one";
    let inline_summary = summary_with_prefix("INLINE_SERVER_RETRY_SUMMARY");
    let server = wiremock::MockServer::start().await;
    // Hand-built provider so stream retries are enabled (stream_max_retries=1)
    // while request retries are disabled — the incomplete stream below must be
    // retried rather than failing the turn.
    let model_provider = ModelProviderInfo {
        name: "openai".into(),
        base_url: Some(format!("{}/v1", server.uri())),
        env_key: Some("PATH".into()),
        env_key_instructions: None,
        experimental_bearer_token: None,
        wire_api: WireApi::Responses,
        query_params: None,
        http_headers: None,
        env_http_headers: None,
        request_max_retries: Some(0),
        stream_max_retries: Some(1),
        stream_idle_timeout_ms: Some(2000),
        requires_openai_auth: false,
        supports_websockets: false,
    };
    // Second response streams a compaction item but never completes; the retry
    // (third response) succeeds without a compaction item.
    let responses_mock = responses::mount_response_sequence(
        &server,
        vec![
            responses::sse_response(sse(vec![
                responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
                responses::ev_completed_with_tokens("resp-1", 500),
            ])),
            responses::sse_response(sse(vec![responses::ev_compaction(&inline_summary)])),
            responses::sse_response(sse(vec![
                responses::ev_assistant_message("m2", "AFTER_RETRY_REPLY"),
                responses::ev_completed_with_tokens("resp-2", 40),
            ])),
            responses::sse_response(sse(vec![
                responses::ev_assistant_message("m3", "THIRD_TURN_REPLY"),
                responses::ev_completed("resp-3"),
            ])),
        ],
    )
    .await;
    let test = test_codex()
        .with_config(move |config| {
            config.model_provider = model_provider;
            config
                .features
                .enable(Feature::ServerSideCompaction)
                .expect("enable server-side compaction");
            config.model_auto_compact_token_limit = Some(compact_threshold);
        })
        .build(&server)
        .await?;
    let codex = test.codex.clone();
    submit_text_turn_and_wait(&codex, first_turn_text).await?;
    submit_text_turn_and_wait(&codex, "inline compact retry turn two").await?;
    submit_text_turn_and_wait(&codex, "inline compact retry turn three").await?;
    let requests = responses_mock.requests();
    assert_eq!(
        requests.len(),
        4,
        "expected initial turn, incomplete inline attempt, retry, and third turn"
    );
    // The third turn builds on the retried (non-compacted) history: the
    // incomplete compaction checkpoint must never have been committed.
    let third_turn_request = &requests[3];
    assert!(
        !third_turn_request.body_contains_text(&inline_summary),
        "failed inline compaction should not rewrite local history before a retry succeeds"
    );
    assert!(
        third_turn_request.body_contains_text(first_turn_text),
        "failed inline compaction should leave pre-compaction history intact for later turns"
    );
    assert!(
        third_turn_request.body_contains_text("AFTER_RETRY_REPLY"),
        "retry response should be the history that later turns build on"
    );
    Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {