mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
[codex-core] Add feature-flagged server-side compaction [ci changed_files]
Move normal auto-compaction onto inline Responses API compaction behind a feature flag, keep the legacy path for manual and compatibility cases, and add observability plus integration coverage. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -146,6 +146,23 @@ impl From<VerbosityConfig> for OpenAiVerbosity {
|
||||
}
|
||||
}
|
||||
|
||||
/// A single `context_management` entry sent on a Responses API request.
///
/// The constructor in this file only ever produces the `"compaction"` kind,
/// which asks the server to compact the conversation inline once the
/// threshold is crossed.
#[derive(Debug, Serialize, Clone, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct ContextManagement {
    /// Entry discriminator; set to `"compaction"` by [`ContextManagement::compaction`].
    pub r#type: String,
    /// Token count at which the server should compact. Omitted from the
    /// serialized payload entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub compact_threshold: Option<i64>,
}
|
||||
|
||||
impl ContextManagement {
|
||||
pub fn compaction(compact_threshold: i64) -> Self {
|
||||
Self {
|
||||
r#type: "compaction".to_string(),
|
||||
compact_threshold: Some(compact_threshold),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone, PartialEq)]
|
||||
pub struct ResponsesApiRequest {
|
||||
pub model: String,
|
||||
@@ -164,6 +181,8 @@ pub struct ResponsesApiRequest {
|
||||
pub prompt_cache_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub context_management: Option<Vec<ContextManagement>>,
|
||||
}
|
||||
|
||||
impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
@@ -183,6 +202,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
service_tier: request.service_tier.clone(),
|
||||
prompt_cache_key: request.prompt_cache_key.clone(),
|
||||
text: request.text.clone(),
|
||||
context_management: request.context_management.clone(),
|
||||
generate: None,
|
||||
client_metadata: None,
|
||||
}
|
||||
@@ -210,6 +230,8 @@ pub struct ResponseCreateWsRequest {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub context_management: Option<Vec<ContextManagement>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
|
||||
@@ -268,6 +268,7 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
|
||||
service_tier: None,
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
let client = ResponsesClient::new(transport.clone(), provider, NoAuth);
|
||||
|
||||
@@ -310,6 +311,7 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
|
||||
service_tier: None,
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let mut extra_headers = HeaderMap::new();
|
||||
|
||||
@@ -52,6 +52,7 @@ use codex_api::SseTelemetry;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::WebsocketTelemetry;
|
||||
use codex_api::build_conversation_headers;
|
||||
use codex_api::common::ContextManagement as ApiContextManagement;
|
||||
use codex_api::common::Reasoning;
|
||||
use codex_api::common::ResponsesWsRequest;
|
||||
use codex_api::create_text_param_for_request;
|
||||
@@ -573,6 +574,17 @@ impl ModelClientSession {
|
||||
};
|
||||
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
|
||||
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
|
||||
let context_management = prompt
|
||||
.inline_compaction_threshold
|
||||
.map(ApiContextManagement::compaction)
|
||||
.map(|entry| vec![entry]);
|
||||
if let Some(threshold) = prompt.inline_compaction_threshold {
|
||||
trace!(
|
||||
model = %model_info.slug,
|
||||
compact_threshold = threshold,
|
||||
"attaching inline server-side compaction to responses request"
|
||||
);
|
||||
}
|
||||
let request = ResponsesApiRequest {
|
||||
model: model_info.slug.clone(),
|
||||
instructions: instructions.clone(),
|
||||
@@ -591,6 +603,7 @@ impl ModelClientSession {
|
||||
},
|
||||
prompt_cache_key,
|
||||
text,
|
||||
context_management,
|
||||
};
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
@@ -42,6 +42,9 @@ pub struct Prompt {
|
||||
|
||||
/// Optional output schema for the model's response.
|
||||
pub output_schema: Option<Value>,
|
||||
|
||||
/// Optional inline server-side compaction threshold for the next Responses request.
|
||||
pub inline_compaction_threshold: Option<i64>,
|
||||
}
|
||||
|
||||
impl Prompt {
|
||||
@@ -319,6 +322,7 @@ mod tests {
|
||||
verbosity: Some(OpenAiVerbosity::Low),
|
||||
format: None,
|
||||
}),
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -358,6 +362,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: None,
|
||||
text: Some(text_controls),
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -395,6 +400,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -417,6 +423,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: Some(ServiceTier::Flex.to_string()),
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
|
||||
@@ -119,6 +119,7 @@ use codex_utils_stream_parser::strip_citations;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::prelude::*;
|
||||
use futures::stream::FuturesOrdered;
|
||||
use reqwest::StatusCode;
|
||||
use rmcp::model::ListResourceTemplatesResult;
|
||||
use rmcp::model::ListResourcesResult;
|
||||
use rmcp::model::PaginatedRequestParams;
|
||||
@@ -2401,6 +2402,7 @@ impl Session {
|
||||
BaseInstructions {
|
||||
text: base_instructions,
|
||||
},
|
||||
None,
|
||||
);
|
||||
let startup_turn_metadata_header = startup_turn_context
|
||||
.turn_metadata_state
|
||||
@@ -3233,6 +3235,46 @@ impl Session {
|
||||
state.record_items(items.iter(), turn_context.truncation_policy);
|
||||
}
|
||||
|
||||
/// Applies a server-side compaction item streamed by the model as a local
/// history checkpoint.
///
/// The in-memory history is replaced with the compaction item itself plus any
/// `GhostSnapshot` items carried over from the previous history, then the raw
/// item is forwarded to listeners and token usage is recomputed.
pub(crate) async fn apply_server_side_compaction_checkpoint(
    &self,
    turn_context: &TurnContext,
    item: ResponseItem,
) {
    // Preserve ghost snapshots across the compaction: they are filtered out
    // of the old history and re-appended after the compaction item below.
    let ghost_snapshots = self
        .clone_history()
        .await
        .raw_items()
        .iter()
        .filter(|history_item| matches!(history_item, ResponseItem::GhostSnapshot { .. }))
        .cloned()
        .collect::<Vec<_>>();

    // The new history starts with the compaction item, followed by any
    // surviving ghost snapshots.
    let mut replacement_history = vec![item.clone()];
    if !ghost_snapshots.is_empty() {
        replacement_history.extend(ghost_snapshots);
    }

    let reference_context_item = Some(turn_context.to_turn_context_item());
    // `message` is empty because the summary lives inside the streamed
    // compaction item, not in a client-generated summary string.
    let compacted_item = CompactedItem {
        message: String::new(),
        replacement_history: Some(replacement_history.clone()),
    };
    self.replace_compacted_history(replacement_history, reference_context_item, compacted_item)
        .await;
    debug!(
        turn_id = %turn_context.sub_id,
        "applied local server-side compaction checkpoint"
    );
    // Observability: count every applied server-side checkpoint.
    self.services.session_telemetry.counter(
        "codex.compaction_checkpoint_applied",
        1,
        &[("mode", "server_side")],
    );
    // Forward the raw item so downstream consumers see the compaction event,
    // then refresh usage accounting against the now-smaller history.
    self.send_raw_response_items(turn_context, std::slice::from_ref(&item))
        .await;
    self.recompute_token_usage(turn_context).await;
}
|
||||
|
||||
pub(crate) async fn record_model_warning(&self, message: impl Into<String>, ctx: &TurnContext) {
|
||||
self.services
|
||||
.session_telemetry
|
||||
@@ -3508,7 +3550,7 @@ impl Session {
|
||||
pub(crate) async fn record_context_updates_and_set_reference_context_item(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) {
|
||||
) -> Vec<ResponseItem> {
|
||||
let reference_context_item = {
|
||||
let state = self.state.lock().await;
|
||||
state.reference_context_item()
|
||||
@@ -3535,6 +3577,7 @@ impl Session {
|
||||
// context items. This keeps later runtime diffing aligned with the current turn state.
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_reference_context_item(Some(turn_context_item));
|
||||
context_items
|
||||
}
|
||||
|
||||
pub(crate) async fn update_token_usage_info(
|
||||
@@ -5355,17 +5398,21 @@ pub(crate) async fn run_turn(
|
||||
// new user message are recorded. Estimate pending incoming items (context
|
||||
// diffs/full reinjection + user input) and trigger compaction preemptively
|
||||
// when they would push the thread over the compaction threshold.
|
||||
if run_pre_sampling_compact(&sess, &turn_context)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
}
|
||||
let mut pending_server_side_compaction =
|
||||
match run_pre_sampling_compact(&sess, &turn_context).await {
|
||||
Ok(pending) => pending,
|
||||
Err(err) => {
|
||||
error!("Failed to run pre-sampling compact: {err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
|
||||
|
||||
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
|
||||
let history_before_turn = sess.clone_history().await.raw_items().to_vec();
|
||||
let reference_context_before_turn = sess.reference_context_item().await;
|
||||
let context_update_items = sess
|
||||
.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
|
||||
.await;
|
||||
|
||||
let loaded_plugins = sess
|
||||
@@ -5504,8 +5551,12 @@ pub(crate) async fn run_turn(
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.clone().into();
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
sess.record_user_prompt_and_emit_turn_item(
|
||||
turn_context.as_ref(),
|
||||
&input,
|
||||
response_item.clone(),
|
||||
)
|
||||
.await;
|
||||
// Track the previous-turn baseline from the regular user-turn path only so
|
||||
// standalone tasks (compact/shell/review/undo) cannot suppress future
|
||||
// model/realtime injections.
|
||||
@@ -5524,6 +5575,19 @@ pub(crate) async fn run_turn(
|
||||
.await;
|
||||
}
|
||||
|
||||
let preturn_inline_compaction_state = PreTurnInlineCompactionState {
|
||||
history_before_turn,
|
||||
reference_context_before_turn,
|
||||
replay_items: context_update_items
|
||||
.iter()
|
||||
.cloned()
|
||||
.chain(std::iter::once(response_item.clone()))
|
||||
.chain(skill_items.iter().cloned())
|
||||
.chain(plugin_items.iter().cloned())
|
||||
.collect(),
|
||||
turn_context_item: turn_context.to_turn_context_item(),
|
||||
};
|
||||
|
||||
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
|
||||
.await;
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
@@ -5638,6 +5702,7 @@ pub(crate) async fn run_turn(
|
||||
.map(|user_message| user_message.message())
|
||||
.collect::<Vec<String>>();
|
||||
let turn_metadata_header = turn_context.turn_metadata_state.current_header_value();
|
||||
let inline_compaction_for_request = pending_server_side_compaction;
|
||||
match run_sampling_request(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
@@ -5645,6 +5710,7 @@ pub(crate) async fn run_turn(
|
||||
&mut client_session,
|
||||
turn_metadata_header.as_deref(),
|
||||
sampling_request_input,
|
||||
inline_compaction_for_request.map(|pending| pending.threshold),
|
||||
&turn_enabled_connectors,
|
||||
skills_outcome,
|
||||
&mut server_model_warning_emitted_for_turn,
|
||||
@@ -5653,9 +5719,30 @@ pub(crate) async fn run_turn(
|
||||
.await
|
||||
{
|
||||
Ok(sampling_request_output) => {
|
||||
if let Some(pending_compaction) = inline_compaction_for_request {
|
||||
let result = if sampling_request_output.observed_server_side_compaction {
|
||||
"applied"
|
||||
} else {
|
||||
"skipped"
|
||||
};
|
||||
let extra_tags = if sampling_request_output.observed_server_side_compaction {
|
||||
Vec::new()
|
||||
} else {
|
||||
vec![("reason", "threshold_not_reached")]
|
||||
};
|
||||
record_compaction_metric(
|
||||
&sess,
|
||||
"server_side",
|
||||
pending_compaction.trigger,
|
||||
result,
|
||||
&extra_tags,
|
||||
);
|
||||
}
|
||||
pending_server_side_compaction = None;
|
||||
let SamplingRequestResult {
|
||||
needs_follow_up,
|
||||
last_agent_message: sampling_request_last_agent_message,
|
||||
observed_server_side_compaction: _,
|
||||
} = sampling_request_output;
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
|
||||
@@ -5675,15 +5762,37 @@ pub(crate) async fn run_turn(
|
||||
|
||||
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
|
||||
if token_limit_reached && needs_follow_up {
|
||||
if run_auto_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::BeforeLastUserMessage,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
if let Some(threshold) =
|
||||
inline_server_side_compaction_threshold(&sess, &turn_context)
|
||||
{
|
||||
return None;
|
||||
record_compaction_metric(
|
||||
&sess,
|
||||
"server_side",
|
||||
AutoCompactTrigger::AutoFollowUp,
|
||||
"requested",
|
||||
&[],
|
||||
);
|
||||
pending_server_side_compaction = Some(PendingServerSideCompaction {
|
||||
threshold,
|
||||
trigger: AutoCompactTrigger::AutoFollowUp,
|
||||
});
|
||||
} else {
|
||||
record_inline_compaction_skip(
|
||||
&sess,
|
||||
&turn_context,
|
||||
AutoCompactTrigger::AutoFollowUp,
|
||||
);
|
||||
if run_auto_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::BeforeLastUserMessage,
|
||||
AutoCompactTrigger::AutoFollowUp,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -5824,6 +5933,20 @@ pub(crate) async fn run_turn(
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
if let Some(pending_compaction) = pending_server_side_compaction
|
||||
&& downgrade_known_inline_compaction_error(
|
||||
&sess,
|
||||
&turn_context,
|
||||
pending_compaction,
|
||||
Some(&preturn_inline_compaction_state),
|
||||
&e,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
pending_server_side_compaction = None;
|
||||
continue;
|
||||
}
|
||||
info!("Turn error: {e:#}");
|
||||
let event = EventMsg::Error(e.to_error_event(None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
@@ -5836,10 +5959,240 @@ pub(crate) async fn run_turn(
|
||||
last_agent_message
|
||||
}
|
||||
|
||||
/// Reason an automatic compaction was initiated; used as a telemetry tag via
/// [`AutoCompactTrigger::as_str`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AutoCompactTrigger {
    /// Compaction requested before the sampling request of a turn.
    AutoPreTurn,
    /// Compaction requested between sampling requests, when the token limit
    /// is reached and the model still needs a follow-up.
    AutoFollowUp,
    /// Preflight compaction against the previous model when switching to a
    /// model with a smaller context window.
    PreviousModelPreflight,
}
|
||||
|
||||
impl AutoCompactTrigger {
|
||||
fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
AutoCompactTrigger::AutoPreTurn => "auto_preturn",
|
||||
AutoCompactTrigger::AutoFollowUp => "auto_followup",
|
||||
AutoCompactTrigger::PreviousModelPreflight => "previous_model_preflight",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A server-side compaction that has been requested but not yet observed in a
/// response stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PendingServerSideCompaction {
    /// Token threshold attached to the request's `context_management` entry.
    threshold: i64,
    /// Why this compaction was requested (telemetry dimension).
    trigger: AutoCompactTrigger,
}
|
||||
|
||||
/// Snapshot of turn state captured before sampling, used to roll back and
/// replay when an inline server-side compaction must be downgraded to the
/// client-side path (see `downgrade_known_inline_compaction_error`).
#[derive(Clone, Debug)]
struct PreTurnInlineCompactionState {
    /// Raw history as it stood before this turn mutated it.
    history_before_turn: Vec<ResponseItem>,
    /// Reference context item in effect before this turn.
    reference_context_before_turn: Option<TurnContextItem>,
    /// Items (context updates, user prompt, skills, plugins) to re-record
    /// after the rollback so the turn can proceed on the compacted history.
    replay_items: Vec<ResponseItem>,
    /// Turn context item to persist and reinstate after the rollback.
    turn_context_item: TurnContextItem,
}
|
||||
|
||||
fn record_compaction_metric(
|
||||
sess: &Session,
|
||||
mode: &'static str,
|
||||
trigger: AutoCompactTrigger,
|
||||
result: &'static str,
|
||||
extra_tags: &[(&'static str, &str)],
|
||||
) {
|
||||
let mut tags = vec![
|
||||
("mode", mode),
|
||||
("trigger", trigger.as_str()),
|
||||
("result", result),
|
||||
];
|
||||
tags.extend_from_slice(extra_tags);
|
||||
sess.services
|
||||
.session_telemetry
|
||||
.counter("codex.compaction", 1, &tags);
|
||||
}
|
||||
|
||||
fn record_compaction_downgrade_metric(
|
||||
sess: &Session,
|
||||
trigger: AutoCompactTrigger,
|
||||
status: &'static str,
|
||||
reason: &'static str,
|
||||
) {
|
||||
let tags = [
|
||||
("trigger", trigger.as_str()),
|
||||
("status", status),
|
||||
("reason", reason),
|
||||
];
|
||||
sess.services
|
||||
.session_telemetry
|
||||
.counter("codex.compaction_downgrade", 1, &tags);
|
||||
}
|
||||
|
||||
fn has_custom_compact_prompt(turn_context: &TurnContext) -> bool {
|
||||
turn_context
|
||||
.compact_prompt
|
||||
.as_ref()
|
||||
.is_some_and(|prompt| prompt != compact::SUMMARIZATION_PROMPT)
|
||||
}
|
||||
|
||||
fn inline_server_side_compaction_threshold(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
) -> Option<i64> {
|
||||
if !sess.enabled(Feature::ServerSideCompaction) {
|
||||
return None;
|
||||
}
|
||||
if !should_use_remote_compact_task(&turn_context.provider) {
|
||||
return None;
|
||||
}
|
||||
if has_custom_compact_prompt(turn_context) {
|
||||
return None;
|
||||
}
|
||||
turn_context.model_info.auto_compact_token_limit()
|
||||
}
|
||||
|
||||
fn record_inline_compaction_skip(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
trigger: AutoCompactTrigger,
|
||||
) {
|
||||
let reason = if !sess.enabled(Feature::ServerSideCompaction) {
|
||||
"flag_off"
|
||||
} else if !should_use_remote_compact_task(&turn_context.provider) {
|
||||
"non_openai"
|
||||
} else if has_custom_compact_prompt(turn_context) {
|
||||
"custom_compact_prompt"
|
||||
} else {
|
||||
"not_eligible"
|
||||
};
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
trigger = trigger.as_str(),
|
||||
reason,
|
||||
"skipping inline server-side compaction"
|
||||
);
|
||||
record_compaction_metric(
|
||||
sess,
|
||||
"server_side",
|
||||
trigger,
|
||||
"skipped",
|
||||
&[("reason", reason)],
|
||||
);
|
||||
}
|
||||
|
||||
fn is_inline_compaction_compat_error(err: &CodexErr) -> bool {
|
||||
fn mentions_inline_compaction(message: &str) -> bool {
|
||||
let lower = message.to_ascii_lowercase();
|
||||
lower.contains("context_management") || lower.contains("compact_threshold")
|
||||
}
|
||||
|
||||
match err {
|
||||
CodexErr::InvalidRequest(message) => mentions_inline_compaction(message),
|
||||
CodexErr::UnexpectedStatus(error) if error.status == StatusCode::BAD_REQUEST => {
|
||||
mentions_inline_compaction(&error.body)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to recover from a known inline-compaction compatibility error by
/// downgrading to the client-side compaction path.
///
/// Returns `Ok(true)` when the downgrade succeeded and the caller should
/// retry the sampling request, `Ok(false)` when the error is not a known
/// compat error (or no recovery is possible for this trigger), and `Err`
/// when the client-side fallback itself failed.
async fn downgrade_known_inline_compaction_error(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
    pending_compaction: PendingServerSideCompaction,
    preturn_state: Option<&PreTurnInlineCompactionState>,
    err: &CodexErr,
) -> CodexResult<bool> {
    // Only errors that mention the inline-compaction request fields qualify.
    if !is_inline_compaction_compat_error(err) {
        return Ok(false);
    }

    tracing::warn!(
        turn_id = %turn_context.sub_id,
        trigger = pending_compaction.trigger.as_str(),
        error = %err,
        "downgrading inline server-side compaction to client-side compaction"
    );
    // Record both the downgrade attempt and the terminal state of the
    // server-side request before running the fallback.
    record_compaction_downgrade_metric(
        sess,
        pending_compaction.trigger,
        "attempted",
        "known_compat_error",
    );
    record_compaction_metric(
        sess,
        "server_side",
        pending_compaction.trigger,
        "downgraded",
        &[("reason", "known_compat_error")],
    );

    let downgrade_result = match pending_compaction.trigger {
        AutoCompactTrigger::AutoPreTurn => {
            // Pre-turn downgrade needs the pre-turn snapshot to roll back to;
            // without it no safe recovery is possible.
            let Some(preturn_state) = preturn_state else {
                return Ok(false);
            };
            // Roll history back to its pre-turn state, run client-side
            // compaction, then replay the turn's items on top.
            sess.replace_history(
                preturn_state.history_before_turn.clone(),
                preturn_state.reference_context_before_turn.clone(),
            )
            .await;
            run_auto_compact(
                sess,
                turn_context,
                InitialContextInjection::DoNotInject,
                AutoCompactTrigger::AutoPreTurn,
            )
            .await?;
            if !preturn_state.replay_items.is_empty() {
                sess.record_into_history(&preturn_state.replay_items, turn_context)
                    .await;
                sess.persist_rollout_response_items(&preturn_state.replay_items)
                    .await;
            }
            // Persist and reinstate the turn context item so later diffing
            // uses the current turn's context.
            sess.persist_rollout_items(&[RolloutItem::TurnContext(
                preturn_state.turn_context_item.clone(),
            )])
            .await;
            {
                let mut state = sess.state.lock().await;
                state.set_reference_context_item(Some(preturn_state.turn_context_item.clone()));
            }
            sess.recompute_token_usage(turn_context).await;
            Ok(())
        }
        AutoCompactTrigger::AutoFollowUp => {
            // Follow-up downgrade needs no rollback: compact in place with the
            // last user message preserved below the injected context.
            run_auto_compact(
                sess,
                turn_context,
                InitialContextInjection::BeforeLastUserMessage,
                AutoCompactTrigger::AutoFollowUp,
            )
            .await
        }
        AutoCompactTrigger::PreviousModelPreflight => {
            // Preflight compaction is never requested inline, so there is
            // nothing to downgrade here.
            return Ok(false);
        }
    };

    if let Err(err) = downgrade_result {
        record_compaction_downgrade_metric(
            sess,
            pending_compaction.trigger,
            "failed",
            "known_compat_error",
        );
        return Err(err);
    }

    record_compaction_downgrade_metric(
        sess,
        pending_compaction.trigger,
        "succeeded",
        "known_compat_error",
    );
    Ok(true)
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
) -> CodexResult<()> {
|
||||
) -> CodexResult<Option<PendingServerSideCompaction>> {
|
||||
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
|
||||
maybe_run_previous_model_inline_compact(
|
||||
sess,
|
||||
@@ -5854,9 +6207,29 @@ async fn run_pre_sampling_compact(
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
|
||||
if let Some(threshold) = inline_server_side_compaction_threshold(sess, turn_context) {
|
||||
record_compaction_metric(
|
||||
sess,
|
||||
"server_side",
|
||||
AutoCompactTrigger::AutoPreTurn,
|
||||
"requested",
|
||||
&[],
|
||||
);
|
||||
return Ok(Some(PendingServerSideCompaction {
|
||||
threshold,
|
||||
trigger: AutoCompactTrigger::AutoPreTurn,
|
||||
}));
|
||||
}
|
||||
record_inline_compaction_skip(sess, turn_context, AutoCompactTrigger::AutoPreTurn);
|
||||
run_auto_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
AutoCompactTrigger::AutoPreTurn,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Runs pre-sampling compaction against the previous model when switching to a smaller
|
||||
@@ -5893,10 +6266,18 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
&& previous_model_turn_context.model_info.slug != turn_context.model_info.slug
|
||||
&& old_context_window > new_context_window;
|
||||
if should_run {
|
||||
record_compaction_metric(
|
||||
sess,
|
||||
"client_side",
|
||||
AutoCompactTrigger::PreviousModelPreflight,
|
||||
"requested",
|
||||
&[],
|
||||
);
|
||||
run_auto_compact(
|
||||
sess,
|
||||
&previous_model_turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
AutoCompactTrigger::PreviousModelPreflight,
|
||||
)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
@@ -5908,6 +6289,7 @@ async fn run_auto_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: AutoCompactTrigger,
|
||||
) -> CodexResult<()> {
|
||||
if should_use_remote_compact_task(&turn_context.provider) {
|
||||
run_inline_remote_auto_compact_task(
|
||||
@@ -5924,6 +6306,7 @@ async fn run_auto_compact(
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
record_compaction_metric(sess, "client_side", trigger, "applied", &[]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6088,6 +6471,7 @@ fn build_prompt(
|
||||
router: &ToolRouter,
|
||||
turn_context: &TurnContext,
|
||||
base_instructions: BaseInstructions,
|
||||
inline_compaction_threshold: Option<i64>,
|
||||
) -> Prompt {
|
||||
Prompt {
|
||||
input,
|
||||
@@ -6096,6 +6480,7 @@ fn build_prompt(
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: turn_context.final_output_json_schema.clone(),
|
||||
inline_compaction_threshold,
|
||||
}
|
||||
}
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -6114,6 +6499,7 @@ async fn run_sampling_request(
|
||||
client_session: &mut ModelClientSession,
|
||||
turn_metadata_header: Option<&str>,
|
||||
input: Vec<ResponseItem>,
|
||||
inline_compaction_threshold: Option<i64>,
|
||||
explicitly_enabled_connectors: &HashSet<String>,
|
||||
skills_outcome: Option<&SkillLoadOutcome>,
|
||||
server_model_warning_emitted_for_turn: &mut bool,
|
||||
@@ -6136,6 +6522,7 @@ async fn run_sampling_request(
|
||||
router.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
base_instructions,
|
||||
inline_compaction_threshold,
|
||||
);
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
@@ -6315,6 +6702,7 @@ pub(crate) async fn built_tools(
|
||||
struct SamplingRequestResult {
|
||||
needs_follow_up: bool,
|
||||
last_agent_message: Option<String>,
|
||||
observed_server_side_compaction: bool,
|
||||
}
|
||||
|
||||
/// Ephemeral per-response state for streaming a single proposed plan.
|
||||
@@ -6904,6 +7292,7 @@ async fn try_run_sampling_request(
|
||||
FuturesOrdered::new();
|
||||
let mut needs_follow_up = false;
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
let mut observed_server_side_compaction = false;
|
||||
let mut active_item: Option<TurnItem> = None;
|
||||
let mut should_emit_turn_diff = false;
|
||||
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
|
||||
@@ -6947,6 +7336,7 @@ async fn try_run_sampling_request(
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let saw_server_side_compaction = matches!(item, ResponseItem::Compaction { .. });
|
||||
let previously_active_item = active_item.take();
|
||||
if let Some(previous) = previously_active_item.as_ref()
|
||||
&& matches!(previous, TurnItem::AgentMessage(_))
|
||||
@@ -6985,6 +7375,7 @@ async fn try_run_sampling_request(
|
||||
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
|
||||
.instrument(handle_responses)
|
||||
.await?;
|
||||
observed_server_side_compaction |= saw_server_side_compaction;
|
||||
if let Some(tool_future) = output_result.tool_future {
|
||||
in_flight.push_back(tool_future);
|
||||
}
|
||||
@@ -7092,6 +7483,7 @@ async fn try_run_sampling_request(
|
||||
break Ok(SamplingRequestResult {
|
||||
needs_follow_up,
|
||||
last_agent_message,
|
||||
observed_server_side_compaction,
|
||||
});
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
|
||||
@@ -112,6 +112,7 @@ async fn run_remote_compact_task_inner_impl(
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: None,
|
||||
inline_compaction_threshold: None,
|
||||
};
|
||||
|
||||
let mut new_history = sess
|
||||
|
||||
@@ -170,6 +170,8 @@ pub enum Feature {
|
||||
Artifact,
|
||||
/// Enable Fast mode selection in the TUI and request layer.
|
||||
FastMode,
|
||||
/// Enable inline Responses API server-side compaction for auto-compaction flows.
|
||||
ServerSideCompaction,
|
||||
/// Enable voice transcription in the TUI composer.
|
||||
VoiceTranscription,
|
||||
/// Enable experimental realtime voice conversation mode in the TUI.
|
||||
@@ -796,6 +798,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Stable,
|
||||
default_enabled: true,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::ServerSideCompaction,
|
||||
key: "server_side_compaction",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::VoiceTranscription,
|
||||
key: "voice_transcription",
|
||||
|
||||
@@ -340,6 +340,7 @@ mod job {
|
||||
},
|
||||
personality: None,
|
||||
output_schema: Some(output_schema()),
|
||||
inline_compaction_threshold: None,
|
||||
};
|
||||
|
||||
let mut client_session = session.services.model_client.new_session();
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_utils_stream_parser::strip_citations;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -163,6 +164,39 @@ pub(crate) async fn handle_output_item_done(
|
||||
let mut output = OutputItemResult::default();
|
||||
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
|
||||
|
||||
if matches!(item, ResponseItem::Compaction { .. }) {
|
||||
let had_active_compaction = matches!(
|
||||
previously_active_item.as_ref(),
|
||||
Some(TurnItem::ContextCompaction(_))
|
||||
);
|
||||
let compaction_item = match previously_active_item {
|
||||
Some(TurnItem::ContextCompaction(item)) => item,
|
||||
_ => ContextCompactionItem::new(),
|
||||
};
|
||||
if !had_active_compaction {
|
||||
ctx.sess
|
||||
.emit_turn_item_started(
|
||||
&ctx.turn_context,
|
||||
&TurnItem::ContextCompaction(compaction_item.clone()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
debug!(
|
||||
turn_id = %ctx.turn_context.sub_id,
|
||||
"processing streamed server-side compaction item"
|
||||
);
|
||||
ctx.sess
|
||||
.apply_server_side_compaction_checkpoint(ctx.turn_context.as_ref(), item)
|
||||
.await;
|
||||
ctx.sess
|
||||
.emit_turn_item_completed(
|
||||
&ctx.turn_context,
|
||||
TurnItem::ContextCompaction(compaction_item),
|
||||
)
|
||||
.await;
|
||||
return Ok(output);
|
||||
}
|
||||
|
||||
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
|
||||
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
|
||||
Ok(Some(call)) => {
|
||||
@@ -335,6 +369,9 @@ pub(crate) async fn handle_non_tool_response_item(
|
||||
}
|
||||
Some(turn_item)
|
||||
}
|
||||
ResponseItem::Compaction { .. } => {
|
||||
Some(TurnItem::ContextCompaction(ContextCompactionItem::new()))
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
|
||||
debug!("unexpected tool output from stream");
|
||||
None
|
||||
|
||||
@@ -626,6 +626,16 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ev_compaction(encrypted_content: &str) -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "compaction",
|
||||
"encrypted_content": encrypted_content,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn user_message_item(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::path::PathBuf;
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::features::Feature;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -192,6 +193,20 @@ fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
|
||||
);
|
||||
}
|
||||
|
||||
async fn submit_text_turn_and_wait(codex: &codex_core::CodexThread, text: &str) -> Result<()> {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: text.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -403,6 +418,246 @@ async fn remote_compact_runs_automatically() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_uses_inline_context_management() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let first_turn_text = "inline compact turn one";
|
||||
let third_turn_text = "inline compact turn three";
|
||||
let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_compaction(&inline_summary),
|
||||
responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", 80),
|
||||
])),
|
||||
responses::sse_response(sse(vec![responses::ev_completed("resp-3")])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, first_turn_text).await?;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "inline compact turn two".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
let compacted = wait_for_event_match(&codex, |event| match event {
|
||||
EventMsg::ContextCompacted(_) => Some(true),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
assert!(compacted, "expected inline compaction event");
|
||||
|
||||
submit_text_turn_and_wait(&codex, third_turn_text).await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected three inline /responses requests"
|
||||
);
|
||||
|
||||
let inline_request = requests[1].body_json();
|
||||
assert_eq!(
|
||||
inline_request.get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
);
|
||||
|
||||
let post_inline_request = &requests[2];
|
||||
assert!(
|
||||
post_inline_request.body_contains_text(&inline_summary),
|
||||
"expected subsequent request to reuse inline compaction item"
|
||||
);
|
||||
assert!(
|
||||
!post_inline_request.body_contains_text(first_turn_text),
|
||||
"expected pre-compaction user history to be dropped after inline compaction"
|
||||
);
|
||||
assert!(
|
||||
!post_inline_request.body_contains_text("FIRST_REMOTE_REPLY"),
|
||||
"expected pre-compaction assistant history to be dropped after inline compaction"
|
||||
);
|
||||
assert!(
|
||||
post_inline_request.body_contains_text(third_turn_text),
|
||||
"expected next turn to append normally after inline compaction"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_uses_legacy_remote_path_with_custom_prompt() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let custom_compact_prompt = "CUSTOM_REMOTE_COMPACT_PROMPT";
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
config.compact_prompt = Some(custom_compact_prompt.to_string());
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m2", "AFTER_REMOTE_COMPACT_REPLY"),
|
||||
responses::ev_completed("resp-2"),
|
||||
])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let compact_mock = responses::mount_compact_user_history_with_summary_once(
|
||||
harness.server(),
|
||||
"CUSTOM_PROMPT_REMOTE_SUMMARY",
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, "custom prompt turn one").await?;
|
||||
submit_text_turn_and_wait(&codex, "custom prompt turn two").await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(requests.len(), 2, "expected two /responses requests");
|
||||
assert_eq!(
|
||||
compact_mock.requests().len(),
|
||||
1,
|
||||
"expected remote compact endpoint to handle the auto-compaction"
|
||||
);
|
||||
assert!(
|
||||
requests[1].body_json().get("context_management").is_none(),
|
||||
"custom compact prompt should opt out of inline compaction"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_downgrades_known_compat_errors_once() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
ResponseTemplate::new(400).set_body_json(json!({
|
||||
"error": {
|
||||
"message": "Unknown field `context_management` on request body",
|
||||
}
|
||||
})),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m2", "AFTER_DOWNGRADE_REPLY"),
|
||||
responses::ev_completed("resp-2"),
|
||||
])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let compact_mock = responses::mount_compact_user_history_with_summary_once(
|
||||
harness.server(),
|
||||
"DOWNGRADE_REMOTE_SUMMARY",
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, "downgrade turn one").await?;
|
||||
submit_text_turn_and_wait(&codex, "downgrade turn two").await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected initial turn, failed inline attempt, and downgraded follow-up request"
|
||||
);
|
||||
|
||||
let inline_attempt = requests[1].body_json();
|
||||
assert_eq!(
|
||||
inline_attempt.get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
);
|
||||
|
||||
let downgraded_request = requests[2].body_json();
|
||||
assert!(
|
||||
downgraded_request.get("context_management").is_none(),
|
||||
"downgraded retry should fall back to the legacy client-side request shape"
|
||||
);
|
||||
assert!(
|
||||
requests[2].body_contains_text("DOWNGRADE_REMOTE_SUMMARY"),
|
||||
"downgraded retry should reuse the client-side compaction output"
|
||||
);
|
||||
assert_eq!(
|
||||
compact_mock.requests().len(),
|
||||
1,
|
||||
"expected one legacy remote compact request after the compat error"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "windows", ignore)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
|
||||
|
||||
Reference in New Issue
Block a user