Compare commits

...

2 Commits

Author SHA1 Message Date
Charles Cunningham
b774ea09ad compact: split core logic changes from snapshot test coverage 2026-02-11 12:05:26 -08:00
Charles Cunningham
7255c6b03c core tests: split compaction snapshot and shape coverage 2026-02-11 12:00:46 -08:00
23 changed files with 2733 additions and 142 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1708,6 +1708,7 @@ dependencies = [
"include_dir",
"indexmap 2.13.0",
"indoc",
"insta",
"keyring",
"landlock",
"libc",

View File

@@ -203,11 +203,6 @@ async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let sse = responses::sse(vec![
responses::ev_assistant_message("m1", "MANUAL_COMPACT_SUMMARY"),
responses::ev_completed_with_tokens("r1", 200),
]);
responses::mount_sse_sequence(&server, vec![sse]).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(

View File

@@ -152,6 +152,7 @@ codex-utils-cargo-bin = { workspace = true }
core_test_support = { workspace = true }
ctor = { workspace = true }
image = { workspace = true, features = ["jpeg", "png"] }
insta = { workspace = true }
maplit = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }

View File

@@ -17,6 +17,8 @@ use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
use crate::compact;
use crate::compact::AutoCompactCallsite;
use crate::compact::TurnContextReinjection;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
@@ -123,6 +125,7 @@ use crate::config::types::McpServerConfig;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_item_token_count;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -2166,11 +2169,11 @@ impl Session {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(history.raw_items());
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context).await,
let mut rebuilt = self.build_initial_context(turn_context).await;
rebuilt.extend(compact::build_compacted_history(
&user_messages,
&compacted.message,
);
));
history.replace(rebuilt);
}
}
@@ -2187,9 +2190,14 @@ impl Session {
&self,
turn_context: &TurnContext,
compacted_history: Vec<ResponseItem>,
turn_context_reinjection: TurnContextReinjection,
) -> Vec<ResponseItem> {
let initial_context = self.build_initial_context(turn_context).await;
compact::process_compacted_history(compacted_history, &initial_context)
compact::process_compacted_history(
compacted_history,
&initial_context,
turn_context_reinjection,
)
}
/// Append ResponseItems to the in-memory conversation history only.
@@ -2239,6 +2247,11 @@ impl Session {
self.flush_rollout().await;
}
pub(crate) async fn mark_initial_context_unseeded_for_next_turn(&self) {
let mut state = self.state.lock().await;
state.initial_context_seeded = false;
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
@@ -2295,14 +2308,6 @@ impl Session {
DeveloperInstructions::new(SEARCH_TOOL_DEVELOPER_INSTRUCTIONS.to_string()).into(),
);
}
// Add developer instructions for memories.
if let Some(memory_prompt) =
memories::build_memory_tool_developer_instructions(&turn_context.config.codex_home)
.await
&& turn_context.features.enabled(Feature::MemoryTool)
{
items.push(DeveloperInstructions::new(memory_prompt).into());
}
// Add developer instructions from collaboration_mode if they exist and are non-empty
let (collaboration_mode, base_instructions) = {
let state = self.state.lock().await;
@@ -3122,19 +3127,29 @@ mod handlers {
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
sess.seed_initial_context_if_needed(&current_context).await;
let resumed_model = sess.take_pending_resume_previous_model().await;
let update_items = sess.build_settings_update_items(
let pre_turn_context_items = sess.build_settings_update_items(
previous_context.as_ref(),
resumed_model.as_deref(),
&current_context,
);
if !update_items.is_empty() {
sess.record_conversation_items(&current_context, &update_items)
let has_user_input = !items.is_empty();
if !has_user_input && !pre_turn_context_items.is_empty() {
// Empty-input UserTurn still needs these model-visible updates persisted now.
// Otherwise `previous_context` advances and the next non-empty turn computes no diff.
sess.record_conversation_items(&current_context, &pre_turn_context_items)
.await;
}
sess.refresh_mcp_servers_if_requested(&current_context)
.await;
let regular_task = sess.take_startup_regular_task().await.unwrap_or_default();
let regular_task = if has_user_input {
sess.take_startup_regular_task()
.await
.unwrap_or_default()
.with_pre_turn_context_items(pre_turn_context_items)
} else {
sess.take_startup_regular_task().await.unwrap_or_default()
};
sess.spawn_task(Arc::clone(&current_context), items, regular_task)
.await;
*previous_context = Some(current_context);
@@ -3861,6 +3876,7 @@ pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
pre_turn_context_items: Vec<ResponseItem>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -3870,7 +3886,9 @@ pub(crate) async fn run_turn(
let model_info = turn_context.model_info.clone();
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let mut incoming_turn_items = pre_turn_context_items.clone();
incoming_turn_items.push(response_item.clone());
let event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
@@ -3878,11 +3896,34 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit
&& run_auto_compact(&sess, &turn_context).await.is_err()
let pre_turn_compaction_outcome = match run_pre_turn_auto_compaction_if_needed(
&sess,
&turn_context,
auto_compact_limit,
&incoming_turn_items,
)
.await
{
return None;
}
Ok(outcome) => outcome,
Err(CodexErr::ContextWindowExceeded) => {
let incoming_items_tokens_estimate = incoming_turn_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
let message = format!(
"Incoming user message and/or turn context is too large to fit in context window. Please reduce the size of your message and try again. (incoming_items_tokens_estimate={incoming_items_tokens_estimate})"
);
let event =
EventMsg::Error(CodexErr::ContextWindowExceeded.to_error_event(Some(message)));
sess.send_event(&turn_context, event).await;
return None;
}
Err(err) => {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
return None;
}
};
let skills_outcome = Some(
sess.services
@@ -3954,10 +3995,15 @@ pub(crate) async fn run_turn(
.await;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
persist_pre_turn_items_for_compaction_outcome(
&sess,
&turn_context,
pre_turn_compaction_outcome,
&pre_turn_context_items,
&input,
response_item,
)
.await;
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
@@ -4059,7 +4105,19 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if run_auto_compact(&sess, &turn_context).await.is_err() {
if let Err(err) = run_auto_compact(
&sess,
&turn_context,
AutoCompactCallsite::MidTurnContinuation,
TurnContextReinjection::ReinjectAboveLastRealUser,
None,
)
.await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running auto compact task".to_string())),
);
sess.send_event(&turn_context, event).await;
return None;
}
continue;
@@ -4119,13 +4177,158 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PreTurnCompactionOutcome {
/// Pre-turn input fits without compaction.
NotNeeded,
/// Pre-turn compaction succeeded with incoming turn context + user message included.
CompactedWithIncomingItems,
/// Pre-turn compaction succeeded without incoming turn items
/// (incoming user message should be appended after the compaction summary).
/// This compaction strategy is currently out of distribution for our compaction model,
/// but is planned to be trained on in the future.
#[cfg(test)]
CompactedWithoutIncomingItems,
}
async fn persist_pre_turn_items_for_compaction_outcome(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
outcome: PreTurnCompactionOutcome,
pre_turn_context_items: &[ResponseItem],
input: &[UserInput],
response_item: ResponseItem,
) {
match outcome {
PreTurnCompactionOutcome::CompactedWithIncomingItems => {
// Incoming turn items were already part of pre-turn compaction input, and the
// user prompt is already persisted in history after compaction. Emit lifecycle events
// only so UI/consumers still observe a normal user turn item transition.
let turn_item = TurnItem::UserMessage(UserMessageItem::new(input));
sess.emit_turn_item_started(turn_context.as_ref(), &turn_item)
.await;
sess.emit_turn_item_completed(turn_context.as_ref(), turn_item)
.await;
sess.ensure_rollout_materialized().await;
}
PreTurnCompactionOutcome::NotNeeded => {
if !pre_turn_context_items.is_empty() {
sess.record_conversation_items(turn_context, pre_turn_context_items)
.await;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item)
.await;
}
#[cfg(test)]
PreTurnCompactionOutcome::CompactedWithoutIncomingItems => {
// Reserved path for future models that compact pre-turn history without incoming turn
// items; reseed canonical initial context above the incoming user message.
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
if !initial_context.is_empty() {
sess.record_conversation_items(turn_context, &initial_context)
.await;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), input, response_item)
.await;
}
}
Ok(())
}
/// Runs pre-turn auto-compaction with incoming turn context + user message included.
async fn run_pre_turn_auto_compaction_if_needed(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_limit: i64,
incoming_turn_items: &[ResponseItem],
) -> CodexResult<PreTurnCompactionOutcome> {
let total_usage_tokens = sess.get_total_token_usage().await;
let incoming_items_tokens_estimate = incoming_turn_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
if !is_projected_submission_over_auto_compact_limit(
total_usage_tokens,
incoming_items_tokens_estimate,
auto_compact_limit,
) {
return Ok(PreTurnCompactionOutcome::NotNeeded);
}
let compact_result = run_auto_compact(
sess,
turn_context,
AutoCompactCallsite::PreTurnIncludingIncomingUserMessage,
TurnContextReinjection::ReinjectAboveLastRealUser,
Some(incoming_turn_items.to_vec()),
)
.await;
if let Err(err) = compact_result {
if matches!(err, CodexErr::ContextWindowExceeded) {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?AutoCompactCallsite::PreTurnIncludingIncomingUserMessage,
incoming_items_tokens_estimate,
auto_compact_limit,
reason = "pre-turn compaction exceeded context window",
"incoming user/context is too large for pre-turn auto-compaction flow"
);
}
return Err(err);
}
Ok(PreTurnCompactionOutcome::CompactedWithIncomingItems)
}
fn is_projected_submission_over_auto_compact_limit(
total_usage_tokens: i64,
incoming_user_tokens_estimate: i64,
auto_compact_limit: i64,
) -> bool {
if auto_compact_limit == i64::MAX {
return false;
}
total_usage_tokens.saturating_add(incoming_user_tokens_estimate) >= auto_compact_limit
}
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let result = if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
} else {
run_inline_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
};
if let Err(err) = &result {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %err,
"auto compaction failed"
);
}
result
}
fn filter_connectors_for_input(
@@ -4996,7 +5199,7 @@ async fn try_run_sampling_request(
ResponseEvent::Completed {
response_id: _,
token_usage,
can_append: _,
..
} => {
if let Some(state) = plan_mode_state.as_mut() {
flush_proposed_plan_segments_all(&sess, &turn_context, state).await;
@@ -5208,6 +5411,151 @@ mod tests {
}
}
#[test]
fn pre_turn_projection_uses_incoming_user_tokens_for_compaction() {
assert!(is_projected_submission_over_auto_compact_limit(90, 15, 100));
assert!(!is_projected_submission_over_auto_compact_limit(90, 9, 100));
}
#[test]
fn pre_turn_projection_does_not_compact_with_unbounded_limit() {
assert!(!is_projected_submission_over_auto_compact_limit(
i64::MAX - 1,
100,
i64::MAX,
));
}
#[test]
fn post_compaction_projection_triggers_error_when_still_over_limit() {
assert!(is_projected_submission_over_auto_compact_limit(95, 10, 100));
assert!(is_projected_submission_over_auto_compact_limit(
100, 10, 100
));
assert!(!is_projected_submission_over_auto_compact_limit(
80, 10, 100
));
}
#[tokio::test]
async fn reserved_compacted_without_incoming_items_records_initial_context_and_prompt() {
let (session, turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let stale_pre_turn_context_items = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "stale context diff".to_string(),
}],
end_turn: None,
phase: None,
}];
persist_pre_turn_items_for_compaction_outcome(
&session,
&turn_context,
PreTurnCompactionOutcome::CompactedWithoutIncomingItems,
&stale_pre_turn_context_items,
&input,
response_item.clone(),
)
.await;
let mut expected = session.build_initial_context(turn_context.as_ref()).await;
expected.push(response_item);
let actual = session.clone_history().await.raw_items().to_vec();
assert_eq!(actual, expected);
}
#[tokio::test]
async fn compacted_with_incoming_items_emits_lifecycle_without_history_writes() {
let (session, turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let stale_pre_turn_context_items = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "stale context diff".to_string(),
}],
end_turn: None,
phase: None,
}];
persist_pre_turn_items_for_compaction_outcome(
&session,
&turn_context,
PreTurnCompactionOutcome::CompactedWithIncomingItems,
&stale_pre_turn_context_items,
&input,
response_item,
)
.await;
let actual = session.clone_history().await.raw_items().to_vec();
assert_eq!(actual, Vec::<ResponseItem>::new());
}
#[test]
fn estimate_user_input_token_count_is_positive_for_text_input() {
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
let response_input_item = ResponseInputItem::from(input);
let response_item: ResponseItem = response_input_item.into();
let estimated_tokens = estimate_item_token_count(&response_item);
assert!(estimated_tokens > 0);
}
#[test]
fn estimate_user_input_token_count_ignores_skill_and_mention_payload_lengths() {
let short = vec![
UserInput::Skill {
name: "s".to_string(),
path: PathBuf::from("/s"),
},
UserInput::Mention {
name: "m".to_string(),
path: "app://m".to_string(),
},
];
let long = vec![
UserInput::Skill {
name: "very-long-skill-name-that-should-not-affect-prompt-serialization"
.to_string(),
path: PathBuf::from(
"/very/long/skill/path/that/should/not/affect/prompt/serialization/SKILL.md",
),
},
UserInput::Mention {
name: "very-long-mention-name-that-should-not-affect-prompt-serialization"
.to_string(),
path: "app://very-long-connector-path-that-should-not-affect-prompt-serialization"
.to_string(),
},
];
let short_response_input_item = ResponseInputItem::from(short);
let long_response_input_item = ResponseInputItem::from(long);
let short_response_item: ResponseItem = short_response_input_item.into();
let long_response_item: ResponseItem = long_response_input_item.into();
let short_tokens = estimate_item_token_count(&short_response_item);
let long_tokens = estimate_item_token_count(&long_response_item);
assert_eq!(short_tokens, long_tokens);
}
fn make_connector(id: &str, name: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
@@ -6969,8 +7317,8 @@ mod tests {
.clone()
.for_prompt(&reconstruction_turn.model_info.input_modalities);
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 =
compact::build_compacted_history(initial_context.clone(), &user_messages1, summary1);
let mut rebuilt1 = initial_context.clone();
rebuilt1.extend(compact::build_compacted_history(&user_messages1, summary1));
live_history.replace(rebuilt1);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary1.to_string(),
@@ -7012,8 +7360,8 @@ mod tests {
.clone()
.for_prompt(&reconstruction_turn.model_info.input_modalities);
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 =
compact::build_compacted_history(initial_context.clone(), &user_messages2, summary2);
let mut rebuilt2 = initial_context.clone();
rebuilt2.extend(compact::build_compacted_history(&user_messages2, summary2));
live_history.replace(rebuilt2);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary2.to_string(),

View File

@@ -7,6 +7,7 @@ use crate::client_common::ResponseEvent;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
@@ -32,6 +33,32 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AutoCompactCallsite {
/// Pre-turn auto-compaction where the incoming turn context + user message are included in
/// the compaction request.
PreTurnIncludingIncomingUserMessage,
/// Reserved pre-turn auto-compaction strategy that compacts from the end of the previous turn
/// only, excluding incoming turn context + user message. This is currently unused by the
/// default pre-turn flow and retained for future model-specific strategies.
#[allow(dead_code)]
PreTurnExcludingIncomingUserMessage,
/// Mid-turn compaction between assistant responses in a follow-up loop.
MidTurnContinuation,
}
/// Controls whether compacted-history processing should reinsert canonical turn context.
///
/// When callers exclude incoming user/context from the compaction request, they should typically
/// set reinjection to `Skip` and append canonical context together with the next user message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TurnContextReinjection {
/// Insert canonical context immediately above the last real user message in compacted history.
ReinjectAboveLastRealUser,
/// Do not reinsert canonical context while processing compacted history.
Skip,
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.is_openai()
}
@@ -39,6 +66,9 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
@@ -47,7 +77,15 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await?;
run_compact_task_inner(
sess,
turn_context,
input,
Some(auto_compact_callsite),
turn_context_reinjection,
incoming_items,
)
.await?;
Ok(())
}
@@ -62,13 +100,24 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await
run_compact_task_inner(
sess,
turn_context,
input,
None,
TurnContextReinjection::Skip,
None,
)
.await
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
auto_compact_callsite: Option<AutoCompactCallsite>,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
@@ -76,6 +125,15 @@ async fn run_compact_task_inner(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
if let Some(incoming_items) = incoming_items.as_ref() {
history.record_items(incoming_items.iter(), turn_context.truncation_policy);
}
if !history.raw_items().iter().any(is_user_turn_boundary) {
// Nothing to compact: do not rewrite history when there is no user-turn boundary.
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
return Ok(());
}
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -151,8 +209,11 @@ async fn run_compact_task_inner(
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
// Trim from the beginning to preserve cache (prefix-based) and keep recent
// messages intact.
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
@@ -161,8 +222,12 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %e,
"compaction failed after history truncation could not proceed"
);
return Err(e);
}
Err(e) => {
@@ -177,11 +242,16 @@ async fn run_compact_task_inner(
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
retries,
max_retries,
compact_error = %e,
"compaction failed after retry exhaustion"
);
return Err(e);
}
}
}
@@ -191,9 +261,32 @@ async fn run_compact_task_inner(
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
let incoming_user_items = match incoming_items.as_ref() {
Some(items) => items
.iter()
.filter(|item| real_user_message_text(item).is_some())
.cloned()
.collect(),
None => Vec::new(),
};
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let initial_context = match turn_context_reinjection {
TurnContextReinjection::ReinjectAboveLastRealUser => {
sess.build_initial_context(turn_context.as_ref()).await
}
TurnContextReinjection::Skip => Vec::new(),
};
let compacted_history = build_compacted_history_with_limit(
&user_messages,
&incoming_user_items,
&summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
);
let mut new_history = process_compacted_history(
compacted_history,
&initial_context,
turn_context_reinjection,
);
let ghost_snapshots: Vec<ResponseItem> = history_items
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
@@ -205,7 +298,7 @@ async fn run_compact_task_inner(
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
replacement_history: Some(sess.clone_history().await.raw_items().to_vec()),
});
sess.persist_rollout_items(&[rollout_item]).await;
@@ -260,28 +353,39 @@ pub(crate) fn is_summary_message(message: &str) -> bool {
pub(crate) fn process_compacted_history(
mut compacted_history: Vec<ResponseItem>,
initial_context: &[ResponseItem],
turn_context_reinjection: TurnContextReinjection,
) -> Vec<ResponseItem> {
// Keep only model-visible transcript items that we allow from remote compaction output.
compacted_history.retain(should_keep_compacted_history_item);
let initial_context = initial_context.to_vec();
// Re-inject canonical context from the current session since we stripped it
// from the pre-compaction history. Keep it right before the last user
// message so older user messages remain earlier in the transcript.
if let Some(last_user_index) = compacted_history.iter().rposition(|item| {
matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
)
}) {
compacted_history.splice(last_user_index..last_user_index, initial_context);
} else {
compacted_history.extend(initial_context);
match turn_context_reinjection {
TurnContextReinjection::ReinjectAboveLastRealUser => {
// Insert immediately above the last real user message so turn context applies to that
// user input rather than an earlier turn.
if let Some(insertion_index) = compacted_history
.iter()
.rposition(|item| real_user_message_text(item).is_some())
{
compacted_history
.splice(insertion_index..insertion_index, initial_context.to_vec());
}
}
TurnContextReinjection::Skip => {}
}
compacted_history
}
fn real_user_message_text(item: &ResponseItem) -> Option<String> {
match crate::event_mapping::parse_turn_item(item) {
Some(TurnItem::UserMessage(user_message)) => {
let message = user_message.message();
(!is_summary_message(&message)).then_some(message)
}
_ => None,
}
}
/// Returns whether an item from remote compaction output should be preserved.
///
/// Called while processing the model-provided compacted transcript, before we
@@ -307,24 +411,24 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
}
pub(crate) fn build_compacted_history(
initial_context: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
) -> Vec<ResponseItem> {
build_compacted_history_with_limit(
initial_context,
user_messages,
&[],
summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
)
}
fn build_compacted_history_with_limit(
mut history: Vec<ResponseItem>,
user_messages: &[String],
incoming_user_items: &[ResponseItem],
summary_text: &str,
max_tokens: usize,
) -> Vec<ResponseItem> {
let mut history = Vec::new();
let mut selected_messages: Vec<String> = Vec::new();
if max_tokens > 0 {
let mut remaining = max_tokens;
@@ -357,6 +461,8 @@ fn build_compacted_history_with_limit(
});
}
history.extend(incoming_user_items.iter().cloned());
let summary_text = if summary_text.is_empty() {
"(no summary available)".to_string()
} else {
@@ -535,8 +641,8 @@ do things
let max_tokens = 16;
let big = "word ".repeat(200);
let history = super::build_compacted_history_with_limit(
Vec::new(),
std::slice::from_ref(&big),
&[],
"SUMMARY",
max_tokens,
);
@@ -572,11 +678,10 @@ do things
#[test]
fn build_token_limited_compacted_history_appends_summary_message() {
let initial_context: Vec<ResponseItem> = Vec::new();
let user_messages = vec!["first user message".to_string()];
let summary_text = "summary text";
let history = build_compacted_history(initial_context, &user_messages, summary_text);
let history = build_compacted_history(&user_messages, summary_text);
assert!(
!history.is_empty(),
"expected compacted history to include summary"
@@ -592,6 +697,55 @@ do things
assert_eq!(summary, summary_text);
}
#[test]
fn build_compacted_history_preserves_incoming_user_item_structure() {
let preserved_user_item = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![
ContentItem::InputImage {
image_url: "data:image/png;base64,AAAA".to_string(),
},
ContentItem::InputText {
text: "latest user with image".to_string(),
},
],
end_turn: None,
phase: None,
};
let history = super::build_compacted_history_with_limit(
&["older user".to_string()],
std::slice::from_ref(&preserved_user_item),
"SUMMARY",
128,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
preserved_user_item,
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(history, expected);
}
#[test]
fn process_compacted_history_replaces_developer_messages() {
let compacted_history = vec![
@@ -657,7 +811,11 @@ do things
},
];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -766,7 +924,11 @@ keep me updated
},
];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -902,7 +1064,11 @@ keep me updated
phase: None,
}];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -967,7 +1133,11 @@ keep me updated
phase: None,
}];
let refreshed = process_compacted_history(compacted_history, &initial_context);
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
@@ -1008,4 +1178,320 @@ keep me updated
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_pre_turn_places_summary_last() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_preserves_summary_order() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nolder summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "newer user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nlatest summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "assistant after latest summary".to_string(),
}],
end_turn: None,
phase: None,
},
];
let refreshed =
process_compacted_history(compacted_history, &[], TurnContextReinjection::Skip);
let expected = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nolder summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "newer user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nlatest summary"),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "assistant after latest summary".to_string(),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_skips_context_insertion_without_real_user_message() {
let compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::Skip,
);
let expected = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_reinject_noops_without_real_user_message() {
let compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
}];
assert_eq!(refreshed, expected);
}
#[test]
fn process_compacted_history_mid_turn_without_orphan_user_places_summary_last() {
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
let initial_context = vec![ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
}];
let refreshed = process_compacted_history(
compacted_history,
&initial_context,
TurnContextReinjection::ReinjectAboveLastRealUser,
);
let expected = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "fresh permissions".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "older user".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("{SUMMARY_PREFIX}\nsummary text"),
}],
end_turn: None,
phase: None,
},
];
assert_eq!(refreshed, expected);
}
}

View File

@@ -3,10 +3,14 @@ use std::sync::Arc;
use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::compact::AutoCompactCallsite;
use crate::compact::TurnContextReinjection;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_item_token_count;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
@@ -24,8 +28,19 @@ use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
// Controls whether canonical turn context should be reinserted into compacted history.
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context).await?;
run_remote_compact_task_inner(
&sess,
&turn_context,
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await?;
Ok(())
}
@@ -40,18 +55,40 @@ pub(crate) async fn run_remote_compact_task(
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await
run_remote_compact_task_inner(
&sess,
&turn_context,
AutoCompactCallsite::PreTurnExcludingIncomingUserMessage,
// Manual `/compact` should not reinsert turn context into compacted history; we reseed
// canonical initial context before the next user turn.
TurnContextReinjection::Skip,
None,
)
.await
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
if let Err(err) = run_remote_compact_task_inner_impl(
sess,
turn_context,
auto_compact_callsite,
turn_context_reinjection,
incoming_items,
)
.await
{
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
compact_error = %err,
"remote compaction task failed"
);
sess.send_event(turn_context, event).await;
return Err(err);
}
Ok(())
@@ -60,6 +97,9 @@ async fn run_remote_compact_task_inner(
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
auto_compact_callsite: AutoCompactCallsite,
turn_context_reinjection: TurnContextReinjection,
incoming_items: Option<Vec<ResponseItem>>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
@@ -70,10 +110,21 @@ async fn run_remote_compact_task_inner_impl(
&mut history,
turn_context.as_ref(),
&base_instructions,
incoming_items.as_deref(),
);
if let Some(incoming_items) = incoming_items {
history.record_items(incoming_items.iter(), turn_context.truncation_policy);
}
if !history.raw_items().iter().any(is_user_turn_boundary) {
// Nothing to compact: do not rewrite history when there is no user-turn boundary.
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
return Ok(());
}
if deleted_items > 0 {
info!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
deleted_items,
"trimmed history items before remote compaction"
);
@@ -110,6 +161,7 @@ async fn run_remote_compact_task_inner_impl(
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context,
auto_compact_callsite,
&compact_request_log_data,
total_usage_breakdown,
&err,
@@ -118,7 +170,7 @@ async fn run_remote_compact_task_inner_impl(
})
.await?;
new_history = sess
.process_compacted_history(turn_context, new_history)
.process_compacted_history(turn_context, new_history, turn_context_reinjection)
.await;
if !ghost_snapshots.is_empty() {
@@ -163,12 +215,14 @@ fn build_compact_request_log_data(
fn log_remote_compact_failure(
turn_context: &TurnContext,
auto_compact_callsite: AutoCompactCallsite,
log_data: &CompactRequestLogData,
total_usage_breakdown: TotalTokenUsageBreakdown,
err: &CodexErr,
) {
error!(
turn_id = %turn_context.sub_id,
auto_compact_callsite = ?auto_compact_callsite,
last_api_response_total_tokens = total_usage_breakdown.last_api_response_total_tokens,
all_history_items_model_visible_bytes = total_usage_breakdown.all_history_items_model_visible_bytes,
estimated_tokens_of_items_added_since_last_successful_api_response = total_usage_breakdown.estimated_tokens_of_items_added_since_last_successful_api_response,
@@ -184,15 +238,37 @@ fn trim_function_call_history_to_fit_context_window(
history: &mut ContextManager,
turn_context: &TurnContext,
base_instructions: &BaseInstructions,
incoming_items: Option<&[ResponseItem]>,
) -> usize {
let Some(context_window) = turn_context.model_context_window() else {
return 0;
};
let incoming_items_tokens = incoming_items
.unwrap_or_default()
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
trim_codex_generated_tail_items_to_fit_context_window(
history,
context_window,
base_instructions,
incoming_items_tokens,
)
}
fn trim_codex_generated_tail_items_to_fit_context_window(
history: &mut ContextManager,
context_window: i64,
base_instructions: &BaseInstructions,
incoming_items_tokens: i64,
) -> usize {
let mut deleted_items = 0usize;
let Some(context_window) = turn_context.model_context_window() else {
return deleted_items;
};
while history
.estimate_token_count_with_base_instructions(base_instructions)
.is_some_and(|estimated_tokens| estimated_tokens > context_window)
.is_some_and(|estimated_tokens| {
estimated_tokens.saturating_add(incoming_items_tokens) > context_window
})
{
let Some(last_item) = history.raw_items().last() else {
break;
@@ -208,3 +284,90 @@ fn trim_function_call_history_to_fit_context_window(
deleted_items
}
#[cfg(test)]
mod tests {
use super::*;
use crate::truncate::TruncationPolicy;
use codex_protocol::models::ContentItem;
use pretty_assertions::assert_eq;
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn developer_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
#[test]
fn trim_accounts_for_incoming_items_tokens() {
let base_instructions = BaseInstructions {
text: String::new(),
};
let incoming_items = vec![user_message(
"INCOMING_USER_MESSAGE_THAT_TIPS_OVER_THE_WINDOW",
)];
let incoming_items_tokens = incoming_items
.iter()
.map(estimate_item_token_count)
.fold(0_i64, i64::saturating_add);
assert!(
incoming_items_tokens > 0,
"expected incoming item token estimate to be positive"
);
let mut history = ContextManager::new();
let history_items = vec![
user_message("USER_ONE"),
developer_message("TRAILING_CODEX_GENERATED_CONTEXT"),
];
history.record_items(history_items.iter(), TruncationPolicy::Tokens(10_000));
let history_tokens = history
.estimate_token_count_with_base_instructions(&base_instructions)
.unwrap_or_default();
let context_window = history_tokens
.saturating_add(incoming_items_tokens)
.saturating_sub(1);
let mut without_incoming_projection = history.clone();
let deleted_without_incoming = trim_codex_generated_tail_items_to_fit_context_window(
&mut without_incoming_projection,
context_window,
&base_instructions,
0,
);
assert_eq!(
deleted_without_incoming, 0,
"history-only projection should not trim when currently under the limit"
);
let deleted_with_incoming = trim_codex_generated_tail_items_to_fit_context_window(
&mut history,
context_window,
&base_instructions,
incoming_items_tokens,
);
assert_eq!(
deleted_with_incoming, 1,
"incoming projection should trim trailing codex-generated history to fit pre-turn request"
);
assert_eq!(history.raw_items(), vec![user_message("USER_ONE")]);
}
}

View File

@@ -395,7 +395,7 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize {
.saturating_sub(650)
}
fn estimate_item_token_count(item: &ResponseItem) -> i64 {
pub(crate) fn estimate_item_token_count(item: &ResponseItem) -> i64 {
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
approx_tokens_from_byte_count_i64(model_visible_bytes)
}

View File

@@ -3,6 +3,7 @@ mod normalize;
pub(crate) use history::ContextManager;
pub(crate) use history::TotalTokenUsageBreakdown;
pub(crate) use history::estimate_item_token_count;
pub(crate) use history::estimate_response_item_model_visible_bytes;
pub(crate) use history::is_codex_generated_item;
pub(crate) use history::is_user_turn_boundary;

View File

@@ -3,6 +3,8 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::context_manager::is_user_turn_boundary;
use crate::protocol::EventMsg;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
@@ -25,20 +27,48 @@ impl SessionTask for CompactTask {
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
let has_user_turn_boundary = session
.clone_history()
.await
.raw_items()
.iter()
.any(is_user_turn_boundary);
if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "remote")],
);
let _ = crate::compact_remote::run_remote_compact_task(session, ctx).await;
if let Err(err) =
crate::compact_remote::run_remote_compact_task(session.clone(), ctx.clone()).await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
session.send_event(&ctx, event).await;
} else if has_user_turn_boundary {
// Manual `/compact` rewrites history to compacted transcript items and drops
// per-turn context entries. Force initial-context reseeding on the next user turn.
session.mark_initial_context_unseeded_for_next_turn().await;
}
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
let _ = crate::compact::run_compact_task(session, ctx, input).await;
if let Err(err) =
crate::compact::run_compact_task(session.clone(), ctx.clone(), input).await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running local compact task".to_string())),
);
session.send_event(&ctx, event).await;
} else if has_user_turn_boundary {
// Manual `/compact` rewrites history to compacted transcript items and drops
// per-turn context entries. Force initial-context reseeding on the next user turn.
session.mark_initial_context_unseeded_for_next_turn().await;
}
}
None

View File

@@ -8,6 +8,7 @@ use crate::codex::run_turn;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_otel::OtelManager;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::user_input::UserInput;
use futures::future::BoxFuture;
@@ -24,12 +25,14 @@ type PrewarmedSessionTask = JoinHandle<Option<ModelClientSession>>;
pub(crate) struct RegularTask {
prewarmed_session_task: Mutex<Option<PrewarmedSessionTask>>,
pre_turn_context_items: Vec<ResponseItem>,
}
impl Default for RegularTask {
fn default() -> Self {
Self {
prewarmed_session_task: Mutex::new(None),
pre_turn_context_items: Vec::new(),
}
}
}
@@ -58,9 +61,15 @@ impl RegularTask {
Self {
prewarmed_session_task: Mutex::new(Some(prewarmed_session_task)),
pre_turn_context_items: Vec::new(),
}
}
pub(crate) fn with_pre_turn_context_items(mut self, items: Vec<ResponseItem>) -> Self {
self.pre_turn_context_items = items;
self
}
async fn take_prewarmed_session(&self) -> Option<ModelClientSession> {
let prewarmed_session_task = self
.prewarmed_session_task
@@ -104,6 +113,7 @@ impl SessionTask for RegularTask {
sess,
ctx,
input,
self.pre_turn_context_items.clone(),
prewarmed_client_session,
cancellation_token,
)

View File

@@ -25,6 +25,7 @@ use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use std::collections::VecDeque;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
@@ -39,6 +40,7 @@ use core_test_support::responses::sse_failed;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use wiremock::MockServer;
// --- Test helpers -----------------------------------------------------------
@@ -110,6 +112,136 @@ fn non_openai_model_provider(server: &MockServer) -> ModelProviderInfo {
provider
}
fn normalize_shape_text(text: &str) -> String {
if text == SUMMARIZATION_PROMPT {
return "<SUMMARIZATION_PROMPT>".to_string();
}
if let Some(summary) = text.strip_prefix(format!("{SUMMARY_PREFIX}\n").as_str()) {
return format!("<SUMMARY:{summary}>");
}
if text.starts_with("# AGENTS.md instructions for ") {
return "<AGENTS_MD>".to_string();
}
if text.starts_with("<environment_context>") {
return "<ENVIRONMENT_CONTEXT>".to_string();
}
if text.contains("<permissions instructions>") {
return "<PERMISSIONS_INSTRUCTIONS>".to_string();
}
text.replace('\n', "\\n")
}
fn message_text_for_shape(item: &Value) -> String {
item.get("content")
.and_then(Value::as_array)
.map(|content| {
content
.iter()
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
.map(normalize_shape_text)
.collect::<Vec<String>>()
.join(" | ")
})
.filter(|text| !text.is_empty())
.unwrap_or_else(|| "<NO_TEXT>".to_string())
}
fn request_input_shape(request: &ResponsesRequest) -> String {
request
.input()
.into_iter()
.enumerate()
.map(|(idx, item)| {
let Some(item_type) = item.get("type").and_then(Value::as_str) else {
return format!("{idx:02}:<MISSING_TYPE>");
};
match item_type {
"message" => {
let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown");
let text = message_text_for_shape(&item);
format!("{idx:02}:message/{role}:{text}")
}
"function_call" => {
let name = item.get("name").and_then(Value::as_str).unwrap_or("unknown");
let normalized_name = if name == DUMMY_FUNCTION_NAME {
"<TOOL_CALL>"
} else {
name
};
format!("{idx:02}:function_call/{normalized_name}")
}
"function_call_output" => {
let output = item
.get("output")
.and_then(Value::as_str)
.map(|output| {
if output.starts_with("unsupported call: ")
|| output.starts_with("unsupported custom tool call: ")
{
"<TOOL_ERROR_OUTPUT>".to_string()
} else {
normalize_shape_text(output)
}
})
.unwrap_or_else(|| "<NON_STRING_OUTPUT>".to_string());
format!("{idx:02}:function_call_output:{output}")
}
"local_shell_call" => {
let command = item
.get("action")
.and_then(|action| action.get("command"))
.and_then(Value::as_array)
.map(|parts| {
parts
.iter()
.filter_map(Value::as_str)
.collect::<Vec<&str>>()
.join(" ")
})
.filter(|cmd| !cmd.is_empty())
.unwrap_or_else(|| "<NO_COMMAND>".to_string());
format!("{idx:02}:local_shell_call:{command}")
}
"reasoning" => {
let summary_text = item
.get("summary")
.and_then(Value::as_array)
.and_then(|summary| summary.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(normalize_shape_text)
.unwrap_or_else(|| "<NO_SUMMARY>".to_string());
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!(
"{idx:02}:reasoning:summary={summary_text}:encrypted={has_encrypted_content}"
)
}
"compaction" => {
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!("{idx:02}:compaction:encrypted={has_encrypted_content}")
}
other => format!("{idx:02}:{other}"),
}
})
.collect::<Vec<String>>()
.join("\n")
}
fn sectioned_request_shapes(sections: &[(&str, &ResponsesRequest)]) -> String {
sections
.iter()
.map(|(title, request)| format!("## {title}\n{}", request_input_shape(request)))
.collect::<Vec<String>>()
.join("\n\n")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn summarize_context_three_requests_and_instructions() {
skip_if_no_network!();
@@ -323,8 +455,15 @@ async fn manual_compact_uses_custom_prompt() {
skip_if_no_network!();
let server = start_mock_server().await;
let sse_stream = sse(vec![ev_completed("r1")]);
let response_mock = mount_sse_once(&server, sse_stream).await;
let first_turn = sse(vec![
ev_assistant_message("m0", FIRST_REPLY),
ev_completed_with_tokens("r0", 80),
]);
let compact_turn = sse(vec![
ev_assistant_message("m1", SUMMARY_TEXT),
ev_completed_with_tokens("r1", 100),
]);
let request_log = mount_sse_sequence(&server, vec![first_turn, compact_turn]).await;
let custom_prompt = "Use this compact prompt instead";
@@ -339,6 +478,18 @@ async fn manual_compact_uses_custom_prompt() {
.expect("create conversation")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("trigger compact");
let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = warning_event else {
@@ -347,7 +498,13 @@ async fn manual_compact_uses_custom_prompt() {
assert_eq!(message, COMPACT_WARNING_MESSAGE);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let body = response_mock.single_request().body_json();
let requests = request_log.requests();
assert_eq!(
requests.len(),
2,
"expected first turn and compact requests"
);
let body = requests[1].body_json();
let input = body
.get("input")
@@ -390,6 +547,10 @@ async fn manual_compact_emits_api_and_local_token_usage_events() {
let server = start_mock_server().await;
let sse_first_turn = sse(vec![
ev_assistant_message("m0", FIRST_REPLY),
ev_completed_with_tokens("r0", 80),
]);
// Compact run where the API reports zero tokens in usage. Our local
// estimator should still compute a non-zero context size for the compacted
// history.
@@ -397,7 +558,7 @@ async fn manual_compact_emits_api_and_local_token_usage_events() {
ev_assistant_message("m1", SUMMARY_TEXT),
ev_completed_with_tokens("r1", 0),
]);
mount_sse_once(&server, sse_compact).await;
mount_sse_sequence(&server, vec![sse_first_turn, sse_compact]).await;
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
@@ -406,39 +567,41 @@ async fn manual_compact_emits_api_and_local_token_usage_events() {
});
let codex = builder.build(&server).await.unwrap().codex;
// Trigger manual compact and collect TokenCount events for the compact turn.
codex.submit(Op::Compact).await.unwrap();
// First TokenCount: from the compact API call (usage.total_tokens = 0).
let first = wait_for_event_match(&codex, |ev| match ev {
EventMsg::TokenCount(tc) => tc
.info
.as_ref()
.map(|info| info.last_token_usage.total_tokens),
_ => None,
})
.await;
// Second TokenCount: from the local post-compaction estimate.
let last = wait_for_event_match(&codex, |ev| match ev {
EventMsg::TokenCount(tc) => tc
.info
.as_ref()
.map(|info| info.last_token_usage.total_tokens),
_ => None,
})
.await;
// Ensure the compact task itself completes.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(
first, 0,
"expected first TokenCount from compact API usage to be zero"
// Trigger manual compact and collect TokenCount events for the compact turn.
codex.submit(Op::Compact).await.unwrap();
let mut token_totals = Vec::new();
loop {
let event = wait_for_event(&codex, |_| true).await;
match event {
EventMsg::TokenCount(tc) => {
if let Some(info) = tc.info {
token_totals.push(info.last_token_usage.total_tokens);
}
}
EventMsg::TurnComplete(_) => break,
_ => {}
}
}
assert!(
token_totals.contains(&0),
"expected compact turn to emit TokenCount usage.total_tokens = 0"
);
assert!(
last > 0,
"second TokenCount should reflect a non-zero estimated context size after compaction"
token_totals.iter().any(|total| *total > 0),
"expected compact turn to emit non-zero estimated local context size after compaction"
);
}
@@ -1797,6 +1960,79 @@ async fn manual_compact_retries_after_context_window_error() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_non_context_failure_retries_then_emits_task_error() {
skip_if_no_network!();
let server = start_mock_server().await;
let user_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let compact_failed_1 = sse_failed(
"resp-fail-1",
"server_error",
"temporary compact failure one",
);
let compact_failed_2 = sse_failed(
"resp-fail-2",
"server_error",
"temporary compact failure two",
);
mount_sse_sequence(&server, vec![user_turn, compact_failed_1, compact_failed_2]).await;
let mut model_provider = non_openai_model_provider(&server);
model_provider.stream_max_retries = Some(1);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200_000);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "first turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("trigger compact");
let reconnect_message = wait_for_event_match(&codex, |event| match event {
EventMsg::StreamError(stream_error) => Some(stream_error.message.clone()),
_ => None,
})
.await;
assert!(
reconnect_message.contains("Reconnecting... 1/1"),
"expected reconnect stream error message, got {reconnect_message}"
);
let task_error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
task_error_message.contains("Error running local compact task"),
"expected local compact task error prefix, got {task_error_message}"
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_twice_preserves_latest_user_messages() {
skip_if_no_network!();
@@ -1960,15 +2196,27 @@ async fn manual_compact_twice_preserves_latest_user_messages() {
.unwrap_or_else(|| panic!("final turn request missing for {final_user_message}"))
.input()
.into_iter()
.filter(|item| {
let role = item
.get("role")
.and_then(|v| v.as_str())
.unwrap_or_default();
let text = item
.get("content")
.and_then(|v| v.as_array())
.and_then(|v| v.first())
.and_then(|v| v.get("text"))
.and_then(|v| v.as_str())
.unwrap_or_default();
if role == "developer" {
return false;
}
!(text.starts_with("# AGENTS.md instructions for ")
|| text.starts_with("<environment_context>")
|| text.starts_with("<turn_aborted>"))
})
.collect::<VecDeque<_>>();
// Permissions developer message
final_output.pop_front();
// User instructions (project docs/skills)
final_output.pop_front();
// Environment context
final_output.pop_front();
let _ = final_output
.iter_mut()
.map(drop_call_id)
@@ -2434,3 +2682,428 @@ async fn auto_compact_runs_when_reasoning_header_clears_between_turns() {
"remote compaction should run once after the reasoning header clears"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_pre_turn_compaction_including_incoming_user_message() {
skip_if_no_network!();
let server = start_mock_server().await;
let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 60),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", "SECOND_REPLY"),
ev_completed_with_tokens("r2", 500),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", "PRE_TURN_SUMMARY"),
ev_completed_with_tokens("r3", 100),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 80),
]);
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
})
.build(&server)
.await
.expect("build codex")
.codex;
for user in ["USER_ONE", "USER_TWO"] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
let image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII="
.to_string();
codex
.submit(Op::UserInput {
items: vec![
UserInput::Image {
image_url: image_url.clone(),
},
UserInput::Text {
text: "USER_THREE".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 4, "expected user, user, compact, follow-up");
let compact_shape = request_input_shape(&requests[2]);
let follow_up_shape = request_input_shape(&requests[3]);
insta::assert_snapshot!(
"pre_turn_compaction_including_incoming_shapes",
sectioned_request_shapes(&[
("Local Compaction Request", &requests[2]),
("Local Post-Compaction History Request", &requests[3]),
])
);
assert!(
compact_shape.contains("<SUMMARIZATION_PROMPT>"),
"expected compact request to include summarization prompt"
);
assert!(
compact_shape.contains("USER_THREE"),
"expected compact request to include incoming user message"
);
let follow_up_has_incoming_image = requests[3].inputs_of_type("message").iter().any(|item| {
if item.get("role").and_then(Value::as_str) != Some("user") {
return false;
}
let Some(content) = item.get("content").and_then(Value::as_array) else {
return false;
};
let has_user_text = content.iter().any(|span| {
span.get("type").and_then(Value::as_str) == Some("input_text")
&& span.get("text").and_then(Value::as_str) == Some("USER_THREE")
});
let has_image = content.iter().any(|span| {
span.get("type").and_then(Value::as_str) == Some("input_image")
&& span.get("image_url").and_then(Value::as_str) == Some(image_url.as_str())
});
has_user_text && has_image
});
assert!(
follow_up_has_incoming_image,
"expected post-compaction follow-up request to keep incoming user image content"
);
assert!(
follow_up_shape.contains("<SUMMARY:PRE_TURN_SUMMARY>"),
"expected post-compaction request to include prefixed summary"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_pre_turn_compaction_context_window_exceeded() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 500),
]);
let mut responses = vec![first_turn];
responses.extend(
(0..7).map(|_| {
sse_failed(
"compact-failed",
"context_length_exceeded",
"Your input exceeds the context window of this model. Please adjust your input and try again.",
)
}),
);
let request_log = mount_sse_sequence(&server, responses).await;
let mut model_provider = non_openai_model_provider(&server);
model_provider.stream_max_retries = Some(0);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit second user");
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(
requests.len() >= 2,
"expected first turn and at least one compaction request"
);
let include_attempt_shape = request_input_shape(&requests[1]);
insta::assert_snapshot!(
"pre_turn_compaction_context_window_exceeded_shapes",
sectioned_request_shapes(&[(
"Local Compaction Request (Including Incoming User Message)",
&requests[1]
),])
);
assert!(
include_attempt_shape.contains("USER_TWO"),
"first pre-turn attempt should include incoming user message"
);
assert!(
error_message.contains(
"Incoming user message and/or turn context is too large to fit in context window"
),
"expected context window exceeded message, got {error_message}"
);
assert!(
error_message.contains("incoming_items_tokens_estimate="),
"expected token estimate in error message, got {error_message}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_mid_turn_continuation_compaction() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r1", 500),
]);
let auto_compact_turn = sse(vec![
ev_assistant_message("m2", "MID_TURN_SUMMARY"),
ev_completed_with_tokens("r2", 100),
]);
let post_compact_turn = sse(vec![
ev_assistant_message("m3", FINAL_REPLY),
ev_completed_with_tokens("r3", 80),
]);
let request_log = mount_sse_sequence(
&server,
vec![first_turn, auto_compact_turn, post_compact_turn],
)
.await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 3, "expected user, compact, follow-up");
let compact_shape = request_input_shape(&requests[1]);
let follow_up_shape = request_input_shape(&requests[2]);
insta::assert_snapshot!(
"mid_turn_compaction_shapes",
sectioned_request_shapes(&[
("Local Compaction Request", &requests[1]),
("Local Post-Compaction History Request", &requests[2]),
])
);
assert!(
compact_shape.contains("function_call_output"),
"mid-turn compaction request should include function call output"
);
assert!(
compact_shape.contains("<SUMMARIZATION_PROMPT>"),
"mid-turn compaction request should include summarization prompt"
);
assert!(
follow_up_shape.contains("<SUMMARY:MID_TURN_SUMMARY>"),
"post-mid-turn compaction request should include summary"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_manual_compact_without_previous_user_messages() {
skip_if_no_network!();
let server = start_mock_server().await;
let follow_up_turn = sse(vec![
ev_assistant_message("m1", FINAL_REPLY),
ev_completed_with_tokens("r1", 80),
]);
let request_log = mount_sse_once(&server, follow_up_turn).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex.submit(Op::Compact).await.expect("run /compact");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "AFTER_MANUAL_EMPTY_COMPACT".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit follow-up user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 1, "expected only follow-up turn request");
let follow_up_shape = request_input_shape(&requests[0]);
insta::assert_snapshot!(
"manual_compact_without_prev_user_shapes",
sectioned_request_shapes(&[("Local Post-Compaction History", &requests[0]),])
);
assert!(
!follow_up_shape.contains("<SUMMARY:MANUAL_EMPTY_SUMMARY>"),
"follow-up request should not include compact summary after no-op /compact"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_manual_compact_with_previous_user_messages() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 80),
]);
let compact_turn = sse(vec![
ev_assistant_message("m2", "MANUAL_SUMMARY"),
ev_completed_with_tokens("r2", 90),
]);
let follow_up_turn = sse(vec![
ev_assistant_message("m3", FINAL_REPLY),
ev_completed_with_tokens("r3", 80),
]);
let request_log =
mount_sse_sequence(&server, vec![first_turn, compact_turn, follow_up_turn]).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("run /compact");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit second user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 3, "expected user, compact, follow-up");
let compact_shape = request_input_shape(&requests[1]);
let follow_up_shape = request_input_shape(&requests[2]);
insta::assert_snapshot!(
"manual_compact_with_history_shapes",
sectioned_request_shapes(&[
("Local Compaction Request", &requests[1]),
("Local Post-Compaction History Request", &requests[2]),
])
);
assert!(
compact_shape.contains("USER_ONE"),
"manual compact request should include existing user history"
);
assert!(
compact_shape.contains("<SUMMARIZATION_PROMPT>"),
"manual compact request should include summarization prompt"
);
assert!(
follow_up_shape.contains("<SUMMARY:MANUAL_SUMMARY>"),
"post-compact request should include compact summary"
);
assert!(
follow_up_shape.contains("USER_TWO"),
"post-compact request should include the latest user message"
);
}

View File

@@ -4,6 +4,7 @@ use std::fs;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ItemCompletedEvent;
use codex_core::protocol::ItemStartedEvent;
@@ -23,6 +24,7 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
@@ -39,6 +41,151 @@ fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64
.saturating_add(approx_token_count(&request.instructions_text()))
}
const DUMMY_FUNCTION_NAME: &str = "unsupported_tool";
fn summary_with_prefix(summary: &str) -> String {
format!("{SUMMARY_PREFIX}\n{summary}")
}
fn user_message_item(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn normalize_shape_text(text: &str) -> String {
if let Some(summary) = text.strip_prefix(format!("{SUMMARY_PREFIX}\n").as_str()) {
return format!("<SUMMARY:{summary}>");
}
if text.starts_with("# AGENTS.md instructions for ") {
return "<AGENTS_MD>".to_string();
}
if text.starts_with("<environment_context>") {
return "<ENVIRONMENT_CONTEXT>".to_string();
}
if text.contains("<permissions instructions>") {
return "<PERMISSIONS_INSTRUCTIONS>".to_string();
}
text.replace('\n', "\\n")
}
fn message_text_for_shape(item: &Value) -> String {
item.get("content")
.and_then(Value::as_array)
.map(|content| {
content
.iter()
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
.map(normalize_shape_text)
.collect::<Vec<String>>()
.join(" | ")
})
.filter(|text| !text.is_empty())
.unwrap_or_else(|| "<NO_TEXT>".to_string())
}
fn request_input_shape(request: &responses::ResponsesRequest) -> String {
request
.input()
.into_iter()
.enumerate()
.map(|(idx, item)| {
let Some(item_type) = item.get("type").and_then(Value::as_str) else {
return format!("{idx:02}:<MISSING_TYPE>");
};
match item_type {
"message" => {
let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown");
let text = message_text_for_shape(&item);
format!("{idx:02}:message/{role}:{text}")
}
"function_call" => {
let name = item.get("name").and_then(Value::as_str).unwrap_or("unknown");
let normalized_name = if name == DUMMY_FUNCTION_NAME {
"<TOOL_CALL>"
} else {
name
};
format!("{idx:02}:function_call/{normalized_name}")
}
"function_call_output" => {
let output = item
.get("output")
.and_then(Value::as_str)
.map(|output| {
if output.starts_with("unsupported call: ")
|| output.starts_with("unsupported custom tool call: ")
{
"<TOOL_ERROR_OUTPUT>".to_string()
} else {
normalize_shape_text(output)
}
})
.unwrap_or_else(|| "<NON_STRING_OUTPUT>".to_string());
format!("{idx:02}:function_call_output:{output}")
}
"local_shell_call" => {
let command = item
.get("action")
.and_then(|action| action.get("command"))
.and_then(Value::as_array)
.map(|parts| {
parts
.iter()
.filter_map(Value::as_str)
.collect::<Vec<&str>>()
.join(" ")
})
.filter(|cmd| !cmd.is_empty())
.unwrap_or_else(|| "<NO_COMMAND>".to_string());
format!("{idx:02}:local_shell_call:{command}")
}
"reasoning" => {
let summary_text = item
.get("summary")
.and_then(Value::as_array)
.and_then(|summary| summary.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(normalize_shape_text)
.unwrap_or_else(|| "<NO_SUMMARY>".to_string());
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!(
"{idx:02}:reasoning:summary={summary_text}:encrypted={has_encrypted_content}"
)
}
"compaction" => {
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!("{idx:02}:compaction:encrypted={has_encrypted_content}")
}
other => format!("{idx:02}:{other}"),
}
})
.collect::<Vec<String>>()
.join("\n")
}
fn sectioned_request_shapes(sections: &[(&str, &responses::ResponsesRequest)]) -> String {
sections
.iter()
.map(|(title, request)| format!("## {title}\n{}", request_input_shape(request)))
.collect::<Vec<String>>()
.join("\n\n")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -502,7 +649,7 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
)
.await;
let compact_mock = responses::mount_compact_json_once(
let first_compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
@@ -545,13 +692,20 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert!(
error_message.contains("Error running remote compact task"),
"expected compact failure error, got {error_message}"
!error_message.contains(
"Incoming user message and/or turn context is too large to fit in context window"
),
"non-context compaction failures should surface real error messages, got {error_message}"
);
assert!(
error_message.contains("invalid compact payload shape")
|| error_message.contains("invalid type: string"),
"expected remote compact parse failure to surface, got {error_message}"
);
assert_eq!(
compact_mock.requests().len(),
first_compact_mock.requests().len(),
1,
"expected exactly one remote compact attempt"
"expected first remote compact attempt with incoming items"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
@@ -855,6 +1009,65 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "manual remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
error_message.contains("Error running remote compact task"),
"expected remote compact task error prefix, got {error_message}"
);
assert!(
error_message.contains("invalid compact payload shape")
|| error_message.contains("invalid type: string"),
"expected invalid compact payload details, got {error_message}"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -985,11 +1198,11 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
)
});
if has_compacted_user_summary
&& has_compaction_item
&& has_compacted_assistant_note
&& has_permissions_developer_message
{
if has_compacted_user_summary && has_compaction_item && has_compacted_assistant_note {
assert!(
!has_permissions_developer_message,
"manual remote compact rollout replacement history should not inject permissions context"
);
saw_compacted_history = true;
break;
}
@@ -1255,3 +1468,425 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 60),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "REMOTE_FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", 80),
]),
],
)
.await;
let compacted_history = vec![
user_message_item("USER_ONE"),
user_message_item("USER_TWO"),
user_message_item("USER_THREE"),
user_message_item(&summary_with_prefix("REMOTE_PRE_TURN_SUMMARY")),
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
for user in ["USER_ONE", "USER_TWO", "USER_THREE"] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
3,
"expected user, user, and post-compact turn"
);
let compact_request = compact_mock.single_request();
let compact_shape = request_input_shape(&compact_request);
let follow_up_shape = request_input_shape(&requests[2]);
insta::assert_snapshot!(
"remote_pre_turn_compaction_including_incoming_shapes",
sectioned_request_shapes(&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Request", &requests[2]),
])
);
assert!(
compact_shape.contains("USER_THREE"),
"remote compaction request should include incoming user message"
);
assert!(
follow_up_shape.contains("<SUMMARY:REMOTE_PRE_TURN_SUMMARY>"),
"post-compaction request should include remote summary"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_failure_stops_without_retry()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 500),
])],
)
.await;
let first_compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
.await;
let post_compact_turn_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_POST_COMPACT_SHOULD_NOT_RUN"),
responses::ev_completed_with_tokens("r2", 80),
]),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(first_compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
1,
"expected no post-compaction follow-up turn request after compact failure"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
"expected turn to stop after compaction failure"
);
let include_attempt_request = first_compact_mock.single_request();
let include_attempt_shape = request_input_shape(&include_attempt_request);
insta::assert_snapshot!(
"remote_pre_turn_compaction_failure_shapes",
sectioned_request_shapes(&[(
"Remote Compaction Request (Including Incoming User Message)",
&include_attempt_request
),])
);
assert!(
include_attempt_shape.contains("USER_TWO"),
"first remote pre-turn compaction attempt should include incoming user message"
);
assert!(
error_message.contains("invalid compact payload shape")
|| error_message.contains("invalid type: string"),
"expected compact parse failure to surface, got {error_message}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_continuation_compaction() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"),
responses::ev_completed_with_tokens("r2", 80),
]),
],
)
.await;
let compacted_history = vec![
user_message_item("USER_ONE"),
user_message_item(&summary_with_prefix("REMOTE_MID_TURN_SUMMARY")),
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
2,
"expected initial and post-compact requests"
);
let compact_request = compact_mock.single_request();
let compact_shape = request_input_shape(&compact_request);
let follow_up_shape = request_input_shape(&requests[1]);
insta::assert_snapshot!(
"remote_mid_turn_compaction_shapes",
sectioned_request_shapes(&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Request", &requests[1]),
])
);
assert!(
compact_shape.contains("function_call_output"),
"remote mid-turn compaction request should include function call output"
);
assert!(
follow_up_shape.contains("<SUMMARY:REMOTE_MID_TURN_SUMMARY>"),
"post-mid-turn request should include remote compaction summary"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_manual_compact_without_previous_user_messages() -> Result<()>
{
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_MANUAL_EMPTY_FOLLOW_UP_REPLY"),
responses::ev_completed_with_tokens("r1", 80),
]),
)
.await;
let compact_mock =
responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
0,
"manual remote /compact should skip remote compaction when there is no associated user"
);
let follow_up_request = responses_mock.single_request();
let follow_up_shape = request_input_shape(&follow_up_request);
insta::assert_snapshot!(
"remote_manual_compact_without_prev_user_shapes",
format!(
"## Remote Post-Compaction History Request\n{}",
request_input_shape(&follow_up_request)
)
);
assert!(
!follow_up_shape.contains("<SUMMARY:REMOTE_MANUAL_EMPTY_SUMMARY>"),
"post-compact request should not include compact summary when remote compaction is skipped"
);
assert!(
follow_up_shape.contains("USER_ONE"),
"post-compact request should include the submitted user message"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_manual_compact_with_previous_user_messages() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_MANUAL_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 80),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MANUAL_FOLLOW_UP_REPLY"),
responses::ev_completed_with_tokens("r2", 80),
]),
],
)
.await;
let compacted_history = vec![
user_message_item("USER_ONE"),
user_message_item(&summary_with_prefix("REMOTE_MANUAL_WITH_HISTORY_SUMMARY")),
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected user and post-compact requests");
let compact_request = compact_mock.single_request();
let compact_shape = request_input_shape(&compact_request);
let follow_up_shape = request_input_shape(&requests[1]);
insta::assert_snapshot!(
"remote_manual_compact_with_history_shapes",
sectioned_request_shapes(&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Request", &requests[1]),
])
);
assert!(
compact_shape.contains("USER_ONE"),
"remote compaction request should include existing user history"
);
assert!(
follow_up_shape.contains("USER_TWO"),
"post-compact request should include latest user message"
);
assert!(
follow_up_shape.contains("<SUMMARY:REMOTE_MANUAL_WITH_HISTORY_SUMMARY>"),
"post-compact request should include remote compact summary"
);
Ok(())
}

View File

@@ -113,6 +113,94 @@ async fn model_change_appends_model_instructions_developer_message() -> Result<(
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn settings_only_empty_turn_persists_updates_for_next_non_empty_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let resp_mock = mount_sse_sequence(
&server,
vec![sse_completed("resp-1"), sse_completed("resp-2")],
)
.await;
let mut builder = test_codex().with_model("gpt-5.2-codex");
let test = builder.build(&server).await?;
let model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "first".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
model: model.clone(),
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
// Settings-only turn with no user message.
test.codex
.submit(Op::UserTurn {
items: Vec::new(),
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: model.clone(),
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "after settings-only turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model,
effort: test.config.model_reasoning_effort,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = resp_mock.requests();
assert_eq!(
requests.len(),
2,
"expected only first and third turns to hit the model"
);
let third_turn_request = requests.last().expect("expected third turn request");
let developer_texts = third_turn_request.message_input_texts("developer");
assert!(
developer_texts
.iter()
.any(|text| text.contains("sandbox_mode` is `danger-full-access`")),
"expected danger-full-access permissions update in next non-empty turn: {developer_texts:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn model_and_personality_change_only_appends_model_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -0,0 +1,19 @@
---
source: core/tests/suite/compact.rs
expression: "sectioned_request_shapes(&[(\"Local Compaction Request\", &requests[1]),\n(\"Local Post-Compaction History Request\", &requests[2]),])"
---
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:FIRST_REPLY
05:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Request
00:message/user:USER_ONE
01:message/user:<SUMMARY:MANUAL_SUMMARY>
02:message/developer:<PERMISSIONS_INSTRUCTIONS>
03:message/user:<AGENTS_MD>
04:message/user:<ENVIRONMENT_CONTEXT>
05:message/user:USER_TWO

View File

@@ -0,0 +1,9 @@
---
source: core/tests/suite/compact.rs
expression: "sectioned_request_shapes(&[(\"Local Post-Compaction History\", &requests[0]),])"
---
## Local Post-Compaction History
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:AFTER_MANUAL_EMPTY_COMPACT

View File

@@ -0,0 +1,19 @@
---
source: core/tests/suite/compact.rs
expression: "sectioned_request_shapes(&[(\"Local Compaction Request\", &requests[1]),\n(\"Local Post-Compaction History Request\", &requests[2]),])"
---
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:function_call/<TOOL_CALL>
05:function_call_output:<TOOL_ERROR_OUTPUT>
06:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/user:<SUMMARY:MID_TURN_SUMMARY>

View File

@@ -0,0 +1,12 @@
---
source: core/tests/suite/compact.rs
expression: "sectioned_request_shapes(&[(\"Local Compaction Request (Including Incoming User Message)\",\n&requests[1]),])"
---
## Local Compaction Request (Including Incoming User Message)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:FIRST_REPLY
05:message/user:USER_TWO
06:message/user:<SUMMARIZATION_PROMPT>

View File

@@ -0,0 +1,23 @@
---
source: core/tests/suite/compact.rs
expression: "sectioned_request_shapes(&[(\"Local Compaction Request\", &requests[2]),\n(\"Local Post-Compaction History Request\", &requests[3]),])"
---
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:FIRST_REPLY
05:message/user:USER_TWO
06:message/assistant:SECOND_REPLY
07:message/user:<image> | </image> | USER_THREE
08:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Request
00:message/user:USER_ONE
01:message/user:USER_TWO
02:message/developer:<PERMISSIONS_INSTRUCTIONS>
03:message/user:<AGENTS_MD>
04:message/user:<ENVIRONMENT_CONTEXT>
05:message/user:<image> | </image> | USER_THREE
06:message/user:<SUMMARY:PRE_TURN_SUMMARY>

View File

@@ -0,0 +1,18 @@
---
source: core/tests/suite/compact_remote.rs
expression: "sectioned_request_shapes(&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Request\", &requests[1]),])"
---
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:REMOTE_MANUAL_FIRST_REPLY
## Remote Post-Compaction History Request
00:message/user:USER_ONE
01:message/user:<SUMMARY:REMOTE_MANUAL_WITH_HISTORY_SUMMARY>
02:message/developer:<PERMISSIONS_INSTRUCTIONS>
03:message/user:<AGENTS_MD>
04:message/user:<ENVIRONMENT_CONTEXT>
05:message/user:USER_TWO

View File

@@ -0,0 +1,9 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format!(\"## Remote Post-Compaction History Request\\n{}\",\nrequest_input_shape(&follow_up_request))"
---
## Remote Post-Compaction History Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE

View File

@@ -0,0 +1,18 @@
---
source: core/tests/suite/compact_remote.rs
expression: "sectioned_request_shapes(&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Request\", &requests[1]),])"
---
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:function_call/<TOOL_CALL>
05:function_call_output:<TOOL_ERROR_OUTPUT>
## Remote Post-Compaction History Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/user:<SUMMARY:REMOTE_MID_TURN_SUMMARY>

View File

@@ -0,0 +1,11 @@
---
source: core/tests/suite/compact_remote.rs
expression: "sectioned_request_shapes(&[(\"Remote Compaction Request (Including Incoming User Message)\",\n&include_attempt_request),])"
---
## Remote Compaction Request (Including Incoming User Message)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
05:message/user:USER_TWO

View File

@@ -0,0 +1,22 @@
---
source: core/tests/suite/compact_remote.rs
expression: "sectioned_request_shapes(&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Request\", &requests[2]),])"
---
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT>
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
05:message/user:USER_TWO
06:message/assistant:REMOTE_SECOND_REPLY
07:message/user:USER_THREE
## Remote Post-Compaction History Request
00:message/user:USER_ONE
01:message/user:USER_TWO
02:message/developer:<PERMISSIONS_INSTRUCTIONS>
03:message/user:<AGENTS_MD>
04:message/user:<ENVIRONMENT_CONTEXT>
05:message/user:USER_THREE
06:message/user:<SUMMARY:REMOTE_PRE_TURN_SUMMARY>