Compare commits

...

13 Commits

Author SHA1 Message Date
Charles Cunningham
7caafd1af3 codex: satisfy telemetry argument comment lint
Add the required argument-name comment to the stalled auto-compaction
telemetry counter increment.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 21:01:57 -07:00
Charles Cunningham
232ffcffc6 codex: clarify compacted replacement history sync
Add an inline comment explaining why replacement_history is refreshed
from the final in-memory items after the ghost snapshot tail merge.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:28 -07:00
Charles Cunningham
dd53b7e1b5 codex: simplify compaction ghost snapshot merge
Narrow the compaction race fix to the concrete ghost-snapshot case.
Remove the conflict classification and warning path, and make
replace_compacted_history opportunistically preserve only an
append-only GhostSnapshot tail under the state lock.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:28 -07:00
Charles Cunningham
1ed83c70bf codex: trim replace_compacted_history doc comment (#14563)
Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:28 -07:00
Charles Cunningham
85ee75c9bf codex: simplify compaction conflict handling (#14563)
Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:28 -07:00
Charles Cunningham
b758723878 codex: clarify replace_compacted_history contract (#14563)
Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:27 -07:00
Charles Cunningham
4abb1baa59 codex: fix CI failure on PR #14563
Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:27 -07:00
Charles Cunningham
65ab4a4203 codex: address PR review feedback (#14563)
Move ghost snapshot tail preservation into replace_compacted_history so the merge happens under the same session lock as history replacement and the persisted replacement history matches the live history.

Tests: just fmt
Tests: cargo test -p codex-core

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:26 -07:00
Charles Cunningham
54717e13bb Clarify stalled auto-compaction safeguard
Make the mid-turn compaction cap explicit as a stalled auto-compaction safeguard with dedicated error text, telemetry, and regression coverage.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:25 -07:00
Charles Cunningham
ae2511179f Limit compaction race merge to ghost snapshots
Narrow the compaction race fix to preserve only appended ghost snapshots, which are non-model-visible and can realistically arrive from detached background tasks while compaction is waiting.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:25 -07:00
Charles Cunningham
8335832f1c core: document compaction tail merge race
Explain the snapshot-to-replace race at both compaction call sites so the extra re-snapshot and append-only tail preservation logic is easier to review.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:25 -07:00
Charles Cunningham
1023dee640 core: simplify compaction tail merge helper
Rename the append-only compaction merge helper, switch it to a boolean success signal, and document the exact concurrency assumption.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:25 -07:00
Charles Cunningham
06d264b54e core: preserve compaction tail writes and cap retry loop
Preserve append-only session history items that arrive during local or remote compaction before replacing compacted history, and stop mid-turn auto compaction after three attempts with a user-visible error.

Co-authored-by: Codex <noreply@openai.com>
2026-03-16 20:43:25 -07:00
6 changed files with 378 additions and 10 deletions

View File

@@ -379,6 +379,7 @@ pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 512;
const CYBER_VERIFY_URL: &str = "https://chatgpt.com/cyber";
const CYBER_SAFETY_URL: &str = "https://developers.openai.com/codex/concepts/cyber-safety";
const DIRECT_APP_TOOL_EXPOSURE_THRESHOLD: usize = 100;
const MAX_MID_TURN_AUTO_COMPACTION_ATTEMPTS: u32 = 3;
impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
@@ -3357,14 +3358,33 @@ impl Session {
state.replace_history(items, reference_context_item);
}
/// Replaces session history with a compacted transcript and, under the same
/// state lock, preserves a concurrent append-only `GhostSnapshot` tail when
/// the live history still extends the compacted prefix.
pub(crate) async fn replace_compacted_history(
&self,
items: Vec<ResponseItem>,
mut items: Vec<ResponseItem>,
reference_context_item: Option<TurnContextItem>,
compacted_item: CompactedItem,
mut compacted_item: CompactedItem,
base_history: &[ResponseItem],
) {
self.replace_history(items, reference_context_item.clone())
.await;
// Compaction snapshots history and waits on a model/API call. Preserve
// any append-only ghost snapshot tail while holding the same lock that
// replaces history so detached `/undo` metadata writes are not lost in
// the gap before replacement.
{
let mut state = self.state.lock().await;
compact::append_concurrent_ghost_snapshot_tail(
&mut items,
base_history,
state.history.raw_items(),
);
state.replace_history(items.clone(), reference_context_item.clone());
}
// `items` may have gained a concurrent ghost snapshot tail under the
// state lock, so refresh the persisted replacement history to match the
// in-memory install.
compacted_item.replacement_history = Some(items);
self.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
@@ -5679,6 +5699,7 @@ pub(crate) async fn run_turn(
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let mut server_model_warning_emitted_for_turn = false;
let mut mid_turn_auto_compaction_attempts = 0_u32;
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
// one instance across retries within this turn.
@@ -5816,8 +5837,36 @@ pub(crate) async fn run_turn(
"post sampling token usage"
);
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
// Guard against a pathological loop where mid-turn auto-compaction
// keeps succeeding, but the turn still remains over the token
// threshold and still needs follow-up. Treat that as stalled
// auto-compaction so we fail explicitly instead of hot-looping.
if token_limit_reached && needs_follow_up {
mid_turn_auto_compaction_attempts += 1;
if mid_turn_auto_compaction_attempts > MAX_MID_TURN_AUTO_COMPACTION_ATTEMPTS {
sess.services.session_telemetry.counter(
"codex.auto_compaction.stalled",
/*inc*/ 1,
&[("phase", "mid_turn")],
);
error!(
turn_id = %turn_context.sub_id,
mid_turn_auto_compaction_attempts,
total_usage_tokens,
estimated_token_count = ?estimated_token_count,
auto_compact_limit,
"mid-turn auto-compaction stalled"
);
let event = EventMsg::Error(CodexErr::ContextWindowExceeded.to_error_event(
Some(
format!(
"Mid-turn auto-compaction stalled after {MAX_MID_TURN_AUTO_COMPACTION_ATTEMPTS} attempts without getting this turn back under the token threshold. Start a new thread or compact manually."
),
),
));
sess.send_event(&turn_context, event).await;
break;
}
if run_auto_compact(
&sess,
&turn_context,
@@ -5830,6 +5879,7 @@ pub(crate) async fn run_turn(
}
continue;
}
mid_turn_auto_compaction_attempts = 0;
if !needs_follow_up {
last_agent_message = sampling_request_last_agent_message;

View File

@@ -779,6 +779,51 @@ async fn reconstruct_history_uses_replacement_history_verbatim() {
assert_eq!(reconstructed.history, replacement_history);
}
#[tokio::test]
async fn replace_compacted_history_persists_merged_ghost_snapshot_tail() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let rollout_path = attach_rollout_recorder(&sess).await;
let base_history = vec![user_message("before compact")];
let summary_item = user_message("summary");
let ghost_snapshot = ResponseItem::GhostSnapshot {
ghost_commit: codex_git::GhostCommit::new(
"ghost-123".to_string(),
None,
Vec::new(),
Vec::new(),
),
};
sess.replace_history(base_history.clone(), Some(tc.to_turn_context_item()))
.await;
sess.record_conversation_items(tc.as_ref(), std::slice::from_ref(&ghost_snapshot))
.await;
sess.replace_compacted_history(
vec![summary_item.clone()],
Some(tc.to_turn_context_item()),
CompactedItem {
message: String::new(),
replacement_history: Some(vec![summary_item.clone()]),
},
&base_history,
)
.await;
let expected_history = vec![summary_item.clone(), ghost_snapshot.clone()];
assert_eq!(sess.clone_history().await.raw_items(), expected_history);
sess.flush_rollout().await;
let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(&rollout_path)
.await
.expect("load rollout items");
let reconstructed = sess
.reconstruct_history_from_rollout(tc.as_ref(), &rollout_items)
.await;
assert_eq!(reconstructed.history, expected_history);
}
#[tokio::test]
async fn record_initial_history_reconstructs_resumed_transcript() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -218,8 +218,13 @@ async fn run_compact_task_inner(
message: summary_text.clone(),
replacement_history: Some(new_history.clone()),
};
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.replace_compacted_history(
new_history,
reference_context_item,
compacted_item,
history_items,
)
.await;
sess.recompute_token_usage(&turn_context).await;
sess.emit_turn_item_completed(&turn_context, compaction_item)
@@ -270,6 +275,30 @@ pub(crate) fn is_summary_message(message: &str) -> bool {
message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str())
}
/// Appends ghost snapshots added after `base_history` when `latest_history`
/// still preserves `base_history` as an exact prefix and only appends
/// `GhostSnapshot` items.
///
/// Ghost snapshots are appended by detached background tasks for `/undo`, but
/// `ContextManager::for_prompt()` strips them before any model request. That
/// makes it safe to preserve the concurrent ghost-snapshot-only case while
/// avoiding model-visible items that the compaction request never saw.
pub(crate) fn append_concurrent_ghost_snapshot_tail(
new_history: &mut Vec<ResponseItem>,
base_history: &[ResponseItem],
latest_history: &[ResponseItem],
) {
let Some(appended_items) = latest_history.strip_prefix(base_history) else {
return;
};
if appended_items
.iter()
.all(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
{
new_history.extend_from_slice(appended_items);
}
}
/// Inserts canonical initial context into compacted replacement history at the
/// model-expected boundary.
///

View File

@@ -73,7 +73,8 @@ async fn run_remote_compact_task_inner_impl(
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
let mut history = sess.clone_history().await;
let history_snapshot = sess.clone_history().await;
let mut history = history_snapshot.clone();
let base_instructions = sess.get_base_instructions().await;
let deleted_items = trim_function_call_history_to_fit_context_window(
&mut history,
@@ -156,8 +157,13 @@ async fn run_remote_compact_task_inner_impl(
message: String::new(),
replacement_history: Some(new_history.clone()),
};
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.replace_compacted_history(
new_history,
reference_context_item,
compacted_item,
history_snapshot.raw_items(),
)
.await;
sess.recompute_token_usage(turn_context).await;
sess.emit_turn_item_completed(turn_context, compaction_item)

View File

@@ -185,6 +185,147 @@ fn build_token_limited_compacted_history_appends_summary_message() {
assert_eq!(summary, summary_text);
}
#[test]
fn append_concurrent_ghost_snapshot_tail_appends_concurrent_tail() {
let base_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "before compact".to_string(),
}],
end_turn: None,
phase: None,
}];
let appended_item = ResponseItem::GhostSnapshot {
ghost_commit: codex_git::GhostCommit::new(
"ghost-123".to_string(),
None,
Vec::new(),
Vec::new(),
),
};
let latest_history = vec![base_history[0].clone(), appended_item.clone()];
let mut compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
}];
append_concurrent_ghost_snapshot_tail(&mut compacted_history, &base_history, &latest_history);
assert_eq!(
compacted_history,
vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
},
appended_item,
]
);
}
#[test]
fn append_concurrent_ghost_snapshot_tail_ignores_model_visible_tail() {
let base_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "before compact".to_string(),
}],
end_turn: None,
phase: None,
}];
let latest_history = vec![
base_history[0].clone(),
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "not compacted".to_string(),
}],
end_turn: None,
phase: None,
},
];
let mut compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
}];
append_concurrent_ghost_snapshot_tail(&mut compacted_history, &base_history, &latest_history);
assert_eq!(
compacted_history,
vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
}]
);
}
#[test]
fn append_concurrent_ghost_snapshot_tail_ignores_non_append_only_changes() {
let base_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "before compact".to_string(),
}],
end_turn: None,
phase: None,
}];
let latest_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "rewritten".to_string(),
}],
end_turn: None,
phase: None,
}];
let mut compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
}];
append_concurrent_ghost_snapshot_tail(&mut compacted_history, &base_history, &latest_history);
assert_eq!(
compacted_history,
vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
}]
);
}
#[tokio::test]
async fn process_compacted_history_replaces_developer_messages() {
let compacted_history = vec![

View File

@@ -2588,6 +2588,103 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mid_turn_auto_compaction_stall_emits_specific_error_after_limit() {
skip_if_no_network!();
let server = start_mock_server().await;
let context_window = 100;
let limit = context_window * 90 / 100;
let over_limit_tokens = context_window * 95 / 100 + 1;
let responses = vec![
sse(vec![
ev_function_call("call-stall-1", DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r1", over_limit_tokens),
]),
sse(vec![
ev_assistant_message("m2", &auto_summary("STALL_SUMMARY_1")),
ev_completed_with_tokens("r2", 10),
]),
sse(vec![
ev_function_call("call-stall-2", DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r3", over_limit_tokens),
]),
sse(vec![
ev_assistant_message("m4", &auto_summary("STALL_SUMMARY_2")),
ev_completed_with_tokens("r4", 10),
]),
sse(vec![
ev_function_call("call-stall-3", DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r5", over_limit_tokens),
]),
sse(vec![
ev_assistant_message("m6", &auto_summary("STALL_SUMMARY_3")),
ev_completed_with_tokens("r6", 10),
]),
sse(vec![
ev_function_call("call-stall-4", DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r7", over_limit_tokens),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_context_window = Some(context_window);
config.model_auto_compact_token_limit = Some(limit);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "STALL_AUTO_COMPACT".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit stalled auto compact turn");
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert!(
error_message.contains("Mid-turn auto-compaction stalled after 3 attempts"),
"expected stalled auto-compaction error, got {error_message}"
);
let request_bodies: Vec<String> = request_log
.requests()
.into_iter()
.map(|request| request.body_json().to_string())
.collect();
assert_eq!(
request_bodies.len(),
7,
"expected three auto compactions and no fourth compaction request"
);
assert_eq!(
request_bodies
.iter()
.filter(|body| body_contains_text(body, SUMMARIZATION_PROMPT))
.count(),
3,
"stalled safeguard should stop before a fourth auto compaction request"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_mid_turn_continuation_compaction() {
skip_if_no_network!();