Pre-sampling compact with previous model context (#11504)

- Run pre-sampling compact through a single helper that builds
previous-model turn context and compacts before the follow-up request
when switching to a smaller context window.
- Keep compaction events on the parent turn id and add compact suite
coverage for switch-in-session and resume+switch flows.
Ahmed Ibrahim
2026-02-11 17:24:06 -08:00
committed by GitHub
parent 3f1b41689a
commit 6938150c5e
3 changed files with 500 additions and 4 deletions
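
The change funnels the pre-sampling work through one helper: first give the previous model a chance to compact its own context (when the session switched to a model with a smaller window), then apply the ordinary auto-compact check for the current model. A minimal sketch of that flow, condensed from the diff below (error handling and exact signatures simplified):

async fn run_pre_sampling_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> CodexResult<()> {
    let usage_before = sess.get_total_token_usage().await;
    // Step 1: compact with the previous model's turn context so the summary
    // is produced inside its (larger) context window.
    maybe_run_previous_model_inline_compact(sess, turn_context, usage_before).await?;
    // Step 2: the pre-existing auto-compact check, now run against post-step-1 usage.
    let limit = turn_context.model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
    if sess.get_total_token_usage().await >= limit {
        run_auto_compact(sess, turn_context).await?;
    }
    Ok(())
}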


@@ -564,6 +564,80 @@ impl TurnContext {
})
}
pub(crate) async fn with_model(&self, model: String, models_manager: &ModelsManager) -> Self {
let mut config = (*self.config).clone();
config.model = Some(model.clone());
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
let truncation_policy = model_info.truncation_policy.into();
let supported_reasoning_levels = model_info
.supported_reasoning_levels
.iter()
.map(|preset| preset.effort)
.collect::<Vec<_>>();
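// Carry the current reasoning effort over if the new model supports it;
// otherwise fall back to the middle supported level, then the model default.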
let reasoning_effort = if let Some(current_reasoning_effort) = self.reasoning_effort {
if supported_reasoning_levels.contains(&current_reasoning_effort) {
Some(current_reasoning_effort)
} else {
supported_reasoning_levels
.get(supported_reasoning_levels.len().saturating_sub(1) / 2)
.copied()
.or(model_info.default_reasoning_level)
}
} else {
supported_reasoning_levels
.get(supported_reasoning_levels.len().saturating_sub(1) / 2)
.copied()
.or(model_info.default_reasoning_level)
};
config.model_reasoning_effort = reasoning_effort;
let collaboration_mode =
self.collaboration_mode
.with_updates(Some(model.clone()), Some(reasoning_effort), None);
let features = self.features.clone();
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
features: &features,
web_search_mode: self.tools_config.web_search_mode,
});
Self {
sub_id: self.sub_id.clone(),
config: Arc::new(config),
auth_manager: self.auth_manager.clone(),
model_info: model_info.clone(),
otel_manager: self
.otel_manager
.clone()
.with_model(model.as_str(), model_info.slug.as_str()),
provider: self.provider.clone(),
reasoning_effort,
reasoning_summary: self.reasoning_summary,
session_source: self.session_source.clone(),
cwd: self.cwd.clone(),
developer_instructions: self.developer_instructions.clone(),
compact_prompt: self.compact_prompt.clone(),
user_instructions: self.user_instructions.clone(),
collaboration_mode,
personality: self.personality,
approval_policy: self.approval_policy,
sandbox_policy: self.sandbox_policy.clone(),
network: self.network.clone(),
windows_sandbox_level: self.windows_sandbox_level,
shell_environment_policy: self.shell_environment_policy.clone(),
tools_config,
features,
ghost_snapshot: self.ghost_snapshot.clone(),
final_output_json_schema: self.final_output_json_schema.clone(),
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
truncation_policy,
js_repl: Arc::clone(&self.js_repl),
dynamic_tools: self.dynamic_tools.clone(),
turn_metadata_header: self.turn_metadata_header.clone(),
}
}
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
path.as_ref()
.map(PathBuf::from)
@@ -3892,7 +3966,6 @@ pub(crate) async fn run_turn(
let model_info = turn_context.model_info.clone();
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
@@ -3900,9 +3973,11 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
- if total_usage_tokens >= auto_compact_limit
-     && run_auto_compact(&sess, &turn_context).await.is_err()
if run_pre_sampling_compact(&sess, &turn_context)
.await
.is_err()
{
error!("Failed to run pre-sampling compact");
return None;
}
@@ -4141,6 +4216,62 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_pre_sampling_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
maybe_run_previous_model_inline_compact(
sess,
turn_context,
total_usage_tokens_before_compaction,
)
.await?;
let total_usage_tokens = sess.get_total_token_usage().await;
let auto_compact_limit = turn_context
.model_info
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
// Compact when the total usage tokens reach or exceed the auto-compact limit
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(sess, turn_context).await?;
}
Ok(())
}
async fn maybe_run_previous_model_inline_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
total_usage_tokens: i64,
) -> CodexResult<()> {
let Some(previous_model) = sess.previous_model().await else {
return Ok(());
};
let previous_turn_context = Arc::new(
turn_context
.with_model(previous_model, &sess.services.models_manager)
.await,
);
let Some(old_context_window) = previous_turn_context.model_context_window() else {
return Ok(());
};
let Some(new_context_window) = turn_context.model_context_window() else {
return Ok(());
};
let new_auto_compact_limit = turn_context
.model_info
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
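// Compact with the previous model only if usage already exceeds the new
// model's auto-compact limit, the model actually changed, and the new
// context window is strictly smaller than the old one.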
let should_run = total_usage_tokens > new_auto_compact_limit
&& previous_turn_context.model_info.slug != turn_context.model_info.slug
&& old_context_window > new_context_window;
if should_run {
run_auto_compact(sess, &previous_turn_context).await?;
}
Ok(())
}
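// Worked example (figures mirror the tests below; the new model's exact
// auto-compact limit is derived from its window and is assumed here to fall
// below current usage): 120_000 tokens used on a 273_000-window model,
// switching to a 125_000-window model -> usage exceeds the new limit, the
// slug differs, and the window shrinks, so the compact request is sent with
// the previous model before the follow-up turn samples on the new one.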
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
@@ -6112,6 +6243,44 @@ mod tests {
);
}
#[tokio::test]
async fn turn_context_with_model_updates_model_fields() {
let (session, mut turn_context) = make_session_and_context().await;
turn_context.reasoning_effort = Some(ReasoningEffortConfig::Minimal);
let updated = turn_context
.with_model("gpt-5.1".to_string(), &session.services.models_manager)
.await;
let expected_model_info = session
.services
.models_manager
.get_model_info("gpt-5.1", updated.config.as_ref())
.await;
assert_eq!(updated.config.model.as_deref(), Some("gpt-5.1"));
assert_eq!(updated.collaboration_mode.model(), "gpt-5.1");
assert_eq!(updated.model_info, expected_model_info);
assert_eq!(
updated.reasoning_effort,
Some(ReasoningEffortConfig::Medium)
);
assert_eq!(
updated.collaboration_mode.reasoning_effort(),
Some(ReasoningEffortConfig::Medium)
);
assert_eq!(
updated.config.model_reasoning_effort,
Some(ReasoningEffortConfig::Medium)
);
assert_eq!(
updated.truncation_policy,
expected_model_info.truncation_policy.into()
);
assert!(!Arc::ptr_eq(
&updated.tool_call_gate,
&turn_context.tool_call_gate
));
}
#[test]
fn falls_back_to_content_when_structured_is_null() {
let ctr = McpCallToolResult {


@@ -153,7 +153,7 @@ impl Session {
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();
- sess.on_task_finished(ctx_for_finish, last_agent_message)
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
}
// Set previous model regardless of completion or interruption for model-switch handling.


@@ -5,6 +5,7 @@ use codex_core::built_in_model_providers;
use codex_core::compact::SUMMARIZATION_PROMPT;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::config::Config;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ItemCompletedEvent;
@@ -16,9 +17,12 @@ use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::WarningEvent;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::items::TurnItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::mount_models_once;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
@@ -110,6 +114,78 @@ fn non_openai_model_provider(server: &MockServer) -> ModelProviderInfo {
provider
}
fn model_info_with_context_window(slug: &str, context_window: i64) -> ModelInfo {
let models_response: ModelsResponse =
serde_json::from_str(include_str!("../../models.json")).expect("valid models.json");
let mut model_info = models_response
.models
.into_iter()
.find(|model| model.slug == slug)
.unwrap_or_else(|| panic!("model `{slug}` missing from models.json"));
model_info.context_window = Some(context_window);
model_info
}
fn assert_pre_sampling_switch_compaction_requests(
first: &serde_json::Value,
compact: &serde_json::Value,
follow_up: &serde_json::Value,
previous_model: &str,
next_model: &str,
) {
assert_eq!(first["model"].as_str(), Some(previous_model));
assert_eq!(compact["model"].as_str(), Some(previous_model));
assert_eq!(follow_up["model"].as_str(), Some(next_model));
let compact_body = compact.to_string();
assert!(
body_contains_text(&compact_body, SUMMARIZATION_PROMPT),
"pre-sampling compact request should include summarization prompt"
);
}
async fn assert_compaction_uses_turn_lifecycle_id(codex: &std::sync::Arc<codex_core::CodexThread>) {
let mut turn_started_id = None;
let mut turn_completed_id = None;
let mut compact_started_id = None;
let mut compact_completed_id = None;
while turn_completed_id.is_none() {
let event = codex.next_event().await.expect("next event");
match event.msg {
EventMsg::TurnStarted(_) => turn_started_id = Some(event.id.clone()),
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::ContextCompaction(_),
..
}) => compact_started_id = Some(event.id.clone()),
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::ContextCompaction(_),
..
}) => compact_completed_id = Some(event.id.clone()),
EventMsg::TurnComplete(_) => turn_completed_id = Some(event.id.clone()),
_ => {}
}
}
let turn_started_id = turn_started_id.expect("turn started id");
let turn_completed_id = turn_completed_id.expect("turn complete id");
assert_eq!(
turn_completed_id, turn_started_id,
"turn start and complete should use the same event id"
);
assert_eq!(
compact_started_id,
Some(turn_started_id.clone()),
"compaction item start should use the turn event id"
);
assert_eq!(
compact_completed_id,
Some(turn_started_id),
"compaction item completion should use the turn event id"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn summarize_context_three_requests_and_instructions() {
skip_if_no_network!();
@@ -1551,6 +1627,257 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pre_sampling_compact_runs_on_switch_to_smaller_context_model() {
skip_if_no_network!();
let server = MockServer::start().await;
let previous_model = "gpt-5.2-codex";
let next_model = "gpt-5.1-codex-max";
let models_mock = mount_models_once(
&server,
ModelsResponse {
models: vec![
model_info_with_context_window(previous_model, 273_000),
model_info_with_context_window(next_model, 125_000),
],
},
)
.await;
let request_log = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_assistant_message("m1", "before switch"),
ev_completed_with_tokens("r1", 120_000),
]),
sse(vec![
ev_assistant_message("m2", "PRE_SAMPLING_SUMMARY"),
ev_completed_with_tokens("r2", 10),
]),
sse(vec![
ev_assistant_message("m3", "after switch"),
ev_completed_with_tokens("r3", 100),
]),
],
)
.await;
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model(previous_model)
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.features.enable(Feature::RemoteModels);
});
let test = builder.build(&server).await.expect("build test codex");
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "before switch".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: previous_model.to_string(),
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await
.expect("submit first user turn");
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "after switch".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: next_model.to_string(),
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await
.expect("submit second user turn");
assert_compaction_uses_turn_lifecycle_id(&test.codex).await;
let requests = request_log.requests();
assert_eq!(models_mock.requests().len(), 1);
assert_eq!(
requests.len(),
3,
"expected user, compact, and follow-up requests"
);
assert_pre_sampling_switch_compaction_requests(
&requests[0].body_json(),
&requests[1].body_json(),
&requests[2].body_json(),
previous_model,
next_model,
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pre_sampling_compact_runs_after_resume_and_switch_to_smaller_model() {
skip_if_no_network!();
let server = MockServer::start().await;
let previous_model = "gpt-5.2-codex";
let next_model = "gpt-5.1-codex-max";
let models_mock = mount_models_once(
&server,
ModelsResponse {
models: vec![
model_info_with_context_window(previous_model, 273_000),
model_info_with_context_window(next_model, 125_000),
],
},
)
.await;
let request_log = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_assistant_message("m1", "before resume"),
ev_completed_with_tokens("r1", 120_000),
]),
sse(vec![
ev_assistant_message("m2", "PRE_SAMPLING_SUMMARY"),
ev_completed_with_tokens("r2", 10),
]),
sse(vec![
ev_assistant_message("m3", "after resume"),
ev_completed_with_tokens("r3", 100),
]),
],
)
.await;
let model_provider = non_openai_model_provider(&server);
let mut initial_builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model(previous_model)
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.features.enable(Feature::RemoteModels);
});
let initial = initial_builder
.build(&server)
.await
.expect("build initial test codex");
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
initial
.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "before resume".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: initial.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: previous_model.to_string(),
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await
.expect("submit pre-resume turn");
wait_for_event(&initial.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
initial
.codex
.submit(Op::Shutdown)
.await
.expect("shutdown initial session");
wait_for_event(&initial.codex, |event| {
matches!(event, EventMsg::ShutdownComplete)
})
.await;
let model_provider = non_openai_model_provider(&server);
let mut resumed_builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model(previous_model)
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.features.enable(Feature::RemoteModels);
});
let resumed = resumed_builder
.resume(&server, home, rollout_path)
.await
.expect("resume codex");
resumed
.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "after resume".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: resumed.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: next_model.to_string(),
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await
.expect("submit resumed user turn");
assert_compaction_uses_turn_lifecycle_id(&resumed.codex).await;
let requests = request_log.requests();
assert_eq!(models_mock.requests().len(), 1);
assert_eq!(
requests.len(),
3,
"expected user, compact, and follow-up requests"
);
assert_pre_sampling_switch_compaction_requests(
&requests[0].body_json(),
&requests[1].body_json(),
&requests[2].body_json(),
previous_model,
next_model,
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_persists_rollout_entries() {
skip_if_no_network!();