mirror of
https://github.com/openai/codex.git
synced 2026-03-09 08:03:24 +00:00
Compare commits
5 Commits
dev/friel/
...
pre-sampli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0b5842e95 | ||
|
|
2a6a9e408b | ||
|
|
7eaff9f810 | ||
|
|
4290b8731f | ||
|
|
e5c2ecdc92 |
@@ -1324,6 +1324,16 @@ impl Session {
|
||||
state.clear_mcp_tool_selection();
|
||||
}
|
||||
|
||||
async fn previous_turn_context(&self) -> Option<Arc<TurnContext>> {
|
||||
let state = self.state.lock().await;
|
||||
state.previous_turn_context()
|
||||
}
|
||||
|
||||
pub(crate) async fn set_previous_turn_context(&self, turn_context: Arc<TurnContext>) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_previous_turn_context(turn_context);
|
||||
}
|
||||
|
||||
async fn record_initial_history(&self, conversation_history: InitialHistory) {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
match conversation_history {
|
||||
@@ -2822,7 +2832,12 @@ impl Session {
|
||||
|
||||
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
|
||||
// Seed with context in case there is an OverrideTurnContext first.
|
||||
let mut previous_context: Option<Arc<TurnContext>> = Some(sess.new_default_turn().await);
|
||||
if sess.previous_turn_context().await.is_none() {
|
||||
let default_turn = sess.new_default_turn().await;
|
||||
if sess.previous_turn_context().await.is_none() {
|
||||
sess.set_previous_turn_context(default_turn).await;
|
||||
}
|
||||
}
|
||||
|
||||
// To break out of this loop, send Op::Shutdown.
|
||||
while let Ok(sub) = rx_sub.recv().await {
|
||||
@@ -2872,8 +2887,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
.await;
|
||||
}
|
||||
Op::UserInput { .. } | Op::UserTurn { .. } => {
|
||||
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op, &mut previous_context)
|
||||
.await;
|
||||
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
|
||||
}
|
||||
Op::ExecApproval {
|
||||
id: approval_id,
|
||||
@@ -2939,13 +2953,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
handlers::set_thread_name(&sess, sub.id.clone(), name).await;
|
||||
}
|
||||
Op::RunUserShellCommand { command } => {
|
||||
handlers::run_user_shell_command(
|
||||
&sess,
|
||||
sub.id.clone(),
|
||||
command,
|
||||
&mut previous_context,
|
||||
)
|
||||
.await;
|
||||
handlers::run_user_shell_command(&sess, sub.id.clone(), command).await;
|
||||
}
|
||||
Op::ResolveElicitation {
|
||||
server_name,
|
||||
@@ -2973,7 +2981,6 @@ mod handlers {
|
||||
use crate::codex::Session;
|
||||
use crate::codex::SessionSettingsUpdate;
|
||||
use crate::codex::SteerInputError;
|
||||
use crate::codex::TurnContext;
|
||||
|
||||
use crate::codex::spawn_review_thread;
|
||||
use crate::config::Config;
|
||||
@@ -3048,12 +3055,7 @@ mod handlers {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn user_input_or_turn(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
op: Op,
|
||||
previous_context: &mut Option<Arc<TurnContext>>,
|
||||
) {
|
||||
pub async fn user_input_or_turn(sess: &Arc<Session>, sub_id: String, op: Op) {
|
||||
let (items, updates) = match op {
|
||||
Op::UserTurn {
|
||||
cwd,
|
||||
@@ -3113,6 +3115,7 @@ mod handlers {
|
||||
// Attempt to inject input into current task.
|
||||
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
|
||||
sess.seed_initial_context_if_needed(¤t_context).await;
|
||||
let previous_context = sess.previous_turn_context().await;
|
||||
let resumed_model = sess.take_pending_resume_previous_model().await;
|
||||
let update_items = sess.build_settings_update_items(
|
||||
previous_context.as_ref(),
|
||||
@@ -3129,16 +3132,10 @@ mod handlers {
|
||||
let regular_task = sess.take_startup_regular_task().await.unwrap_or_default();
|
||||
sess.spawn_task(Arc::clone(¤t_context), items, regular_task)
|
||||
.await;
|
||||
*previous_context = Some(current_context);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_user_shell_command(
|
||||
sess: &Arc<Session>,
|
||||
sub_id: String,
|
||||
command: String,
|
||||
previous_context: &mut Option<Arc<TurnContext>>,
|
||||
) {
|
||||
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
|
||||
if let Some((turn_context, cancellation_token)) =
|
||||
sess.active_turn_context_and_cancellation_token().await
|
||||
{
|
||||
@@ -3163,7 +3160,6 @@ mod handlers {
|
||||
UserShellCommandTask::new(command),
|
||||
)
|
||||
.await;
|
||||
*previous_context = Some(turn_context);
|
||||
}
|
||||
|
||||
pub async fn resolve_elicitation(
|
||||
@@ -3862,15 +3858,16 @@ pub(crate) async fn run_turn(
|
||||
|
||||
let model_info = turn_context.model_info.clone();
|
||||
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
|
||||
let event = EventMsg::TurnStarted(TurnStartedEvent {
|
||||
model_context_window: turn_context.model_context_window(),
|
||||
collaboration_mode_kind: turn_context.collaboration_mode.mode,
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
if total_usage_tokens >= auto_compact_limit
|
||||
&& run_auto_compact(&sess, &turn_context).await.is_err()
|
||||
|
||||
if run_pre_sampling_compact(&sess, &turn_context)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
@@ -4119,6 +4116,53 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
) -> CodexResult<()> {
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let auto_compact_limit = turn_context
|
||||
.model_info
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact with previous model if the model was switched and previous context window is larger than the new one
|
||||
if let Some(previous_turn_context) = sess.previous_turn_context().await
|
||||
&& should_run_inline_compact_with_previous_context(
|
||||
total_usage_tokens,
|
||||
&previous_turn_context,
|
||||
turn_context.as_ref(),
|
||||
)
|
||||
{
|
||||
run_auto_compact(sess, &previous_turn_context).await?;
|
||||
}
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(sess, turn_context).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn should_run_inline_compact_with_previous_context(
|
||||
total_usage_tokens: i64,
|
||||
previous_turn_context: &TurnContext,
|
||||
turn_context: &TurnContext,
|
||||
) -> bool {
|
||||
let Some(old_context_window) = previous_turn_context.model_context_window() else {
|
||||
return false;
|
||||
};
|
||||
let Some(new_context_window) = turn_context.model_context_window() else {
|
||||
return false;
|
||||
};
|
||||
let new_auto_compact_limit = turn_context
|
||||
.model_info
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
|
||||
total_usage_tokens > new_auto_compact_limit
|
||||
&& previous_turn_context.model_info.slug != turn_context.model_info.slug
|
||||
&& old_context_window > new_context_window
|
||||
}
|
||||
|
||||
fn filter_connectors_for_input(
|
||||
connectors: Vec<connectors::AppInfo>,
|
||||
input: &[ResponseItem],
|
||||
|
||||
@@ -3,8 +3,10 @@
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::codex::SessionConfiguration;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::TokenUsage;
|
||||
@@ -30,6 +32,7 @@ pub(crate) struct SessionState {
|
||||
/// Startup regular task pre-created during session initialization.
|
||||
pub(crate) startup_regular_task: Option<RegularTask>,
|
||||
pub(crate) active_mcp_tool_selection: Option<Vec<String>>,
|
||||
pub(crate) previous_turn_context: Option<Arc<TurnContext>>,
|
||||
}
|
||||
|
||||
impl SessionState {
|
||||
@@ -47,6 +50,7 @@ impl SessionState {
|
||||
pending_resume_previous_model: None,
|
||||
startup_regular_task: None,
|
||||
active_mcp_tool_selection: None,
|
||||
previous_turn_context: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,6 +172,14 @@ impl SessionState {
|
||||
pub(crate) fn clear_mcp_tool_selection(&mut self) {
|
||||
self.active_mcp_tool_selection = None;
|
||||
}
|
||||
|
||||
pub(crate) fn previous_turn_context(&self) -> Option<Arc<TurnContext>> {
|
||||
self.previous_turn_context.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_previous_turn_context(&mut self, turn_context: Arc<TurnContext>) {
|
||||
self.previous_turn_context = Some(turn_context);
|
||||
}
|
||||
}
|
||||
|
||||
// Sometimes new snapshots don't include credits or plan information.
|
||||
@@ -187,6 +199,7 @@ fn merge_rate_limit_fields(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::codex::make_session_configuration_for_tests;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
@@ -258,4 +271,21 @@ mod tests {
|
||||
|
||||
assert_eq!(state.get_mcp_tool_selection(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_previous_turn_context_stores_context() {
|
||||
let (_session, turn_context) = make_session_and_context().await;
|
||||
let session_configuration = make_session_configuration_for_tests().await;
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
|
||||
state.set_previous_turn_context(Arc::new(turn_context));
|
||||
|
||||
assert_eq!(
|
||||
state
|
||||
.previous_turn_context()
|
||||
.as_ref()
|
||||
.map(|context| &context.sub_id),
|
||||
Some(&"turn_id".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,10 +148,12 @@ impl Session {
|
||||
task_cancellation_token.child_token(),
|
||||
)
|
||||
.await;
|
||||
session_ctx.clone_session().flush_rollout().await;
|
||||
let sess = session_ctx.clone_session();
|
||||
sess.flush_rollout().await;
|
||||
sess.set_previous_turn_context(Arc::clone(&ctx_for_finish))
|
||||
.await;
|
||||
if !task_cancellation_token.is_cancelled() {
|
||||
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
|
||||
let sess = session_ctx.clone_session();
|
||||
sess.on_task_finished(ctx_for_finish, last_agent_message)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ use codex_core::built_in_model_providers;
|
||||
use codex_core::compact::SUMMARIZATION_PROMPT;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::models_manager::manager::RefreshStrategy;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ItemCompletedEvent;
|
||||
@@ -16,9 +18,18 @@ use codex_core::protocol::SandboxPolicy;
|
||||
use codex_core::protocol::WarningEvent;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::openai_models::ConfigShellToolType;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelVisibility;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::openai_models::ReasoningEffortPreset;
|
||||
use codex_protocol::openai_models::TruncationPolicyConfig;
|
||||
use codex_protocol::openai_models::default_input_modalities;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ev_local_shell_call;
|
||||
use core_test_support::responses::ev_reasoning_item;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
@@ -110,6 +121,37 @@ fn non_openai_model_provider(server: &MockServer) -> ModelProviderInfo {
|
||||
provider
|
||||
}
|
||||
|
||||
fn test_remote_model_with_context_window(slug: &str, context_window: i64) -> ModelInfo {
|
||||
ModelInfo {
|
||||
slug: slug.to_string(),
|
||||
display_name: format!("{slug} display"),
|
||||
description: Some(format!("{slug} description")),
|
||||
default_reasoning_level: Some(ReasoningEffort::Medium),
|
||||
supported_reasoning_levels: vec![ReasoningEffortPreset {
|
||||
effort: ReasoningEffort::Medium,
|
||||
description: ReasoningEffort::Medium.to_string(),
|
||||
}],
|
||||
shell_type: ConfigShellToolType::ShellCommand,
|
||||
visibility: ModelVisibility::List,
|
||||
supported_in_api: true,
|
||||
input_modalities: default_input_modalities(),
|
||||
priority: 1,
|
||||
upgrade: None,
|
||||
base_instructions: "base instructions".to_string(),
|
||||
model_messages: None,
|
||||
supports_reasoning_summaries: false,
|
||||
support_verbosity: false,
|
||||
default_verbosity: None,
|
||||
apply_patch_tool_type: None,
|
||||
truncation_policy: TruncationPolicyConfig::bytes(10_000),
|
||||
supports_parallel_tool_calls: false,
|
||||
context_window: Some(context_window),
|
||||
auto_compact_token_limit: None,
|
||||
effective_context_window_percent: 95,
|
||||
experimental_supported_tools: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn summarize_context_three_requests_and_instructions() {
|
||||
skip_if_no_network!();
|
||||
@@ -1551,6 +1593,411 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_compact_runs_pre_sampling_when_switching_to_smaller_context_window_model() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Phase 1: configure models and mock responses for large->small model switch.
|
||||
let server = MockServer::start().await;
|
||||
let large_model = "test-large-context-model";
|
||||
let small_model = "test-small-context-model";
|
||||
let compact_summary = "SMALL_CONTEXT_PRE_SAMPLING_SUMMARY";
|
||||
let over_small_model_limit_tokens = 115_000;
|
||||
let models_mock = mount_models_once(
|
||||
&server,
|
||||
ModelsResponse {
|
||||
models: vec![
|
||||
test_remote_model_with_context_window(large_model, 273_000),
|
||||
test_remote_model_with_context_window(small_model, 125_000),
|
||||
],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let compacted_history = vec![
|
||||
codex_protocol::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![codex_protocol::models::ContentItem::OutputText {
|
||||
text: compact_summary.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
codex_protocol::models::ResponseItem::Compaction {
|
||||
encrypted_content: "ENCRYPTED_SMALL_CONTEXT_PRE_SAMPLING_SUMMARY".to_string(),
|
||||
},
|
||||
];
|
||||
let compact_mock_1 = mount_compact_json_once(
|
||||
&server,
|
||||
serde_json::json!({ "output": compacted_history.clone() }),
|
||||
)
|
||||
.await;
|
||||
let compact_mock_2 =
|
||||
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r1", over_small_model_limit_tokens),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let follow_up_user = "smaller model follow up";
|
||||
let follow_up_mock = mount_sse_once_match(
|
||||
&server,
|
||||
move |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(follow_up_user) && body.contains(compact_summary)
|
||||
},
|
||||
sse(vec![
|
||||
ev_assistant_message("m2", FINAL_REPLY),
|
||||
ev_completed("r2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.features.enable(Feature::RemoteModels);
|
||||
config.model = Some(large_model.to_string());
|
||||
});
|
||||
let test = builder.build(&server).await.unwrap();
|
||||
|
||||
// Phase 2: warm model metadata and seed token usage on the large model.
|
||||
let models_manager = test.thread_manager.get_models_manager();
|
||||
let _ = models_manager
|
||||
.list_models(&test.config, RefreshStrategy::OnlineIfUncached)
|
||||
.await;
|
||||
let model_requests = models_mock.requests();
|
||||
assert_eq!(
|
||||
model_requests.len(),
|
||||
1,
|
||||
"expected a single /models request for online model metadata"
|
||||
);
|
||||
assert_eq!(model_requests[0].url.path(), "/v1/models");
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "first turn on large model".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: large_model.to_string(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Phase 3: run the follow-up turn on the smaller model and validate lifecycle integrity.
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: follow_up_user.into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: small_model.to_string(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut saw_context_compacted = false;
|
||||
let mut started_turn_ids = Vec::new();
|
||||
let completed_turn_id = loop {
|
||||
let event = test.codex.next_event().await.unwrap();
|
||||
match event.msg {
|
||||
EventMsg::ContextCompacted(_) => {
|
||||
saw_context_compacted = true;
|
||||
}
|
||||
EventMsg::TurnStarted(_) if !event.id.starts_with("auto-compact-") => {
|
||||
started_turn_ids.push(event.id.clone());
|
||||
}
|
||||
EventMsg::TurnComplete(_) if !event.id.starts_with("auto-compact-") => {
|
||||
break event.id;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
assert!(
|
||||
saw_context_compacted,
|
||||
"expected context compaction before the follow-up turn completed"
|
||||
);
|
||||
assert_eq!(
|
||||
started_turn_ids.len(),
|
||||
1,
|
||||
"expected exactly one TurnStarted for the follow-up turn"
|
||||
);
|
||||
assert_eq!(
|
||||
started_turn_ids[0], completed_turn_id,
|
||||
"expected TurnStarted and TurnComplete to use the same follow-up turn id"
|
||||
);
|
||||
|
||||
// Phase 4: assert pre-sampling compact request ordering and model selection.
|
||||
let mut compact_requests = compact_mock_1.requests();
|
||||
compact_requests.extend(compact_mock_2.requests());
|
||||
assert!(
|
||||
!compact_requests.is_empty(),
|
||||
"expected compaction before follow-up request on the smaller model"
|
||||
);
|
||||
assert_eq!(compact_requests[0].path(), "/v1/responses/compact");
|
||||
let first_compact_body = compact_requests[0].body_json();
|
||||
assert_eq!(
|
||||
first_compact_body
|
||||
.get("model")
|
||||
.and_then(|value| value.as_str()),
|
||||
Some(large_model),
|
||||
"expected first pre-sampling compact to run with previous larger model"
|
||||
);
|
||||
|
||||
let follow_up_requests = follow_up_mock.requests();
|
||||
assert!(
|
||||
!follow_up_requests.is_empty(),
|
||||
"expected at least one follow-up /responses request"
|
||||
);
|
||||
let follow_up_request = follow_up_requests
|
||||
.last()
|
||||
.expect("follow-up request")
|
||||
.body_json();
|
||||
assert_eq!(
|
||||
follow_up_request
|
||||
.get("model")
|
||||
.and_then(|value| value.as_str()),
|
||||
Some(small_model),
|
||||
"expected follow-up response request to use the smaller model"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_compact_runs_pre_sampling_after_resume_when_switching_to_smaller_context_window_model()
|
||||
{
|
||||
skip_if_no_network!();
|
||||
|
||||
// Phase 1: configure models and seed over-limit usage on the large model.
|
||||
let server = MockServer::start().await;
|
||||
let large_model = "test-large-context-model";
|
||||
let small_model = "test-small-context-model";
|
||||
let compact_summary = "RESUMED_SMALL_CONTEXT_PRE_SAMPLING_SUMMARY";
|
||||
let over_small_model_limit_tokens = 115_000;
|
||||
mount_models_once(
|
||||
&server,
|
||||
ModelsResponse {
|
||||
models: vec![
|
||||
test_remote_model_with_context_window(large_model, 273_000),
|
||||
test_remote_model_with_context_window(small_model, 125_000),
|
||||
],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r1", over_small_model_limit_tokens),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut start_builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.features.enable(Feature::RemoteModels);
|
||||
config.model = Some(large_model.to_string());
|
||||
});
|
||||
let initial = start_builder.build(&server).await.unwrap();
|
||||
let home = initial.home.clone();
|
||||
let rollout_path = initial
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("rollout path");
|
||||
|
||||
let models_manager = initial.thread_manager.get_models_manager();
|
||||
let _ = models_manager
|
||||
.list_models(&initial.config, RefreshStrategy::OnlineIfUncached)
|
||||
.await;
|
||||
|
||||
// Phase 2: complete the initial large-model turn, then set up compact and follow-up mocks.
|
||||
initial
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "first turn on large model".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: initial.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: large_model.to_string(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_event(&initial.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let compacted_history = vec![
|
||||
codex_protocol::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![codex_protocol::models::ContentItem::OutputText {
|
||||
text: compact_summary.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
codex_protocol::models::ResponseItem::Compaction {
|
||||
encrypted_content: "ENCRYPTED_RESUMED_SMALL_CONTEXT_PRE_SAMPLING_SUMMARY".to_string(),
|
||||
},
|
||||
];
|
||||
let compact_mock_1 = mount_compact_json_once(
|
||||
&server,
|
||||
serde_json::json!({ "output": compacted_history.clone() }),
|
||||
)
|
||||
.await;
|
||||
let compact_mock_2 =
|
||||
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
|
||||
|
||||
let follow_up_user = "smaller model follow up after resume";
|
||||
let follow_up_mock = mount_sse_once_match(
|
||||
&server,
|
||||
move |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(follow_up_user) && body.contains(compact_summary)
|
||||
},
|
||||
sse(vec![
|
||||
ev_assistant_message("m2", FINAL_REPLY),
|
||||
ev_completed("r2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Phase 3: resume, submit the smaller-model turn, and validate lifecycle integrity.
|
||||
let mut resume_builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.features.enable(Feature::RemoteModels);
|
||||
config.model = Some(large_model.to_string());
|
||||
});
|
||||
let resumed = resume_builder
|
||||
.resume(&server, home, rollout_path)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
resumed
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: follow_up_user.into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: resumed.cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: small_model.to_string(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut saw_context_compacted = false;
|
||||
let mut started_turn_ids = Vec::new();
|
||||
let completed_turn_id = loop {
|
||||
let event = resumed.codex.next_event().await.unwrap();
|
||||
match event.msg {
|
||||
EventMsg::ContextCompacted(_) => {
|
||||
saw_context_compacted = true;
|
||||
}
|
||||
EventMsg::TurnStarted(_) if !event.id.starts_with("auto-compact-") => {
|
||||
started_turn_ids.push(event.id.clone());
|
||||
}
|
||||
EventMsg::TurnComplete(_) if !event.id.starts_with("auto-compact-") => {
|
||||
break event.id;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
assert!(
|
||||
saw_context_compacted,
|
||||
"expected context compaction before the resumed follow-up turn completed"
|
||||
);
|
||||
assert_eq!(
|
||||
started_turn_ids.len(),
|
||||
1,
|
||||
"expected exactly one TurnStarted for the resumed follow-up turn"
|
||||
);
|
||||
assert_eq!(
|
||||
started_turn_ids[0], completed_turn_id,
|
||||
"expected TurnStarted and TurnComplete to use the same resumed follow-up turn id"
|
||||
);
|
||||
|
||||
// Phase 4: assert pre-sampling compact request ordering and model selection after resume.
|
||||
let mut compact_requests = compact_mock_1.requests();
|
||||
compact_requests.extend(compact_mock_2.requests());
|
||||
assert!(
|
||||
!compact_requests.is_empty(),
|
||||
"expected compaction before follow-up request after resume"
|
||||
);
|
||||
assert_eq!(compact_requests[0].path(), "/v1/responses/compact");
|
||||
let first_compact_body = compact_requests[0].body_json();
|
||||
assert_eq!(
|
||||
first_compact_body
|
||||
.get("model")
|
||||
.and_then(|value| value.as_str()),
|
||||
Some(large_model),
|
||||
"expected first resumed pre-sampling compact to use previous larger model"
|
||||
);
|
||||
|
||||
let follow_up_requests = follow_up_mock.requests();
|
||||
assert!(
|
||||
!follow_up_requests.is_empty(),
|
||||
"expected at least one resumed follow-up /responses request"
|
||||
);
|
||||
let follow_up_request = follow_up_requests
|
||||
.last()
|
||||
.expect("resumed follow-up request")
|
||||
.body_json();
|
||||
assert_eq!(
|
||||
follow_up_request
|
||||
.get("model")
|
||||
.and_then(|value| value.as_str()),
|
||||
Some(small_model),
|
||||
"expected resumed follow-up response request to use smaller model"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_compact_persists_rollout_entries() {
|
||||
skip_if_no_network!();
|
||||
|
||||
Reference in New Issue
Block a user