mirror of
https://github.com/openai/codex.git
synced 2026-03-10 16:43:25 +00:00
Compare commits
23 Commits
shaqayeq/p
...
cooper/ser
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e5c94b9fa9 | ||
|
|
d6f31e60a1 | ||
|
|
0ac2853dd3 | ||
|
|
847146d0b0 | ||
|
|
fe9694d418 | ||
|
|
0fc23fcf0f | ||
|
|
9d7e5738d5 | ||
|
|
5d5cb4263b | ||
|
|
8804cdaa67 | ||
|
|
7aa1879a85 | ||
|
|
9a616a9d7d | ||
|
|
2084086dc7 | ||
|
|
bd11c80e5d | ||
|
|
a0b8610c09 | ||
|
|
a2e0e9c25b | ||
|
|
057ab52fff | ||
|
|
641af07920 | ||
|
|
17acf74589 | ||
|
|
c64f5c4db4 | ||
|
|
0ba92b24f2 | ||
|
|
cbdbbb13dd | ||
|
|
2721aa5c4d | ||
|
|
148fa29ed1 |
@@ -140,6 +140,23 @@ impl From<VerbosityConfig> for OpenAiVerbosity {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ContextManagement {
|
||||
pub r#type: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub compact_threshold: Option<i64>,
|
||||
}
|
||||
|
||||
impl ContextManagement {
|
||||
pub fn compaction(compact_threshold: i64) -> Self {
|
||||
Self {
|
||||
r#type: "compaction".to_string(),
|
||||
compact_threshold: Some(compact_threshold),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone, PartialEq)]
|
||||
pub struct ResponsesApiRequest {
|
||||
pub model: String,
|
||||
@@ -158,6 +175,8 @@ pub struct ResponsesApiRequest {
|
||||
pub prompt_cache_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub context_management: Option<Vec<ContextManagement>>,
|
||||
}
|
||||
|
||||
impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
@@ -177,6 +196,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
service_tier: request.service_tier.clone(),
|
||||
prompt_cache_key: request.prompt_cache_key.clone(),
|
||||
text: request.text.clone(),
|
||||
context_management: request.context_management.clone(),
|
||||
generate: None,
|
||||
client_metadata: None,
|
||||
}
|
||||
@@ -204,6 +224,8 @@ pub struct ResponseCreateWsRequest {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub context_management: Option<Vec<ContextManagement>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
|
||||
@@ -268,6 +268,7 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
|
||||
service_tier: None,
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
let client = ResponsesClient::new(transport.clone(), provider, NoAuth);
|
||||
|
||||
@@ -310,6 +311,7 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
|
||||
service_tier: None,
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let mut extra_headers = HeaderMap::new();
|
||||
|
||||
@@ -428,6 +428,9 @@
|
||||
"search_tool": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"server_side_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"shell_snapshot": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1908,6 +1911,9 @@
|
||||
"search_tool": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"server_side_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"shell_snapshot": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -52,6 +52,7 @@ use codex_api::SseTelemetry;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::WebsocketTelemetry;
|
||||
use codex_api::build_conversation_headers;
|
||||
use codex_api::common::ContextManagement as ApiContextManagement;
|
||||
use codex_api::common::Reasoning;
|
||||
use codex_api::common::ResponsesWsRequest;
|
||||
use codex_api::create_text_param_for_request;
|
||||
@@ -533,6 +534,10 @@ impl ModelClientSession {
|
||||
};
|
||||
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
|
||||
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
|
||||
let context_management = prompt
|
||||
.inline_compaction_threshold
|
||||
.map(ApiContextManagement::compaction)
|
||||
.map(|entry| vec![entry]);
|
||||
let request = ResponsesApiRequest {
|
||||
model: model_info.slug.clone(),
|
||||
instructions: instructions.clone(),
|
||||
@@ -551,6 +556,7 @@ impl ModelClientSession {
|
||||
},
|
||||
prompt_cache_key,
|
||||
text,
|
||||
context_management,
|
||||
};
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
@@ -42,6 +42,9 @@ pub struct Prompt {
|
||||
|
||||
/// Optional output schema for the model's response.
|
||||
pub output_schema: Option<Value>,
|
||||
|
||||
/// Optional inline server-side compaction threshold for the next Responses request.
|
||||
pub inline_compaction_threshold: Option<i64>,
|
||||
}
|
||||
|
||||
impl Prompt {
|
||||
@@ -316,6 +319,7 @@ mod tests {
|
||||
verbosity: Some(OpenAiVerbosity::Low),
|
||||
format: None,
|
||||
}),
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -355,6 +359,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: None,
|
||||
text: Some(text_controls),
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -392,6 +397,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: None,
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
@@ -414,6 +420,7 @@ mod tests {
|
||||
prompt_cache_key: None,
|
||||
service_tier: Some(ServiceTier::Flex.to_string()),
|
||||
text: None,
|
||||
context_management: None,
|
||||
};
|
||||
|
||||
let v = serde_json::to_value(&req).expect("json");
|
||||
|
||||
@@ -41,6 +41,7 @@ use crate::realtime_conversation::handle_start as handle_realtime_conversation_s
|
||||
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
|
||||
use crate::rollout::session_index;
|
||||
use crate::stream_events_utils::HandleOutputCtx;
|
||||
use crate::stream_events_utils::ServerSideCompaction;
|
||||
use crate::stream_events_utils::handle_non_tool_response_item;
|
||||
use crate::stream_events_utils::handle_output_item_done;
|
||||
use crate::stream_events_utils::last_assistant_message_from_item;
|
||||
@@ -2345,6 +2346,7 @@ impl Session {
|
||||
BaseInstructions {
|
||||
text: base_instructions,
|
||||
},
|
||||
None,
|
||||
);
|
||||
let startup_turn_metadata_header = startup_turn_context
|
||||
.turn_metadata_state
|
||||
@@ -2851,6 +2853,7 @@ impl Session {
|
||||
AskForApproval::Never => {
|
||||
return Some(RequestPermissionsResponse {
|
||||
permissions: PermissionProfile::default(),
|
||||
scope: PermissionGrantScope::Turn,
|
||||
});
|
||||
}
|
||||
AskForApproval::Reject(reject_config)
|
||||
@@ -2858,6 +2861,7 @@ impl Session {
|
||||
{
|
||||
return Some(RequestPermissionsResponse {
|
||||
permissions: PermissionProfile::default(),
|
||||
scope: PermissionGrantScope::Turn,
|
||||
});
|
||||
}
|
||||
AskForApproval::OnFailure
|
||||
@@ -3175,6 +3179,36 @@ impl Session {
|
||||
state.record_items(items.iter(), turn_context.truncation_policy);
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_server_side_compaction(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
item: ResponseItem,
|
||||
) {
|
||||
let current_history = self.clone_history().await;
|
||||
let replacement_history = build_server_side_compaction_replacement_history(
|
||||
item.clone(),
|
||||
current_history.raw_items(),
|
||||
);
|
||||
|
||||
let reference_context_item = Some(turn_context.to_turn_context_item());
|
||||
let compacted_item = CompactedItem {
|
||||
message: String::new(),
|
||||
replacement_history: Some(replacement_history.clone()),
|
||||
};
|
||||
self.replace_compacted_history(replacement_history, reference_context_item, compacted_item)
|
||||
.await;
|
||||
self.recompute_token_usage(turn_context).await;
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"applied local server-side compaction"
|
||||
);
|
||||
self.services.session_telemetry.counter(
|
||||
"codex.compaction_checkpoint_applied",
|
||||
1,
|
||||
&[("mode", "server_side")],
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) async fn record_model_warning(&self, message: impl Into<String>, ctx: &TurnContext) {
|
||||
self.services
|
||||
.session_telemetry
|
||||
@@ -3275,7 +3309,6 @@ impl Session {
|
||||
pub(crate) fn features(&self) -> ManagedFeatures {
|
||||
self.features.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn collaboration_mode(&self) -> CollaborationMode {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.collaboration_mode.clone()
|
||||
@@ -3450,7 +3483,7 @@ impl Session {
|
||||
pub(crate) async fn record_context_updates_and_set_reference_context_item(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) {
|
||||
) -> Vec<ResponseItem> {
|
||||
let reference_context_item = {
|
||||
let state = self.state.lock().await;
|
||||
state.reference_context_item()
|
||||
@@ -3477,6 +3510,7 @@ impl Session {
|
||||
// context items. This keeps later runtime diffing aligned with the current turn state.
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_reference_context_item(Some(turn_context_item));
|
||||
context_items
|
||||
}
|
||||
|
||||
pub(crate) async fn update_token_usage_info(
|
||||
@@ -5282,11 +5316,8 @@ pub(crate) async fn run_turn(
|
||||
// new user message are recorded. Estimate pending incoming items (context
|
||||
// diffs/full reinjection + user input) and trigger compaction preemptively
|
||||
// when they would push the thread over the compaction threshold.
|
||||
if run_pre_sampling_compact(&sess, &turn_context)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
error!("Failed to run pre-sampling compact");
|
||||
if let Err(err) = run_pre_sampling_compact(&sess, &turn_context).await {
|
||||
error!("Failed to run pre-sampling compact: {err}");
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -5432,8 +5463,12 @@ pub(crate) async fn run_turn(
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.clone().into();
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
sess.record_user_prompt_and_emit_turn_item(
|
||||
turn_context.as_ref(),
|
||||
&input,
|
||||
response_item.clone(),
|
||||
)
|
||||
.await;
|
||||
// Track the previous-turn baseline from the regular user-turn path only so
|
||||
// standalone tasks (compact/shell/review/undo) cannot suppress future
|
||||
// model/realtime injections.
|
||||
@@ -5519,6 +5554,7 @@ pub(crate) async fn run_turn(
|
||||
&mut client_session,
|
||||
turn_metadata_header.as_deref(),
|
||||
sampling_request_input,
|
||||
inline_server_side_compaction_threshold(&sess, &turn_context),
|
||||
&turn_enabled_connectors,
|
||||
skills_outcome,
|
||||
&mut server_model_warning_emitted_for_turn,
|
||||
@@ -5527,9 +5563,22 @@ pub(crate) async fn run_turn(
|
||||
.await
|
||||
{
|
||||
Ok(sampling_request_output) => {
|
||||
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
|
||||
let result = if sampling_request_output.observed_server_side_compaction {
|
||||
"applied"
|
||||
} else {
|
||||
"skipped"
|
||||
};
|
||||
sess.services.session_telemetry.counter(
|
||||
"codex.inline_compaction",
|
||||
1,
|
||||
&[("result", result)],
|
||||
);
|
||||
}
|
||||
let SamplingRequestResult {
|
||||
needs_follow_up,
|
||||
last_agent_message: sampling_request_last_agent_message,
|
||||
observed_server_side_compaction: _,
|
||||
} = sampling_request_output;
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
|
||||
@@ -5549,7 +5598,8 @@ pub(crate) async fn run_turn(
|
||||
|
||||
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
|
||||
if token_limit_reached && needs_follow_up {
|
||||
if run_auto_compact(
|
||||
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
|
||||
} else if run_auto_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::BeforeLastUserMessage,
|
||||
@@ -5659,24 +5709,59 @@ pub(crate) async fn run_turn(
|
||||
last_agent_message
|
||||
}
|
||||
|
||||
fn build_server_side_compaction_replacement_history(
|
||||
compaction_item: ResponseItem,
|
||||
history_at_compaction: &[ResponseItem],
|
||||
) -> Vec<ResponseItem> {
|
||||
let mut replacement_history = vec![compaction_item];
|
||||
replacement_history.extend(
|
||||
history_at_compaction
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned(),
|
||||
);
|
||||
replacement_history
|
||||
}
|
||||
|
||||
fn inline_server_side_compaction_threshold(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
) -> Option<i64> {
|
||||
if !sess.enabled(Feature::ServerSideCompaction) {
|
||||
return None;
|
||||
}
|
||||
if !should_use_remote_compact_task(&turn_context.provider) {
|
||||
return None;
|
||||
}
|
||||
// OpenAI inline auto-compaction uses Responses `context_management`, which has no
|
||||
// compaction-prompt field. Auto-compaction therefore ignores `compact_prompt`, while manual
|
||||
// `/compact` still uses the point-in-time compact endpoint.
|
||||
turn_context.model_info.auto_compact_token_limit()
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
) -> CodexResult<()> {
|
||||
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
|
||||
maybe_run_previous_model_inline_compact(
|
||||
if maybe_run_previous_model_inline_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
total_usage_tokens_before_compaction,
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let auto_compact_limit = turn_context
|
||||
.model_info
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
// Compact if the total usage tokens are greater than or equal to the auto-compact limit.
|
||||
if total_usage_tokens >= auto_compact_limit
|
||||
&& inline_server_side_compaction_threshold(sess, turn_context).is_none()
|
||||
{
|
||||
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -5911,6 +5996,7 @@ fn build_prompt(
|
||||
router: &ToolRouter,
|
||||
turn_context: &TurnContext,
|
||||
base_instructions: BaseInstructions,
|
||||
inline_compaction_threshold: Option<i64>,
|
||||
) -> Prompt {
|
||||
Prompt {
|
||||
input,
|
||||
@@ -5919,6 +6005,7 @@ fn build_prompt(
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: turn_context.final_output_json_schema.clone(),
|
||||
inline_compaction_threshold,
|
||||
}
|
||||
}
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -5937,6 +6024,7 @@ async fn run_sampling_request(
|
||||
client_session: &mut ModelClientSession,
|
||||
turn_metadata_header: Option<&str>,
|
||||
input: Vec<ResponseItem>,
|
||||
inline_compaction_threshold: Option<i64>,
|
||||
explicitly_enabled_connectors: &HashSet<String>,
|
||||
skills_outcome: Option<&SkillLoadOutcome>,
|
||||
server_model_warning_emitted_for_turn: &mut bool,
|
||||
@@ -5959,6 +6047,7 @@ async fn run_sampling_request(
|
||||
router.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
base_instructions,
|
||||
inline_compaction_threshold,
|
||||
);
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
@@ -6138,6 +6227,7 @@ async fn built_tools(
|
||||
struct SamplingRequestResult {
|
||||
needs_follow_up: bool,
|
||||
last_agent_message: Option<String>,
|
||||
observed_server_side_compaction: bool,
|
||||
}
|
||||
|
||||
/// Ephemeral per-response state for streaming a single proposed plan.
|
||||
@@ -6726,6 +6816,7 @@ async fn try_run_sampling_request(
|
||||
FuturesOrdered::new();
|
||||
let mut needs_follow_up = false;
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
let mut observed_server_side_compaction = false;
|
||||
let mut active_item: Option<TurnItem> = None;
|
||||
let mut should_emit_turn_diff = false;
|
||||
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
|
||||
@@ -6769,6 +6860,7 @@ async fn try_run_sampling_request(
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let saw_server_side_compaction = matches!(item, ResponseItem::Compaction { .. });
|
||||
let previously_active_item = active_item.take();
|
||||
if let Some(previous) = previously_active_item.as_ref()
|
||||
&& matches!(previous, TurnItem::AgentMessage(_))
|
||||
@@ -6807,6 +6899,16 @@ async fn try_run_sampling_request(
|
||||
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
|
||||
.instrument(handle_responses)
|
||||
.await?;
|
||||
observed_server_side_compaction |= saw_server_side_compaction;
|
||||
if let Some(ServerSideCompaction { item, turn_item }) =
|
||||
output_result.server_side_compaction
|
||||
{
|
||||
sess.apply_server_side_compaction(turn_context.as_ref(), item)
|
||||
.await;
|
||||
sess.emit_turn_item_started(&turn_context, &turn_item).await;
|
||||
sess.emit_turn_item_completed(&turn_context, turn_item)
|
||||
.await;
|
||||
}
|
||||
if let Some(tool_future) = output_result.tool_future {
|
||||
in_flight.push_back(tool_future);
|
||||
}
|
||||
@@ -6820,6 +6922,10 @@ async fn try_run_sampling_request(
|
||||
handle_non_tool_response_item(&item, plan_mode, Some(&turn_context.cwd)).await
|
||||
{
|
||||
let mut turn_item = turn_item;
|
||||
if matches!(turn_item, TurnItem::ContextCompaction(_)) {
|
||||
active_item = Some(turn_item);
|
||||
continue;
|
||||
}
|
||||
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
|
||||
let mut seeded_item_id: Option<String> = None;
|
||||
if matches!(turn_item, TurnItem::AgentMessage(_))
|
||||
@@ -6909,6 +7015,7 @@ async fn try_run_sampling_request(
|
||||
break Ok(SamplingRequestResult {
|
||||
needs_follow_up,
|
||||
last_agent_message,
|
||||
observed_server_side_compaction,
|
||||
});
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
|
||||
@@ -55,6 +55,7 @@ use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::router::ToolCallSource;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_app_server_protocol::AppInfo;
|
||||
use codex_git::GhostCommit;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ContentItem;
|
||||
@@ -106,6 +107,30 @@ fn user_message(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
fn developer_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "developer".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn environment_context_message(cwd: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: format!("<environment_context>\n<cwd>{cwd}</cwd>\n</environment_context>"),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -130,6 +155,12 @@ fn skill_message(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
fn ghost_snapshot(id: &str) -> ResponseItem {
|
||||
ResponseItem::GhostSnapshot {
|
||||
ghost_commit: GhostCommit::new(id.to_string(), None, Vec::new(), Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn developer_input_texts(items: &[ResponseItem]) -> Vec<&str> {
|
||||
items
|
||||
.iter()
|
||||
@@ -227,6 +258,42 @@ fn assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boun
|
||||
assert!(tail.plan_segments.is_empty());
|
||||
}
|
||||
|
||||
/// Checks that the replacement history keeps only the new compaction item followed by the
/// ghost snapshots from the prior history. Messages, tool output and the earlier compaction
/// item are all dropped.
#[test]
fn build_server_side_compaction_replacement_history_replaces_prompt_history() {
    let prior_snapshot = ghost_snapshot("ghost-before");
    let same_turn_snapshot = ghost_snapshot("ghost-during");
    let current_history = vec![
        user_message("earlier"),
        prior_snapshot.clone(),
        developer_message("<model_switch>\nuse the new model"),
        environment_context_message("/fresh"),
        user_message("current turn"),
        skill_message(
            "<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        developer_message("PLUGIN_HINT"),
        ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_text("tool result".to_string()),
        },
        same_turn_snapshot.clone(),
        ResponseItem::Compaction {
            encrypted_content: "INLINE_SUMMARY_1".to_string(),
        },
    ];
    let compaction_item = ResponseItem::Compaction {
        encrypted_content: "INLINE_SUMMARY_2".to_string(),
    };

    // Pass the prior history by reference; the helper only reads it.
    let replacement_history =
        build_server_side_compaction_replacement_history(compaction_item.clone(), &current_history);

    assert_eq!(
        replacement_history,
        vec![compaction_item, prior_snapshot, same_turn_snapshot]
    );
}
|
||||
|
||||
fn make_mcp_tool(
|
||||
server_name: &str,
|
||||
tool_name: &str,
|
||||
@@ -3629,6 +3696,62 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inline_compaction_output_item_returns_immediate_compaction_apply_signal() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
let router = Arc::new(ToolRouter::from_config(
|
||||
&tc.tools_config,
|
||||
None,
|
||||
None,
|
||||
tc.dynamic_tools.as_slice(),
|
||||
));
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
let mut ctx = crate::stream_events_utils::HandleOutputCtx {
|
||||
sess: Arc::clone(&sess),
|
||||
turn_context: Arc::clone(&tc),
|
||||
tool_runtime: crate::tools::parallel::ToolCallRuntime::new(
|
||||
router,
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&tc),
|
||||
tracker,
|
||||
),
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
};
|
||||
let item = ResponseItem::Compaction {
|
||||
encrypted_content: "INLINE_SERVER_SUMMARY".to_string(),
|
||||
};
|
||||
|
||||
let output = crate::stream_events_utils::handle_output_item_done(
|
||||
&mut ctx,
|
||||
item.clone(),
|
||||
Some(TurnItem::ContextCompaction(
|
||||
codex_protocol::items::ContextCompactionItem::new(),
|
||||
)),
|
||||
)
|
||||
.await
|
||||
.expect("handle output item");
|
||||
|
||||
assert!(output.server_side_compaction.is_some());
|
||||
assert!(output.tool_future.is_none());
|
||||
assert!(!output.needs_follow_up);
|
||||
assert!(output.last_agent_message.is_none());
|
||||
|
||||
let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
||||
.await
|
||||
.expect("expected raw response item event")
|
||||
.expect("channel open");
|
||||
assert!(matches!(
|
||||
first.msg,
|
||||
EventMsg::RawResponseItem(raw) if raw.item == item
|
||||
));
|
||||
assert!(
|
||||
rx.try_recv().is_err(),
|
||||
"expected the caller to decide when to commit the compaction lifecycle"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn steer_input_requires_active_turn() {
|
||||
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
|
||||
|
||||
@@ -99,6 +99,7 @@ async fn run_remote_compact_task_inner_impl(
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: None,
|
||||
inline_compaction_threshold: None,
|
||||
};
|
||||
|
||||
let mut new_history = sess
|
||||
|
||||
@@ -162,6 +162,8 @@ pub enum Feature {
|
||||
Artifact,
|
||||
/// Enable Fast mode selection in the TUI and request layer.
|
||||
FastMode,
|
||||
/// Enable inline Responses API server-side compaction for auto-compaction flows.
|
||||
ServerSideCompaction,
|
||||
/// Enable voice transcription in the TUI composer.
|
||||
VoiceTranscription,
|
||||
/// Enable experimental realtime voice conversation mode in the TUI.
|
||||
@@ -746,6 +748,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Stable,
|
||||
default_enabled: true,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::ServerSideCompaction,
|
||||
key: "server_side_compaction",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::VoiceTranscription,
|
||||
key: "voice_transcription",
|
||||
|
||||
@@ -340,6 +340,7 @@ mod job {
|
||||
},
|
||||
personality: None,
|
||||
output_schema: Some(output_schema()),
|
||||
inline_compaction_threshold: None,
|
||||
};
|
||||
|
||||
let mut client_session = session.services.model_client.new_session();
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_utils_stream_parser::strip_citations;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -24,6 +25,8 @@ use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_utils_stream_parser::strip_proposed_plan_blocks;
|
||||
use futures::Future;
|
||||
use tracing::debug;
|
||||
@@ -144,9 +147,15 @@ pub(crate) type InFlightFuture<'f> =
|
||||
pub(crate) struct OutputItemResult {
|
||||
pub last_agent_message: Option<String>,
|
||||
pub needs_follow_up: bool,
|
||||
pub server_side_compaction: Option<ServerSideCompaction>,
|
||||
pub tool_future: Option<InFlightFuture<'static>>,
|
||||
}
|
||||
|
||||
pub(crate) struct ServerSideCompaction {
|
||||
pub item: ResponseItem,
|
||||
pub turn_item: TurnItem,
|
||||
}
|
||||
|
||||
pub(crate) struct HandleOutputCtx {
|
||||
pub sess: Arc<Session>,
|
||||
pub turn_context: Arc<TurnContext>,
|
||||
@@ -163,6 +172,27 @@ pub(crate) async fn handle_output_item_done(
|
||||
let mut output = OutputItemResult::default();
|
||||
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
|
||||
|
||||
if matches!(item, ResponseItem::Compaction { .. }) {
|
||||
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
|
||||
Some(TurnItem::ContextCompaction(item)) => item,
|
||||
_ => ContextCompactionItem::new(),
|
||||
});
|
||||
// Preserve the raw wire event immediately; the caller rewrites local history inline and
|
||||
// then continues appending later same-turn output after the compaction item.
|
||||
debug!(
|
||||
turn_id = %ctx.turn_context.sub_id,
|
||||
"emitting streamed server-side raw compaction item for immediate local checkpoint apply"
|
||||
);
|
||||
ctx.sess
|
||||
.send_event(
|
||||
&ctx.turn_context,
|
||||
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
|
||||
)
|
||||
.await;
|
||||
output.server_side_compaction = Some(ServerSideCompaction { item, turn_item });
|
||||
return Ok(output);
|
||||
}
|
||||
|
||||
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
|
||||
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
|
||||
Ok(Some(call)) => {
|
||||
@@ -320,6 +350,9 @@ pub(crate) async fn handle_non_tool_response_item(
|
||||
}
|
||||
Some(turn_item)
|
||||
}
|
||||
ResponseItem::Compaction { .. } => {
|
||||
Some(TurnItem::ContextCompaction(ContextCompactionItem::new()))
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
|
||||
debug!("unexpected tool output from stream");
|
||||
None
|
||||
|
||||
@@ -621,6 +621,16 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ev_compaction(encrypted_content: &str) -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "compaction",
|
||||
"encrypted_content": encrypted_content,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn user_message_item(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
|
||||
@@ -5,10 +5,17 @@ use std::path::PathBuf;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::ModelProviderInfo;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::models_manager::manager::RefreshStrategy;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
@@ -19,11 +26,13 @@ use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::context_snapshot;
|
||||
use core_test_support::context_snapshot::ContextSnapshotOptions;
|
||||
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
@@ -59,6 +68,18 @@ fn summary_with_prefix(summary: &str) -> String {
|
||||
format!("{SUMMARY_PREFIX}\n{summary}")
|
||||
}
|
||||
|
||||
fn model_info_with_context_window(slug: &str, context_window: i64) -> ModelInfo {
|
||||
let models_response: ModelsResponse =
|
||||
serde_json::from_str(include_str!("../../models.json")).expect("valid models.json");
|
||||
let mut model_info = models_response
|
||||
.models
|
||||
.into_iter()
|
||||
.find(|model| model.slug == slug)
|
||||
.unwrap_or_else(|| panic!("model `{slug}` missing from models.json"));
|
||||
model_info.context_window = Some(context_window);
|
||||
model_info
|
||||
}
|
||||
|
||||
fn context_snapshot_options() -> ContextSnapshotOptions {
|
||||
ContextSnapshotOptions::default()
|
||||
.render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 })
|
||||
@@ -173,6 +194,20 @@ fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
|
||||
);
|
||||
}
|
||||
|
||||
async fn submit_text_turn_and_wait(codex: &codex_core::CodexThread, text: &str) -> Result<()> {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: text.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -362,6 +397,387 @@ async fn remote_compact_runs_automatically() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_uses_inline_context_management() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let first_turn_text = "inline compact turn one";
|
||||
let third_turn_text = "inline compact turn three";
|
||||
let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_compaction(&inline_summary),
|
||||
responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", 80),
|
||||
])),
|
||||
responses::sse_response(sse(vec![responses::ev_completed("resp-3")])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, first_turn_text).await?;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "inline compact turn two".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
let compacted = wait_for_event_match(&codex, |event| match event {
|
||||
EventMsg::ContextCompacted(_) => Some(true),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
assert!(compacted, "expected inline compaction event");
|
||||
|
||||
submit_text_turn_and_wait(&codex, third_turn_text).await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected three inline /responses requests"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
requests[0].body_json().get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
);
|
||||
let inline_request = requests[1].body_json();
|
||||
assert_eq!(
|
||||
inline_request.get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
);
|
||||
|
||||
let post_inline_request = &requests[2];
|
||||
assert_eq!(
|
||||
post_inline_request.body_json().get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
);
|
||||
let post_inline_body = post_inline_request.body_json().to_string();
|
||||
assert!(
|
||||
post_inline_request.body_contains_text(&inline_summary),
|
||||
"expected subsequent request to reuse inline compaction item"
|
||||
);
|
||||
assert!(
|
||||
!post_inline_request.body_contains_text("<permissions instructions>"),
|
||||
"expected subsequent request to rely on the compaction item rather than duplicate canonical context"
|
||||
);
|
||||
assert!(
|
||||
!post_inline_request.body_contains_text(first_turn_text),
|
||||
"expected pre-compaction user history to be dropped after inline compaction"
|
||||
);
|
||||
assert!(
|
||||
!post_inline_request.body_contains_text("FIRST_REMOTE_REPLY"),
|
||||
"expected pre-compaction assistant history to be dropped after inline compaction"
|
||||
);
|
||||
assert!(
|
||||
post_inline_request.body_contains_text(third_turn_text),
|
||||
"expected next turn to append normally after inline compaction"
|
||||
);
|
||||
assert!(
|
||||
post_inline_body
|
||||
.find("INLINE_SERVER_SUMMARY")
|
||||
.expect("inline compaction marker in subsequent request")
|
||||
< post_inline_body
|
||||
.find("AFTER_INLINE_REPLY")
|
||||
.expect("post-compaction assistant reply in subsequent request"),
|
||||
"expected post-compaction transcript items to remain after the inline compaction item"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn snapshot_request_shape_auto_server_side_compaction_history_layout() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_compaction(&inline_summary),
|
||||
responses::ev_assistant_message("m2", "AFTER_INLINE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", 80),
|
||||
])),
|
||||
responses::sse_response(sse(vec![responses::ev_completed("resp-3")])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, "inline compact turn one").await?;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "inline compact turn two".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |event| {
|
||||
matches!(event, EventMsg::ContextCompacted(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, "inline compact turn three").await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected three inline /responses requests"
|
||||
);
|
||||
|
||||
insta::assert_snapshot!(
|
||||
"auto_server_side_compaction_history_layout_shapes",
|
||||
format_labeled_requests_snapshot(
|
||||
"Inline server-side compaction: the compacting turn opts into context_management, and the next request reuses the streamed compaction checkpoint directly.",
|
||||
&[
|
||||
("Inline Compaction Request", &requests[1]),
|
||||
("Post-Inline-Compaction History Layout", &requests[2]),
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_follow_ups_use_compaction_checkpoint() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let first_turn_text = "inline compact turn one";
|
||||
let second_turn_text = "inline compact turn two";
|
||||
let inline_summary = summary_with_prefix("INLINE_SERVER_SUMMARY");
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_sse_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_compaction(&inline_summary),
|
||||
responses::ev_function_call("call-inline-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
|
||||
responses::ev_completed_with_tokens("resp-2", 80),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m3", "AFTER_INLINE_TOOL_REPLY"),
|
||||
responses::ev_completed("resp-3"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_text_turn_and_wait(&codex, first_turn_text).await?;
|
||||
submit_text_turn_and_wait(&codex, second_turn_text).await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected initial request, inline-compacted tool call, and same-turn follow-up"
|
||||
);
|
||||
|
||||
let follow_up_request = &requests[2];
|
||||
let follow_up_body = follow_up_request.body_json().to_string();
|
||||
assert!(
|
||||
follow_up_request.body_contains_text(&inline_summary),
|
||||
"expected same-turn follow-up to include the inline compaction item"
|
||||
);
|
||||
assert!(
|
||||
!follow_up_request.body_contains_text("<permissions instructions>"),
|
||||
"expected same-turn follow-up to rely on the compaction item rather than duplicate canonical context"
|
||||
);
|
||||
assert!(
|
||||
!follow_up_request.body_contains_text(second_turn_text),
|
||||
"expected same-turn follow-up to replace prior plaintext history with the compaction item"
|
||||
);
|
||||
assert!(
|
||||
!follow_up_request.body_contains_text(first_turn_text),
|
||||
"expected same-turn follow-up to drop pre-compaction history"
|
||||
);
|
||||
assert!(
|
||||
follow_up_request
|
||||
.function_call_output_text("call-inline-mid-turn")
|
||||
.is_some(),
|
||||
"expected same-turn follow-up to include the tool output"
|
||||
);
|
||||
assert!(
|
||||
follow_up_body
|
||||
.find("INLINE_SERVER_SUMMARY")
|
||||
.expect("inline compaction marker in follow-up request")
|
||||
< follow_up_body
|
||||
.find("call-inline-mid-turn")
|
||||
.expect("post-compaction tool call in follow-up request"),
|
||||
"expected the inline compaction item to remain ahead of post-compaction tool calls"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_server_side_compaction_retries_without_persisting_failed_compaction_item()
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 120;
|
||||
let first_turn_text = "inline compact retry turn one";
|
||||
let inline_summary = summary_with_prefix("INLINE_SERVER_RETRY_SUMMARY");
|
||||
let server = wiremock::MockServer::start().await;
|
||||
let model_provider = ModelProviderInfo {
|
||||
name: "openai".into(),
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
env_key: Some("PATH".into()),
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(1),
|
||||
stream_idle_timeout_ms: Some(2000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-1", 500),
|
||||
])),
|
||||
responses::sse_response(sse(vec![responses::ev_compaction(&inline_summary)])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m2", "AFTER_RETRY_REPLY"),
|
||||
responses::ev_completed_with_tokens("resp-2", 40),
|
||||
])),
|
||||
responses::sse_response(sse(vec![
|
||||
responses::ev_assistant_message("m3", "THIRD_TURN_REPLY"),
|
||||
responses::ev_completed("resp-3"),
|
||||
])),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let test = test_codex()
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
})
|
||||
.build(&server)
|
||||
.await?;
|
||||
let codex = test.codex.clone();
|
||||
|
||||
submit_text_turn_and_wait(&codex, first_turn_text).await?;
|
||||
submit_text_turn_and_wait(&codex, "inline compact retry turn two").await?;
|
||||
submit_text_turn_and_wait(&codex, "inline compact retry turn three").await?;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
4,
|
||||
"expected initial turn, incomplete inline attempt, retry, and third turn"
|
||||
);
|
||||
|
||||
let third_turn_request = &requests[3];
|
||||
assert!(
|
||||
!third_turn_request.body_contains_text(&inline_summary),
|
||||
"failed inline compaction should not persist its compaction item into later turns"
|
||||
);
|
||||
assert!(
|
||||
third_turn_request.body_contains_text(first_turn_text),
|
||||
"failed inline compaction should leave pre-compaction history intact for later turns"
|
||||
);
|
||||
assert!(
|
||||
third_turn_request.body_contains_text("AFTER_RETRY_REPLY"),
|
||||
"retry response should be the history that later turns build on"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "windows", ignore)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
|
||||
@@ -2113,6 +2529,171 @@ async fn snapshot_request_shape_remote_pre_turn_compaction_strips_incoming_model
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn previous_model_preflight_remote_compaction_still_runs_with_inline_feature_enabled()
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let compact_threshold = 200;
|
||||
let previous_model = "gpt-5.2-codex";
|
||||
let next_model = "gpt-5.1-codex-max";
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_model(previous_model)
|
||||
.with_config(move |config| {
|
||||
config.model_auto_compact_token_limit = Some(compact_threshold);
|
||||
config
|
||||
.features
|
||||
.enable(Feature::ServerSideCompaction)
|
||||
.expect("enable server-side compaction");
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
let models_mock = mount_models_once(
|
||||
harness.server(),
|
||||
ModelsResponse {
|
||||
models: vec![
|
||||
model_info_with_context_window(previous_model, 273_000),
|
||||
model_info_with_context_window(next_model, 125_000),
|
||||
],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let _ = harness
|
||||
.test()
|
||||
.thread_manager
|
||||
.get_models_manager()
|
||||
.list_models(RefreshStrategy::Online)
|
||||
.await;
|
||||
|
||||
let initial_turn_request_mock = responses::mount_sse_once(
|
||||
harness.server(),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m1", "BEFORE_SWITCH_REPLY"),
|
||||
responses::ev_completed_with_tokens("r1", 120_000),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let post_compact_turn_request_mock = responses::mount_sse_once(
|
||||
harness.server(),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m2", "AFTER_SWITCH_REPLY"),
|
||||
responses::ev_completed_with_tokens("r2", 80),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let compact_mock = responses::mount_compact_user_history_with_summary_once(
|
||||
harness.server(),
|
||||
&summary_with_prefix("REMOTE_SWITCH_SUMMARY"),
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "BEFORE_SWITCH_USER".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: harness.test().cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: previous_model.to_string(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "AFTER_SWITCH_USER".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: harness.test().cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: next_model.to_string(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
assert_eq!(
|
||||
compact_mock.requests().len(),
|
||||
1,
|
||||
"expected previous-model preflight to keep using remote compaction even when inline server-side compaction is enabled"
|
||||
);
|
||||
assert_eq!(models_mock.requests().len(), 1, "expected one models fetch");
|
||||
assert_eq!(
|
||||
initial_turn_request_mock.requests().len(),
|
||||
1,
|
||||
"expected initial turn request"
|
||||
);
|
||||
assert_eq!(
|
||||
post_compact_turn_request_mock.requests().len(),
|
||||
1,
|
||||
"expected post-compaction follow-up request"
|
||||
);
|
||||
let initial_turn_request = initial_turn_request_mock.single_request();
|
||||
let compact_request = compact_mock.single_request();
|
||||
let post_compact_turn_request = post_compact_turn_request_mock.single_request();
|
||||
assert_eq!(
|
||||
initial_turn_request
|
||||
.body_json()
|
||||
.get("model")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(previous_model)
|
||||
);
|
||||
assert_eq!(
|
||||
compact_request
|
||||
.body_json()
|
||||
.get("model")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(previous_model)
|
||||
);
|
||||
assert_eq!(
|
||||
post_compact_turn_request
|
||||
.body_json()
|
||||
.get("model")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(next_model)
|
||||
);
|
||||
let compact_body = compact_request.body_json().to_string();
|
||||
assert!(
|
||||
!compact_body.contains("<model_switch>"),
|
||||
"expected previous-model preflight compact request to strip model-switch context"
|
||||
);
|
||||
let follow_up_body = post_compact_turn_request.body_json().to_string();
|
||||
assert!(
|
||||
follow_up_body.contains("<model_switch>"),
|
||||
"expected post-compaction next-model request to restore model-switch context"
|
||||
);
|
||||
assert_eq!(
|
||||
post_compact_turn_request
|
||||
.body_json()
|
||||
.get("context_management"),
|
||||
Some(&json!([{
|
||||
"type": "compaction",
|
||||
"compact_threshold": compact_threshold,
|
||||
}])),
|
||||
"expected eligible next-model requests to always include inline context management after preflight compaction"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// TODO(ccunningham): Update once remote pre-turn compaction context-overflow handling includes
|
||||
// incoming user input and emits richer oversized-input messaging.
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
---
|
||||
source: core/tests/suite/compact_remote.rs
|
||||
assertion_line: 558
|
||||
expression: "format_labeled_requests_snapshot(\"Inline server-side compaction: the compacting turn opts into context_management, and the next request reuses the streamed compaction checkpoint directly.\",\n&[(\"Inline Compaction Request\", &requests[1]),\n(\"Post-Inline-Compaction History Layout\", &requests[2]),])"
|
||||
---
|
||||
Scenario: Inline server-side compaction: the compacting turn opts into context_management, and the next request reuses the streamed compaction checkpoint directly.
|
||||
|
||||
## Inline Compaction Request
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/user:inline compact turn one
|
||||
03:message/assistant:FIRST_REMOTE_REPLY
|
||||
04:message/user:inline compact turn two
|
||||
|
||||
## Post-Inline-Compaction History Layout
|
||||
00:compaction:encrypted=true
|
||||
01:message/assistant:AFTER_INLINE_REPLY
|
||||
02:message/user:inline compact turn three
|
||||
Reference in New Issue
Block a user