Compare commits

...

24 Commits

Author SHA1 Message Date
Charles Cunningham
97e64b8e4d Preserve persisted history when compact retries trim prompts
Build compaction replacement history from persisted session history instead of the retry-local prompt buffer, so ContextWindowExceeded trimming does not drop earlier real user messages or ghost snapshots from the replacement history.

Add a regression test covering repeated compaction retries and verifying follow-up requests retain earlier user messages even after retry prompt trimming.

Co-authored-by: Codex <noreply@openai.com>
2026-03-06 01:38:49 -08:00
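A minimal sketch of the fix described in the commit above, assuming the helper names that appear in the compact diff further down (persisted_history_items, collect_user_messages, build_compacted_history, ResponseItem::GhostSnapshot); surrounding types and the retry loop itself are not shown.

    // Snapshot the persisted session history before any retry trimming, then
    // derive the replacement history from that snapshot rather than from the
    // prompt buffer that ContextWindowExceeded retries may shrink.
    let persisted_history_items = history.raw_items().to_vec();

    // ... compaction request runs here; retries may trim `history` to fit ...

    let user_messages = collect_user_messages(&persisted_history_items);
    let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);

    // Ghost snapshots are likewise carried over from the persisted items, not
    // from the possibly-trimmed prompt buffer.
    let ghost_snapshots: Vec<ResponseItem> = persisted_history_items
        .iter()
        .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
        .cloned()
        .collect();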
Charles Cunningham
ff975f2f62 Clarify initial turn context naming in task lifecycle
Rename task-snapshot TurnContext parameters and locals to initial_turn_context across task spawning, completion, and task implementations to avoid confusion with mutable active-turn context.

Co-authored-by: Codex <noreply@openai.com>
2026-03-06 01:33:15 -08:00
Charles Cunningham
e2fae5de8d Clarify turn context ownership and retry refresh CAS
Rename RunningTask.turn_context to initial_turn_context to make task snapshot semantics explicit, and retry session-context refresh when compare-and-swap installation races with concurrent mid-turn updates.

Co-authored-by: Codex <noreply@openai.com>
2026-03-06 00:53:14 -08:00
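A hedged sketch of the retry-on-race shape described in the commit above. Only current_active_turn_context appears in the diffs below; compare_and_swap_active_turn_context and build_refreshed_context are assumed names standing in for the actual install path.

    // Rebuild the refreshed context from the latest snapshot and install it
    // only if nothing replaced that snapshot in the meantime; if the
    // compare-and-swap loses to a concurrent mid-turn update, retry against
    // the newer context instead of overwriting it.
    loop {
        let current = session.current_active_turn_context().await;
        let refreshed = build_refreshed_context(current.as_deref()).await;
        if session
            .compare_and_swap_active_turn_context(current, refreshed)
            .await
        {
            break;
        }
    }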
Charles Cunningham
c360485a13 Clarify newest surviving turn-context snapshot selection
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 20:01:21 -08:00
Charles Cunningham
be08067aca Stabilize metadata spawn test with local git repo
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 17:28:04 -08:00
Charles Cunningham
c727a66bb9 Carry trace_id in refreshed turn contexts
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 17:10:03 -08:00
Charles Cunningham
4b6c5880fb Prevent stale same-turn context refresh overwrites
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
03119b822c Serialize session config updates with MCP side effects
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
23e54a9593 Guard refreshed turn-context installs by sub_id
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
8f56878928 Add span_name to network approval test task
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
ec5f273255 Install refreshed turn metadata tasks atomically
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
174cc46016 Update compact-remote realtime snapshots
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
ae9050fb7c Fix rebased TUI and compact-remote build breaks
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
f06e7c9bb9 Cancel refreshed turn metadata tasks on teardown
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
857fb71ea1 Keep real user messages in inline compaction
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:20 -08:00
Charles Cunningham
1f50631422 Use refreshed turn context for network approvals
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
2fc485dd28 Rebuild local compaction history from prompt state
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
82e08ddece Persist refreshed context before first sampling request
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
d88b47d010 Advance previous-turn baseline on mid-turn compaction
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
62d206169e Keep real user messages that match compact prompt
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
f9f0aa8b57 Replace impossible same-turn realtime snapshots
Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
Charles Cunningham
068272f32f Support mid-turn realtime context refresh
Update active turn contexts when realtime starts or stops, keep previous_turn_settings aligned with committed same-turn context updates, and teach replay plus remote compact snapshots to use the latest TurnContextItem within a turn.

Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
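In outline, the change described above hooks the realtime start, transport-close, and shutdown paths so the active turn context is rebuilt from the new realtime state. A condensed view of the call sites shown in the realtime diff below, with handler bodies and event plumbing elided:

    // After the realtime transport starts or closes, refresh the active
    // turn's context so follow-up sampling requests see the new
    // realtime_active state.
    pub(crate) async fn handle_start(sess: &Arc<Session>, /* ... */) {
        // ... start the realtime conversation ...
        sess.refresh_current_active_turn_context_from_realtime_state()
            .await;
        // ... emit RealtimeConversationStarted ...
    }

    pub(crate) async fn handle_close(sess: &Arc<Session>, /* ... */) {
        // ... shut the conversation down ...
        sess.refresh_current_active_turn_context_from_realtime_state()
            .await;
        // ... emit RealtimeConversationClosed ...
    }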
Charles Cunningham
d8f4093818 Fix rebased turn-context refresh regressions
Restore cross-turn model-switch diffing to use previous-turn settings while keeping same-turn updates keyed to the persisted reference context. Also gate pre-request mid-turn compaction on the immediately preceding follow-up response so continuation requests do not compact repeatedly, and drop the synthetic compaction prompt from rebuilt local compact history.

Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
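A hedged sketch of the compaction gate described above, with assumed names (previous_response_was_follow_up, last_reported_token_usage, auto_compact_token_limit, run_inline_compaction); the real check lives in the turn loop and is not reproduced in this diff.

    // Only consider pre-request mid-turn compaction when the request about to
    // be sent continues from a follow-up response, and base the decision on
    // the token usage that the immediately preceding response reported, so a
    // long turn does not compact again on every continuation.
    let should_compact = previous_response_was_follow_up
        && last_reported_token_usage
            .map(|usage| usage.total_tokens >= auto_compact_token_limit)
            .unwrap_or(false);
    if should_compact {
        run_inline_compaction(&sess, &turn_context).await;
    }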
Charles Cunningham
2327519d34 Allow turn context refresh between sampling requests
Reload the active turn context before each sampling request so nudges and other mid-turn setting changes can take effect without mutating TurnContext in place. Move mid-turn compaction to the start of follow-up requests, skip separate diff injection when compaction re-establishes full context, and persist TurnContextItem baselines only when the active context actually changes.

Also refresh the active turn context after override_turn_context, reset the reused model client session on mid-turn model changes, and key model-switch diffs off the persisted reference context so same-turn switch-backs still re-inject model instructions.

Co-authored-by: Codex <noreply@openai.com>
2026-03-05 15:19:19 -08:00
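A condensed sketch of the per-request refresh loop described above. current_active_turn_context is the accessor used elsewhere in this diff; run_sampling_request and wants_follow_up are stand-in names for the actual turn-loop machinery.

    // Re-read the session's active turn context before each sampling request
    // instead of mutating a single TurnContext in place, so nudges, model
    // switches, and realtime changes apply to the next request in the turn.
    loop {
        let turn_context = session
            .current_active_turn_context()
            .await
            .unwrap_or_else(|| Arc::clone(&initial_turn_context));
        let response = run_sampling_request(&session, &turn_context, &prompt).await?;
        if !response.wants_follow_up() {
            break;
        }
        // Mid-turn compaction, when needed, happens at the start of the next
        // follow-up request rather than at the end of this one.
    }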
21 changed files with 2318 additions and 177 deletions

File diff suppressed because it is too large.

View File

@@ -170,10 +170,15 @@ impl Session {
active_segment.turn_id.as_deref(),
ctx.turn_id.as_deref(),
) {
active_segment.previous_turn_settings = Some(PreviousTurnSettings {
model: ctx.model.clone(),
realtime_active: ctx.realtime_active,
});
// Reverse replay walks newest-to-oldest, so the first surviving
// `TurnContextItem` we see for this turn segment is the newest
// surviving context snapshot for the turn.
if active_segment.previous_turn_settings.is_none() {
active_segment.previous_turn_settings = Some(PreviousTurnSettings {
model: ctx.model.clone(),
realtime_active: ctx.realtime_active,
});
}
if matches!(
active_segment.reference_context_item,
TurnReferenceContextItem::NeverSet

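A toy illustration (not the codebase's replay loop) of why the first-seen snapshot wins in the hunk above: iterating the rollout newest-to-oldest, the first TurnContextItem encountered for the active turn is its newest surviving snapshot, so older items within the same turn must not overwrite it. Per-turn filtering and the rest of the segment bookkeeping are omitted here.

    let mut newest_surviving: Option<TurnContextItem> = None;
    for item in rollout_items.iter().rev() {
        if let RolloutItem::TurnContext(ctx) = item {
            // The first match in reverse order is the newest surviving snapshot.
            if newest_surviving.is_none() {
                newest_surviving = Some(ctx.clone());
            }
        }
    }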
View File

@@ -835,6 +835,129 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
);
}
#[tokio::test]
async fn record_initial_history_resumed_prefers_latest_turn_context_item_within_single_turn() {
let (session, turn_context) = make_session_and_context().await;
let mut earlier_context_item = turn_context.to_turn_context_item();
earlier_context_item.realtime_active = Some(true);
let mut latest_context_item = earlier_context_item.clone();
latest_context_item.realtime_active = Some(false);
let turn_id = earlier_context_item
.turn_id
.clone()
.expect("turn context should have turn_id");
let rollout_items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
turn_id: turn_id.clone(),
model_context_window: Some(128_000),
collaboration_mode_kind: ModeKind::Default,
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
)),
RolloutItem::TurnContext(earlier_context_item),
RolloutItem::TurnContext(latest_context_item.clone()),
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id,
last_agent_message: None,
},
)),
];
session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
conversation_id: ThreadId::default(),
history: rollout_items,
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
}))
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(false),
})
);
assert_eq!(
session.reference_context_item().await,
Some(latest_context_item)
);
}
#[tokio::test]
async fn record_initial_history_resumed_compaction_reestablishes_latest_turn_context_item_within_single_turn()
{
let (session, turn_context) = make_session_and_context().await;
let mut earlier_context_item = turn_context.to_turn_context_item();
earlier_context_item.realtime_active = Some(true);
let mut latest_context_item = earlier_context_item.clone();
latest_context_item.realtime_active = Some(false);
let turn_id = earlier_context_item
.turn_id
.clone()
.expect("turn context should have turn_id");
let rollout_items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
turn_id: turn_id.clone(),
model_context_window: Some(128_000),
collaboration_mode_kind: ModeKind::Default,
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
)),
RolloutItem::TurnContext(earlier_context_item),
RolloutItem::Compacted(CompactedItem {
message: String::new(),
replacement_history: Some(Vec::new()),
}),
RolloutItem::TurnContext(latest_context_item.clone()),
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id,
last_agent_message: None,
},
)),
];
session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
conversation_id: ThreadId::default(),
history: rollout_items,
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
}))
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(false),
})
);
assert_eq!(
session.reference_context_item().await,
Some(latest_context_item)
);
}
#[tokio::test]
async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_turn_for_compaction_accounting()
{

View File

@@ -99,6 +99,7 @@ async fn run_compact_task_inner(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
let persisted_history_items = history.raw_items().to_vec();
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -188,12 +189,15 @@ async fn run_compact_task_inner(
}
}
let history_snapshot = sess.clone_history().await;
let history_items = history_snapshot.raw_items();
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_suffix = {
let history_snapshot = sess.clone_history().await;
get_last_assistant_message_from_turn(history_snapshot.raw_items()).unwrap_or_default()
};
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
// Build replacement history from persisted session history, not the retry-local
// prompt buffer. Retries may trim oldest prompt items to fit context limits, but
// replacement history must preserve prior real user messages and ghost snapshots.
let user_messages = collect_user_messages(&persisted_history_items);
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);
if matches!(
@@ -204,7 +208,7 @@ async fn run_compact_task_inner(
new_history =
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
}
let ghost_snapshots: Vec<ResponseItem> = history_items
let ghost_snapshots: Vec<ResponseItem> = persisted_history_items
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()

View File

@@ -310,6 +310,8 @@ pub(crate) async fn handle_start(
};
info!("realtime conversation started");
sess.refresh_current_active_turn_context_from_realtime_state()
.await;
sess.send_event_raw(Event {
id: sub_id.clone(),
@@ -354,6 +356,9 @@ pub(crate) async fn handle_start(
}
if realtime_active.swap(false, Ordering::Relaxed) {
info!("realtime conversation transport closed");
sess_clone
.refresh_current_active_turn_context_from_realtime_state()
.await;
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
@@ -455,6 +460,8 @@ pub(crate) async fn handle_text(
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
match sess.conversation.shutdown().await {
Ok(()) => {
sess.refresh_current_active_turn_context_from_realtime_state()
.await;
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {

View File

@@ -20,6 +20,7 @@ use crate::tasks::SessionTask;
/// Metadata about the currently running turn.
pub(crate) struct ActiveTurn {
pub(crate) current_turn_context: Option<Arc<TurnContext>>,
pub(crate) tasks: IndexMap<String, RunningTask>,
pub(crate) turn_state: Arc<Mutex<TurnState>>,
}
@@ -27,6 +28,7 @@ pub(crate) struct ActiveTurn {
impl Default for ActiveTurn {
fn default() -> Self {
Self {
current_turn_context: None,
tasks: IndexMap::new(),
turn_state: Arc::new(Mutex::new(TurnState::default())),
}
@@ -46,14 +48,15 @@ pub(crate) struct RunningTask {
pub(crate) task: Arc<dyn SessionTask>,
pub(crate) cancellation_token: CancellationToken,
pub(crate) handle: Arc<AbortOnDropHandle<()>>,
pub(crate) turn_context: Arc<TurnContext>,
pub(crate) initial_turn_context: Arc<TurnContext>,
// Timer recorded when the task drops to capture the full turn duration.
pub(crate) _timer: Option<codex_otel::Timer>,
}
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
self.current_turn_context = Some(Arc::clone(&task.initial_turn_context));
let sub_id = task.initial_turn_context.sub_id.clone();
self.tasks.insert(sub_id, task);
}

View File

@@ -24,25 +24,26 @@ impl SessionTask for CompactTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = if crate::compact::should_use_remote_compact_task(&initial_turn_context.provider) {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
crate::compact_remote::run_remote_compact_task(session.clone(), initial_turn_context)
.await
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
crate::compact::run_compact_task(session.clone(), ctx, input).await
crate::compact::run_compact_task(session.clone(), initial_turn_context, input).await
};
None
}

View File

@@ -39,18 +39,18 @@ impl SessionTask for GhostSnapshotTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
tokio::task::spawn(async move {
let token = self.token;
let warnings_enabled = !ctx.ghost_snapshot.disable_warnings;
let warnings_enabled = !initial_turn_context.ghost_snapshot.disable_warnings;
// Channel used to signal when the snapshot work has finished so the
// timeout warning task can exit early without sending a warning.
let (snapshot_done_tx, snapshot_done_rx) = oneshot::channel::<()>();
if warnings_enabled {
let ctx_for_warning = ctx.clone();
let initial_turn_context_for_warning = initial_turn_context.clone();
let cancellation_token_for_warning = cancellation_token.clone();
let session_for_warning = session.clone();
// Fire a generic warning if the snapshot is still running after
@@ -61,7 +61,7 @@ impl SessionTask for GhostSnapshotTask {
_ = tokio::time::sleep(SNAPSHOT_WARNING_THRESHOLD) => {
session_for_warning.session
.send_event(
&ctx_for_warning,
&initial_turn_context_for_warning,
EventMsg::Warning(WarningEvent {
message: "Repository snapshot is taking longer than expected. Large untracked or ignored files can slow snapshots; consider adding large files or directories to .gitignore or disabling `undo` in your config.".to_string()
}),
@@ -76,12 +76,12 @@ impl SessionTask for GhostSnapshotTask {
drop(snapshot_done_rx);
}
let ctx_for_task = ctx.clone();
let initial_turn_context_for_task = initial_turn_context.clone();
let cancelled = tokio::select! {
_ = cancellation_token.cancelled() => true,
_ = async {
let repo_path = ctx_for_task.cwd.clone();
let ghost_snapshot = ctx_for_task.ghost_snapshot.clone();
let repo_path = initial_turn_context_for_task.cwd.clone();
let ghost_snapshot = initial_turn_context_for_task.ghost_snapshot.clone();
let ghost_snapshot_for_commit = ghost_snapshot.clone();
// Required to run in a dedicated blocking pool.
match tokio::task::spawn_blocking(move || {
@@ -102,7 +102,7 @@ impl SessionTask for GhostSnapshotTask {
session
.session
.send_event(
&ctx_for_task,
&initial_turn_context_for_task,
EventMsg::Warning(WarningEvent { message }),
)
.await;
@@ -110,7 +110,7 @@ impl SessionTask for GhostSnapshotTask {
}
session
.session
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot {
.record_conversation_items(&initial_turn_context, &[ResponseItem::GhostSnapshot {
ghost_commit: ghost_commit.clone(),
}])
.await;
@@ -118,26 +118,26 @@ impl SessionTask for GhostSnapshotTask {
}
Ok(Err(err)) => match err {
GitToolingError::NotAGitRepository { .. } => info!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"skipping ghost snapshot because current directory is not a Git repository"
),
_ => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"failed to capture ghost snapshot: {err}"
);
}
},
Err(err) => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"ghost snapshot task panicked: {err}"
);
let message =
format!("Snapshots disabled after ghost snapshot panic: {err}.");
session
.session
.notify_background_event(&ctx_for_task, message)
.notify_background_event(&initial_turn_context_for_task, message)
.await;
}
}
@@ -150,7 +150,7 @@ impl SessionTask for GhostSnapshotTask {
info!("ghost snapshot task cancelled");
}
match ctx.tool_call_gate.mark_ready(token).await {
match initial_turn_context.tool_call_gate.mark_ready(token).await {
Ok(true) => info!("ghost snapshot gate marked ready"),
Ok(false) => warn!("ghost snapshot gate already ready"),
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),

View File

@@ -103,7 +103,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String>;
@@ -113,15 +113,19 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// The default implementation is a no-op; override this if additional
/// teardown or notifications are required once
/// [`Session::abort_all_tasks`] cancels the task.
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
let _ = (session, ctx);
async fn abort(
&self,
session: Arc<SessionTaskContext>,
initial_turn_context: Arc<TurnContext>,
) {
let _ = (session, initial_turn_context);
}
}
impl Session {
pub async fn spawn_task<T: SessionTask>(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
task: T,
) {
@@ -138,7 +142,7 @@ impl Session {
let done_clone = Arc::clone(&done);
let handle = {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_initial_turn_context = Arc::clone(&initial_turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
@@ -147,16 +151,16 @@ impl Session {
"turn",
otel.name = span_name,
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
model = %turn_context.model_info.slug,
turn.id = %initial_turn_context.sub_id,
model = %initial_turn_context.model_info.slug,
);
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let initial_turn_context_for_finish = Arc::clone(&task_initial_turn_context);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
task_initial_turn_context,
input,
task_cancellation_token.child_token(),
)
@@ -165,8 +169,11 @@ impl Session {
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
sess.on_task_finished(
Arc::clone(&initial_turn_context_for_finish),
last_agent_message,
)
.await;
}
done_clone.notify_waiters();
}
@@ -174,7 +181,7 @@ impl Session {
)
};
let timer = turn_context
let timer = initial_turn_context
.otel_manager
.start_timer("codex.turn.e2e_duration_ms", &[])
.ok();
@@ -185,7 +192,7 @@ impl Session {
kind: task_kind,
task,
cancellation_token,
turn_context: Arc::clone(&turn_context),
initial_turn_context: Arc::clone(&initial_turn_context),
_timer: timer,
};
self.register_new_active_task(running_task).await;
@@ -202,10 +209,10 @@ impl Session {
pub async fn on_task_finished(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
turn_context
initial_turn_context
.turn_metadata_state
.cancel_git_enrichment_task();
@@ -214,9 +221,14 @@ impl Session {
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_tool_calls = 0_u64;
let mut current_turn_metadata_state = None;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
&& at.remove_task(&initial_turn_context.sub_id)
{
current_turn_metadata_state = at
.current_turn_context
.take()
.map(|current_turn_context| Arc::clone(&current_turn_context.turn_metadata_state));
let mut ts = at.turn_state.lock().await;
pending_input = ts.take_pending_input();
turn_tool_calls = ts.tool_calls;
@@ -227,6 +239,9 @@ impl Session {
*active = None;
}
drop(active);
if let Some(current_turn_metadata_state) = current_turn_metadata_state {
current_turn_metadata_state.cancel_git_enrichment_task();
}
if !pending_input.is_empty() {
let pending_response_items = pending_input
.into_iter()
@@ -238,14 +253,14 @@ impl Session {
// normal pre-sampling drain. This helper records the response item once, then
// emits ItemStarted/UserMessage and ItemCompleted/UserMessage for clients.
self.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
initial_turn_context.as_ref(),
&user_message.content,
response_item,
)
.await;
} else {
self.record_conversation_items(
turn_context.as_ref(),
initial_turn_context.as_ref(),
std::slice::from_ref(&response_item),
)
.await;
@@ -313,10 +328,10 @@ impl Session {
);
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
turn_id: initial_turn_context.sub_id.clone(),
last_agent_message,
});
self.send_event(turn_context.as_ref(), event).await;
self.send_event(initial_turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {
@@ -332,6 +347,11 @@ impl Session {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
if let Some(current_turn_context) = at.current_turn_context.take() {
current_turn_context
.turn_metadata_state
.cancel_git_enrichment_task();
}
at.clear_pending().await;
at.drain_tasks()
@@ -348,14 +368,14 @@ impl Session {
}
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
let sub_id = task.initial_turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
}
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
task.turn_context
task.initial_turn_context
.turn_metadata_state
.cancel_git_enrichment_task();
let session_task = task.task;
@@ -372,7 +392,7 @@ impl Session {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
session_task
.abort(session_ctx, Arc::clone(&task.turn_context))
.abort(session_ctx, Arc::clone(&task.initial_turn_context))
.await;
if reason == TurnAbortReason::Interrupted {
@@ -387,8 +407,11 @@ impl Session {
end_turn: None,
phase: None,
};
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
.await;
self.record_into_history(
std::slice::from_ref(&marker),
task.initial_turn_context.as_ref(),
)
.await;
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
.await;
// Ensure the marker is durably visible before emitting TurnAborted: some clients
@@ -397,10 +420,11 @@ impl Session {
}
let event = EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(task.turn_context.sub_id.clone()),
turn_id: Some(task.initial_turn_context.sub_id.clone()),
reason,
});
self.send_event(task.turn_context.as_ref(), event).await;
self.send_event(task.initial_turn_context.as_ref(), event)
.await;
}
}

View File

@@ -75,7 +75,7 @@ impl SessionTask for RegularTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -85,7 +85,7 @@ impl SessionTask for RegularTask {
let prewarmed_client_session = self.take_prewarmed_session().await;
run_turn(
sess,
ctx,
initial_turn_context,
input,
prewarmed_client_session,
cancellation_token,

View File

@@ -50,7 +50,7 @@ impl SessionTask for ReviewTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -63,33 +63,44 @@ impl SessionTask for ReviewTask {
// Start sub-codex conversation and get the receiver for events.
let output = match start_review_conversation(
session.clone(),
ctx.clone(),
initial_turn_context.clone(),
input,
cancellation_token.clone(),
)
.await
{
Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await,
Some(receiver) => {
process_review_events(session.clone(), initial_turn_context.clone(), receiver).await
}
None => None,
};
if !cancellation_token.is_cancelled() {
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
exit_review_mode(
session.clone_session(),
output.clone(),
initial_turn_context.clone(),
)
.await;
}
None
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
exit_review_mode(session.clone_session(), None, ctx).await;
async fn abort(
&self,
session: Arc<SessionTaskContext>,
initial_turn_context: Arc<TurnContext>,
) {
exit_review_mode(session.clone_session(), None, initial_turn_context).await;
}
}
async fn start_review_conversation(
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<async_channel::Receiver<Event>> {
let config = ctx.config.clone();
let config = initial_turn_context.config.clone();
let mut sub_agent_config = config.as_ref().clone();
// Carry over review-only feature restrictions so the delegate cannot
// re-enable blocked tools (web search, collab tools, view image).
@@ -108,7 +119,7 @@ async fn start_review_conversation(
let model = config
.review_model
.clone()
.unwrap_or_else(|| ctx.model_info.slug.clone());
.unwrap_or_else(|| initial_turn_context.model_info.slug.clone());
sub_agent_config.model = Some(model);
(run_codex_thread_one_shot(
sub_agent_config,
@@ -116,7 +127,7 @@ async fn start_review_conversation(
session.models_manager(),
input,
session.clone_session(),
ctx.clone(),
initial_turn_context.clone(),
cancellation_token,
None,
)
@@ -127,7 +138,7 @@ async fn start_review_conversation(
async fn process_review_events(
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
receiver: async_channel::Receiver<Event>,
) -> Option<ReviewOutputEvent> {
let mut prev_agent_message: Option<Event> = None;
@@ -137,7 +148,7 @@ async fn process_review_events(
if let Some(prev) = prev_agent_message.take() {
session
.clone_session()
.send_event(ctx.as_ref(), prev.msg)
.send_event(initial_turn_context.as_ref(), prev.msg)
.await;
}
prev_agent_message = Some(event);
@@ -166,7 +177,7 @@ async fn process_review_events(
other => {
session
.clone_session()
.send_event(ctx.as_ref(), other)
.send_event(initial_turn_context.as_ref(), other)
.await;
}
}
@@ -202,7 +213,7 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
pub(crate) async fn exit_review_mode(
session: Arc<Session>,
review_output: Option<ReviewOutputEvent>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
) {
const REVIEW_USER_MESSAGE_ID: &str = "review_rollout_user";
const REVIEW_ASSISTANT_MESSAGE_ID: &str = "review_rollout_assistant";
@@ -230,7 +241,7 @@ pub(crate) async fn exit_review_mode(
session
.record_conversation_items(
&ctx,
&initial_turn_context,
&[ResponseItem::Message {
id: Some(REVIEW_USER_MESSAGE_ID.to_string()),
role: "user".to_string(),
@@ -243,13 +254,13 @@ pub(crate) async fn exit_review_mode(
session
.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }),
)
.await;
session
.record_response_item_and_emit_turn_item(
ctx.as_ref(),
initial_turn_context.as_ref(),
ResponseItem::Message {
id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()),
role: "assistant".to_string(),

View File

@@ -38,7 +38,7 @@ impl SessionTask for UndoTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -49,7 +49,7 @@ impl SessionTask for UndoTask {
.counter("codex.task.undo", 1, &[]);
let sess = session.clone_session();
sess.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo in progress...".to_string()),
}),
@@ -58,7 +58,7 @@ impl SessionTask for UndoTask {
if cancellation_token.is_cancelled() {
sess.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Undo cancelled.".to_string()),
@@ -88,14 +88,17 @@ impl SessionTask for UndoTask {
})
else {
completed.message = Some("No ghost snapshot available to undo.".to_string());
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
sess.send_event(
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(completed),
)
.await;
return None;
};
let commit_id = ghost_commit.id().to_string();
let repo_path = ctx.cwd.clone();
let ghost_snapshot = ctx.ghost_snapshot.clone();
let repo_path = initial_turn_context.cwd.clone();
let ghost_snapshot = initial_turn_context.ghost_snapshot.clone();
let restore_result = tokio::task::spawn_blocking(move || {
let options = RestoreGhostCommitOptions::new(&repo_path).ghost_snapshot(ghost_snapshot);
restore_ghost_commit_with_options(&options, &ghost_commit)
@@ -124,8 +127,11 @@ impl SessionTask for UndoTask {
}
}
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
sess.send_event(
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(completed),
)
.await;
None
}
}

View File

@@ -73,13 +73,13 @@ impl SessionTask for UserShellCommandTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
execute_user_shell_command(
session.clone_session(),
turn_context,
initial_turn_context,
self.command.clone(),
cancellation_token,
UserShellCommandMode::StandaloneTurn,

View File

@@ -248,14 +248,6 @@ impl NetworkApprovalService {
.await;
}
async fn active_turn_context(session: &Session) -> Option<Arc<crate::codex::TurnContext>> {
let active_turn = session.active_turn.lock().await;
active_turn
.as_ref()
.and_then(|turn| turn.tasks.first())
.map(|(_, task)| Arc::clone(&task.turn_context))
}
fn format_network_target(protocol: &str, host: &str, port: u16) -> String {
format!("{protocol}://{host}:{port}")
}
@@ -303,7 +295,7 @@ impl NetworkApprovalService {
format!("Network access to \"{target}\" was blocked by policy.");
let prompt_reason = format!("{} is not in the allowed_domains", request.host);
let Some(turn_context) = Self::active_turn_context(session).await else {
let Some(turn_context) = session.current_active_turn_context().await else {
pending.set_decision(PendingApprovalDecision::Deny).await;
let mut pending_approvals = self.pending_host_approvals.lock().await;
pending_approvals.remove(&key);
@@ -555,9 +547,43 @@ pub(crate) async fn finish_deferred_network_approval(
#[cfg(test)]
mod tests {
use super::*;
use crate::codex::make_session_and_context_with_rx;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use codex_network_proxy::BlockedRequestArgs;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::user_input::UserInput;
use pretty_assertions::assert_eq;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
#[derive(Clone, Copy)]
struct NoopTask;
#[async_trait::async_trait]
impl SessionTask for NoopTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
fn span_name(&self) -> &'static str {
"network_approval_test_noop_task"
}
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_initial_turn_context: Arc<crate::codex::TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
None
}
}
#[tokio::test]
async fn pending_approvals_are_deduped_per_host_protocol_and_port() {
@@ -697,4 +723,44 @@ mod tests {
assert_eq!(service.take_call_outcome("registration-1").await, None);
assert_eq!(service.take_call_outcome("registration-2").await, None);
}
#[tokio::test]
async fn active_turn_context_prefers_refreshed_current_turn_context_over_task_snapshot() {
let (session, original_turn_context, _rx) = make_session_and_context_with_rx().await;
let next_model = if original_turn_context.model_info.slug == "gpt-5.1" {
"gpt-5"
} else {
"gpt-5.1"
};
let refreshed_turn_context = Arc::new(
original_turn_context
.with_model(next_model.to_string(), &session.services.models_manager)
.await,
);
let handle = tokio::spawn(async {});
*session.active_turn.lock().await = Some(ActiveTurn {
current_turn_context: Some(Arc::clone(&refreshed_turn_context)),
tasks: IndexMap::from([(
original_turn_context.sub_id.clone(),
RunningTask {
done: Arc::new(Notify::new()),
kind: TaskKind::Regular,
task: Arc::new(NoopTask),
cancellation_token: CancellationToken::new(),
handle: Arc::new(AbortOnDropHandle::new(handle)),
initial_turn_context: Arc::clone(&original_turn_context),
_timer: None,
},
)]),
..Default::default()
});
let active_turn_context = session
.current_active_turn_context()
.await
.expect("active turn context");
assert!(Arc::ptr_eq(&active_turn_context, &refreshed_turn_context));
}
}

View File

@@ -220,6 +220,25 @@ impl TurnMetadataState {
}
}
#[cfg(test)]
pub(crate) fn replace_enrichment_task_for_test(&self, handle: JoinHandle<()>) {
let mut task_guard = self
.enrichment_task
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(existing) = task_guard.replace(handle) {
existing.abort();
}
}
#[cfg(test)]
pub(crate) fn has_enrichment_task_for_test(&self) -> bool {
self.enrichment_task
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_some()
}
async fn fetch_workspace_git_metadata(&self) -> WorkspaceGitMetadata {
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
get_head_commit_hash(&self.cwd),

View File

@@ -494,6 +494,91 @@ async fn manual_compact_uses_custom_prompt() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_keeps_real_user_message_matching_compact_prompt() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_assistant_message("m0", FIRST_REPLY),
ev_completed_with_tokens("r0", 80),
]);
let compact_turn = sse(vec![
ev_assistant_message("m1", SUMMARY_TEXT),
ev_completed_with_tokens("r1", 100),
]);
let follow_up_turn = sse(vec![
ev_assistant_message("m2", FINAL_REPLY),
ev_completed_with_tokens("r2", 120),
]);
let request_log =
mount_sse_sequence(&server, vec![first_turn, compact_turn, follow_up_turn]).await;
let compact_prompt = "real user message that matches compact prompt";
let follow_up_message = "after compact";
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
config.compact_prompt = Some(compact_prompt.to_string());
});
let codex = builder
.build(&server)
.await
.expect("create conversation")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: compact_prompt.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("trigger compact");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: follow_up_message.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit follow-up turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
3,
"expected first turn, compact, and follow-up requests"
);
let follow_up_user_texts = requests[2].message_input_texts("user");
let compact_prompt_occurrences = follow_up_user_texts
.iter()
.filter(|text| text.as_str() == compact_prompt)
.count();
assert_eq!(
compact_prompt_occurrences, 1,
"post-compaction history should preserve the real user message that matches the compact prompt exactly once"
);
assert!(
follow_up_user_texts
.iter()
.any(|text| text.as_str() == follow_up_message),
"follow-up request should include the new user message"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_emits_api_and_local_token_usage_events() {
skip_if_no_network!();
@@ -1391,6 +1476,103 @@ async fn auto_compact_runs_after_token_limit_hit() {
);
}
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn auto_compact_keeps_last_real_user_message_when_prompt_is_summary_shaped() {
skip_if_no_network!();
let server = start_mock_server().await;
let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 70_000),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", "SECOND_REPLY"),
ev_completed_with_tokens("r2", 330_000),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
ev_completed_with_tokens("r3", 200),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 120),
]);
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
let model_provider = non_openai_model_provider(&server);
let summary_shaped_prompt = format!("{SUMMARY_PREFIX}\ncustom compact prompt");
let compact_prompt = summary_shaped_prompt.clone();
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
config.compact_prompt = Some(summary_shaped_prompt);
config.model_auto_compact_token_limit = Some(200_000);
});
let codex = builder.build(&server).await.unwrap().codex;
for user in [FIRST_AUTO_MSG, SECOND_AUTO_MSG, POST_AUTO_USER_MSG] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
let requests = request_log.requests();
let follow_up = requests
.iter()
.rev()
.find(|request| {
let body = request.body_json().to_string();
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(&body, &compact_prompt)
})
.expect("follow-up request missing");
let follow_up_body = follow_up.body_json();
let input_follow_up = follow_up_body
.get("input")
.and_then(|v| v.as_array())
.expect("follow-up input");
let user_texts: Vec<String> = input_follow_up
.iter()
.filter(|item| item.get("type").and_then(|v| v.as_str()) == Some("message"))
.filter(|item| item.get("role").and_then(|v| v.as_str()) == Some("user"))
.filter_map(|item| {
item.get("content")
.and_then(|v| v.as_array())
.and_then(|arr| arr.first())
.and_then(|entry| entry.get("text"))
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string)
})
.collect();
assert!(
user_texts.iter().any(|text| text == SECOND_AUTO_MSG),
"summary-shaped compact prompts must not drop the last real user message"
);
assert!(
user_texts.iter().any(|text| text == POST_AUTO_USER_MSG),
"follow-up request should include the new user message"
);
assert!(
user_texts
.iter()
.any(|text| text.contains(AUTO_SUMMARY_TEXT)),
"follow-up request should include the compacted summary"
);
}
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
@@ -2201,6 +2383,106 @@ async fn manual_compact_retries_after_context_window_error() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_retry_trimming_preserves_full_persisted_user_messages() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_user_message = "first turn should survive replacement history";
let follow_up_user_message = "follow up after compact";
let user_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let compact_succeeds = sse(vec![
ev_assistant_message("m2", SUMMARY_TEXT),
ev_completed("r2"),
]);
let follow_up_turn = sse(vec![ev_completed("r3")]);
let mut responses = vec![user_turn];
for attempt in 0..5 {
responses.push(sse_failed(
&format!("resp-fail-{attempt}"),
"context_length_exceeded",
CONTEXT_LIMIT_MESSAGE,
));
}
responses.push(compact_succeeds);
responses.push(follow_up_turn);
let request_log = mount_sse_sequence(&server, responses).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200_000);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("submit compact");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: follow_up_user_message.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit follow-up turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
8,
"expected first turn, compact retries, and follow-up turn"
);
let compact_attempt_bodies: Vec<String> = requests[1..7]
.iter()
.map(|request| request.body_json().to_string())
.collect();
let saw_retry_trim_message = compact_attempt_bodies
.iter()
.any(|body| !body_contains_text(body, first_user_message));
assert!(
saw_retry_trim_message,
"expected at least one compact retry request to trim the oldest user message from prompt history"
);
let follow_up_body = requests[7].body_json().to_string();
assert!(
body_contains_text(&follow_up_body, first_user_message),
"follow-up request should keep the full persisted user messages in compact replacement history"
);
assert!(
body_contains_text(&follow_up_body, follow_up_user_message),
"follow-up request should include the incoming user message"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands.
// Current main behavior around non-context manual /compact failures is known-incorrect.

View File

@@ -1,24 +1,32 @@
#![allow(clippy::expect_used)]
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::config::Constrained;
use codex_core::features::Feature;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
@@ -75,6 +83,22 @@ fn format_labeled_requests_snapshot(
)
}
fn format_labeled_requests_snapshot_with_normalized_exec_output(
scenario: &str,
sections: &[(&str, &responses::ResponsesRequest)],
) -> String {
format_labeled_requests_snapshot(scenario, sections)
.lines()
.map(|line| {
line.split_once("function_call_output:").map_or_else(
|| line.to_string(),
|(prefix, _)| format!("{prefix}function_call_output:<UNIFIED_EXEC_OUTPUT>"),
)
})
.collect::<Vec<_>>()
.join("\n")
}
fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
vec![ResponseItem::Compaction {
encrypted_content: summary_with_prefix(summary),
@@ -93,7 +117,7 @@ fn remote_realtime_test_codex_builder(
}
async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
start_websocket_server(vec![vec![
let scripted_connection = vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_remote_compact", "instructions": "backend prompt" }
@@ -108,7 +132,12 @@ async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
vec![],
vec![],
vec![],
]])
];
start_websocket_server(vec![
scripted_connection.clone(),
scripted_connection.clone(),
scripted_connection,
])
.await
}
@@ -149,6 +178,61 @@ async fn close_realtime_conversation(codex: &codex_core::CodexThread) -> Result<
Ok(())
}
fn read_rollout_lines(path: &Path) -> Result<Vec<RolloutLine>> {
Ok(fs::read_to_string(path)?
.lines()
.filter_map(|line| serde_json::from_str::<RolloutLine>(line).ok())
.collect())
}
fn first_turn_context_items(path: &Path) -> Result<Vec<TurnContextItem>> {
let rollout_lines = read_rollout_lines(path)?;
let turn_id = rollout_lines
.iter()
.find_map(|line| match &line.item {
RolloutItem::TurnContext(ctx) => ctx.turn_id.clone(),
_ => None,
})
.expect("rollout should include a turn context item");
Ok(rollout_lines
.into_iter()
.filter_map(|line| match line.item {
RolloutItem::TurnContext(ctx) if ctx.turn_id.as_deref() == Some(turn_id.as_str()) => {
Some(ctx)
}
_ => None,
})
.collect())
}
async fn expect_exec_approval(
codex: &codex_core::CodexThread,
expected_command: &str,
) -> ExecApprovalRequestEvent {
let event = wait_for_event(codex, |event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
)
})
.await;
match event {
EventMsg::ExecApprovalRequest(approval) => {
let last_arg = approval
.command
.last()
.map(String::as_str)
.unwrap_or_default();
assert_eq!(last_arg, expected_command);
approval
}
EventMsg::TurnComplete(_) => panic!("expected approval request before completion"),
other => panic!("unexpected event: {other:?}"),
}
}
fn assert_request_contains_realtime_start(request: &responses::ResponsesRequest) {
let body = request.body_json().to_string();
assert!(
@@ -173,6 +257,36 @@ fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
);
}
fn assert_request_omits_realtime_update(request: &responses::ResponsesRequest) {
let body = request.body_json().to_string();
assert!(
!body.contains("<realtime_conversation>"),
"did not expect request to contain realtime instructions"
);
}
fn realtime_end_message_count(request: &responses::ResponsesRequest) -> usize {
request
.message_input_texts("developer")
.iter()
.filter(|text| text.contains("Reason: inactive"))
.count()
}
fn ev_exec_command_call_requiring_approval(call_id: &str, command: &str) -> serde_json::Value {
responses::ev_function_call(
call_id,
"exec_command",
&serde_json::to_string(&json!({
"cmd": command,
"yield_time_ms": 1_000_u64,
"sandbox_permissions": codex_protocol::models::SandboxPermissions::RequireEscalated,
"justification": "Need to continue the paused turn"
}))
.expect("serialize exec_command arguments"),
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1874,6 +1988,393 @@ async fn snapshot_request_shape_remote_compact_resume_restates_realtime_end() ->
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_realtime_end() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
let _ = config.features.enable(Feature::RequestPermissions);
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
config.permissions.sandbox_policy =
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
});
let test = builder.build(&server).await?;
let rollout_path = test
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let shell_command = "printf mid-turn-realtime-end";
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
ev_exec_command_call_requiring_approval(
"call-remote-mid-turn-realtime-end",
shell_command,
),
responses::ev_completed("r1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
responses::ev_completed("r2"),
]),
],
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let approval = expect_exec_approval(test.codex.as_ref(), shell_command).await;
close_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::ExecApproval {
id: approval.effective_approval_id(),
turn_id: None,
decision: ReviewDecision::Approved,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex.submit(Op::Shutdown).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let turn_context_items = first_turn_context_items(&rollout_path)?;
assert_eq!(
turn_context_items.len(),
2,
"expected two same-turn context snapshots"
);
assert_eq!(
turn_context_items
.last()
.and_then(|item| item.realtime_active),
Some(false)
);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let first_request = &requests[0];
let second_request = &requests[1];
assert_request_contains_realtime_end(second_request);
insta::assert_snapshot!(
"remote_mid_turn_realtime_end_shapes",
format_labeled_requests_snapshot_with_normalized_exec_output(
"Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.",
&[
("First Request Before Realtime Close", first_request),
(
"Same-Turn Continuation After Realtime Close",
second_request
),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_resume_after_same_turn_realtime_end_uses_latest_turn_context_item()
-> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
let _ = config.features.enable(Feature::RequestPermissions);
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
config.permissions.sandbox_policy =
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
});
let initial = builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let shell_command = "printf mid-turn-realtime-end";
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
ev_exec_command_call_requiring_approval(
"call-remote-resume-mid-turn-realtime-end",
shell_command,
),
responses::ev_completed("r1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
responses::ev_completed("r2"),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "REMOTE_AFTER_RESUME_REPLY"),
responses::ev_completed("r3"),
]),
],
)
.await;
start_realtime_conversation(initial.codex.as_ref()).await?;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let approval = expect_exec_approval(initial.codex.as_ref(), shell_command).await;
close_realtime_conversation(initial.codex.as_ref()).await?;
initial
.codex
.submit(Op::ExecApproval {
id: approval.effective_approval_id(),
turn_id: None,
decision: ReviewDecision::Approved,
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Shutdown).await?;
wait_for_event(&initial.codex, |ev| {
matches!(ev, EventMsg::ShutdownComplete)
})
.await;
let turn_context_items = first_turn_context_items(&rollout_path)?;
assert_eq!(
turn_context_items.len(),
2,
"expected two same-turn context snapshots"
);
assert_eq!(
turn_context_items
.last()
.and_then(|item| item.realtime_active),
Some(false)
);
let mut resume_builder =
remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
let _ = config.features.enable(Feature::RequestPermissions);
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
config.permissions.sandbox_policy =
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
});
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3, "expected three model requests");
let second_request = &requests[1];
let after_resume_request = &requests[2];
assert_request_contains_realtime_end(second_request);
assert_eq!(
realtime_end_message_count(after_resume_request),
1,
"expected resume to preserve exactly one historical realtime-ended message without emitting a duplicate update"
);
insta::assert_snapshot!(
"remote_resume_after_same_turn_realtime_end_uses_latest_turn_context_item_shapes",
format_labeled_requests_snapshot_with_normalized_exec_output(
"A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.",
&[
(
"Same-Turn Continuation After Realtime Close",
second_request
),
("Remote Post-Resume History Layout", after_resume_request),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
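
// The realtime-end assertions above use suite helpers (`realtime_end_message_count`,
// `assert_request_contains_realtime_end`) whose definitions are outside this excerpt.
// Purely as a hypothetical sketch of what such a counter might check (an assumption,
// not the suite's actual implementation), it could scan a request body's `input`
// items for developer messages carrying the realtime-ended notice:
#[allow(dead_code)] // illustration only
fn sketch_realtime_end_message_count(body: &serde_json::Value) -> usize {
    body["input"]
        .as_array()
        .map(|items| {
            items
                .iter()
                .filter(|item| {
                    // A realtime-ended notice is modeled as a developer message whose
                    // text contains "Realtime conversation ended" (see the snapshots below).
                    item["role"] == "developer"
                        && item["content"]
                            .as_array()
                            .into_iter()
                            .flatten()
                            .any(|part| {
                                part["text"].as_str().is_some_and(|text| {
                                    text.contains("Realtime conversation ended")
                                })
                            })
                })
                .count()
        })
        .unwrap_or(0)
}
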
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_compact_resume_after_same_turn_realtime_end_uses_latest_turn_context_item()
-> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
let _ = config.features.enable(Feature::RequestPermissions);
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
config.permissions.sandbox_policy =
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
});
let initial = builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let shell_command = "printf mid-turn-realtime-end";
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
ev_exec_command_call_requiring_approval(
"call-remote-compact-resume-mid-turn-realtime-end",
shell_command,
),
responses::ev_completed("r1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
responses::ev_completed("r2"),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "REMOTE_AFTER_RESUME_REPLY"),
responses::ev_completed("r3"),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_RESUME_LATEST_TURN_CONTEXT_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(initial.codex.as_ref()).await?;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let approval = expect_exec_approval(initial.codex.as_ref(), shell_command).await;
close_realtime_conversation(initial.codex.as_ref()).await?;
initial
.codex
.submit(Op::ExecApproval {
id: approval.effective_approval_id(),
turn_id: None,
decision: ReviewDecision::Approved,
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Compact).await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Shutdown).await?;
wait_for_event(&initial.codex, |ev| {
matches!(ev, EventMsg::ShutdownComplete)
})
.await;
let turn_context_items = first_turn_context_items(&rollout_path)?;
assert_eq!(
turn_context_items.len(),
2,
"expected two same-turn context snapshots"
);
assert_eq!(
turn_context_items
.last()
.and_then(|item| item.realtime_active),
Some(false)
);
let mut resume_builder =
remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
let _ = config.features.enable(Feature::RequestPermissions);
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
config.permissions.sandbox_policy =
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
});
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3, "expected three model requests");
let second_request = &requests[1];
let after_resume_request = &requests[2];
assert_request_contains_realtime_end(second_request);
assert_request_omits_realtime_update(after_resume_request);
insta::assert_snapshot!(
"remote_compact_resume_after_same_turn_realtime_end_uses_latest_turn_context_item_shapes",
format_labeled_requests_snapshot_with_normalized_exec_output(
"A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.",
&[
(
"Same-Turn Continuation After Realtime Close",
second_request
),
("Remote Post-Resume History Layout", after_resume_request),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
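
// Both tests above hinge on snapshot selection during resume: when a single turn
// persists more than one TurnContext item (realtime active, then inactive after the
// mid-turn close), replay should honor the newest surviving snapshot. A hypothetical
// sketch of that selection, assuming `TurnContextItem` and its `turn_id` /
// `realtime_active` fields are in scope here (not the production replay code):
#[allow(dead_code)] // illustration only
fn sketch_latest_snapshot_for_turn<'a>(
    items: &'a [TurnContextItem],
    turn_id: &str,
) -> Option<&'a TurnContextItem> {
    // Snapshots are appended in persistence order, so the newest entry for a turn is
    // the last item whose turn_id matches; in these tests that is the snapshot with
    // realtime_active == Some(false).
    items
        .iter()
        .filter(|item| item.turn_id.as_deref() == Some(turn_id))
        .last()
}
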
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input.
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()


@@ -0,0 +1,25 @@
---
source: core/tests/suite/compact_remote.rs
assertion_line: 2334
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.\",\n&[(\"Same-Turn Continuation After Realtime Close\", second_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])"
---
Scenario: A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.
## Same-Turn Continuation After Realtime Close
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:function_call/exec_command
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
## Remote Post-Resume History Layout
00:compaction:encrypted=true
01:message/developer:<PERMISSIONS_INSTRUCTIONS>
02:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_TWO


@@ -0,0 +1,25 @@
---
source: core/tests/suite/compact_remote.rs
assertion_line: 2070
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.\",\n&[(\"First Request Before Realtime Close\", first_request),\n(\"Same-Turn Continuation After Realtime Close\", second_request),])"
---
Scenario: Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.
## First Request Before Realtime Close
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
## Same-Turn Continuation After Realtime Close
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:function_call/exec_command
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...


@@ -0,0 +1,31 @@
---
source: core/tests/suite/compact_remote.rs
assertion_line: 2198
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.\",\n&[(\"Same-Turn Continuation After Realtime Close\", second_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])"
---
Scenario: A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.
## Same-Turn Continuation After Realtime Close
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:function_call/exec_command
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
## Remote Post-Resume History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:function_call/exec_command
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
07:message/assistant:REMOTE_MID_TURN_REALTIME_ENDED_REPLY
08:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
09:message/user:USER_TWO


@@ -2212,11 +2212,10 @@ pub struct TurnContextNetworkItem {
     pub denied_domains: Vec<String>,
 }
-/// Persist once per real user turn after computing that turn's model-visible
-/// context updates, and again after mid-turn compaction when replacement
-/// history re-establishes full context, so resume/fork replay can recover the
-/// latest durable baseline.
-#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)]
+/// Persist after each committed model-visible context update, and again after
+/// mid-turn compaction when replacement history re-establishes full context, so
+/// resume/fork replay can recover the latest durable baseline.
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, TS)]
 pub struct TurnContextItem {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub turn_id: Option<String>,