mirror of
https://github.com/openai/codex.git
synced 2026-05-06 12:26:38 +00:00
Compare commits
24 Commits
pr20381
...
cc/turn-co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97e64b8e4d | ||
|
|
ff975f2f62 | ||
|
|
e2fae5de8d | ||
|
|
c360485a13 | ||
|
|
be08067aca | ||
|
|
c727a66bb9 | ||
|
|
4b6c5880fb | ||
|
|
03119b822c | ||
|
|
23e54a9593 | ||
|
|
8f56878928 | ||
|
|
ec5f273255 | ||
|
|
174cc46016 | ||
|
|
ae9050fb7c | ||
|
|
f06e7c9bb9 | ||
|
|
857fb71ea1 | ||
|
|
1f50631422 | ||
|
|
2fc485dd28 | ||
|
|
82e08ddece | ||
|
|
d88b47d010 | ||
|
|
62d206169e | ||
|
|
f9f0aa8b57 | ||
|
|
068272f32f | ||
|
|
d8f4093818 | ||
|
|
2327519d34 |
File diff suppressed because it is too large
Load Diff
@@ -170,10 +170,15 @@ impl Session {
|
||||
active_segment.turn_id.as_deref(),
|
||||
ctx.turn_id.as_deref(),
|
||||
) {
|
||||
active_segment.previous_turn_settings = Some(PreviousTurnSettings {
|
||||
model: ctx.model.clone(),
|
||||
realtime_active: ctx.realtime_active,
|
||||
});
|
||||
// Reverse replay walks newest-to-oldest, so the first surviving
|
||||
// `TurnContextItem` we see for this turn segment is the newest
|
||||
// surviving context snapshot for the turn.
|
||||
if active_segment.previous_turn_settings.is_none() {
|
||||
active_segment.previous_turn_settings = Some(PreviousTurnSettings {
|
||||
model: ctx.model.clone(),
|
||||
realtime_active: ctx.realtime_active,
|
||||
});
|
||||
}
|
||||
if matches!(
|
||||
active_segment.reference_context_item,
|
||||
TurnReferenceContextItem::NeverSet
|
||||
|
||||
@@ -835,6 +835,129 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_prefers_latest_turn_context_item_within_single_turn() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let mut earlier_context_item = turn_context.to_turn_context_item();
|
||||
earlier_context_item.realtime_active = Some(true);
|
||||
let mut latest_context_item = earlier_context_item.clone();
|
||||
latest_context_item.realtime_active = Some(false);
|
||||
let turn_id = earlier_context_item
|
||||
.turn_id
|
||||
.clone()
|
||||
.expect("turn context should have turn_id");
|
||||
|
||||
let rollout_items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
turn_id: turn_id.clone(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(
|
||||
codex_protocol::protocol::UserMessageEvent {
|
||||
message: "seed".to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
},
|
||||
)),
|
||||
RolloutItem::TurnContext(earlier_context_item),
|
||||
RolloutItem::TurnContext(latest_context_item.clone()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id,
|
||||
last_agent_message: None,
|
||||
},
|
||||
)),
|
||||
];
|
||||
|
||||
session
|
||||
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id: ThreadId::default(),
|
||||
history: rollout_items,
|
||||
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
|
||||
}))
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
session.previous_turn_settings().await,
|
||||
Some(PreviousTurnSettings {
|
||||
model: turn_context.model_info.slug.clone(),
|
||||
realtime_active: Some(false),
|
||||
})
|
||||
);
|
||||
assert_eq!(
|
||||
session.reference_context_item().await,
|
||||
Some(latest_context_item)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_compaction_reestablishes_latest_turn_context_item_within_single_turn()
|
||||
{
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let mut earlier_context_item = turn_context.to_turn_context_item();
|
||||
earlier_context_item.realtime_active = Some(true);
|
||||
let mut latest_context_item = earlier_context_item.clone();
|
||||
latest_context_item.realtime_active = Some(false);
|
||||
let turn_id = earlier_context_item
|
||||
.turn_id
|
||||
.clone()
|
||||
.expect("turn context should have turn_id");
|
||||
|
||||
let rollout_items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
turn_id: turn_id.clone(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(
|
||||
codex_protocol::protocol::UserMessageEvent {
|
||||
message: "seed".to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
},
|
||||
)),
|
||||
RolloutItem::TurnContext(earlier_context_item),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: String::new(),
|
||||
replacement_history: Some(Vec::new()),
|
||||
}),
|
||||
RolloutItem::TurnContext(latest_context_item.clone()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id,
|
||||
last_agent_message: None,
|
||||
},
|
||||
)),
|
||||
];
|
||||
|
||||
session
|
||||
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id: ThreadId::default(),
|
||||
history: rollout_items,
|
||||
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
|
||||
}))
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
session.previous_turn_settings().await,
|
||||
Some(PreviousTurnSettings {
|
||||
model: turn_context.model_info.slug.clone(),
|
||||
realtime_active: Some(false),
|
||||
})
|
||||
);
|
||||
assert_eq!(
|
||||
session.reference_context_item().await,
|
||||
Some(latest_context_item)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_turn_for_compaction_accounting()
|
||||
{
|
||||
|
||||
@@ -99,6 +99,7 @@ async fn run_compact_task_inner(
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
|
||||
let mut history = sess.clone_history().await;
|
||||
let persisted_history_items = history.raw_items().to_vec();
|
||||
history.record_items(
|
||||
&[initial_input_for_turn.into()],
|
||||
turn_context.truncation_policy,
|
||||
@@ -188,12 +189,15 @@ async fn run_compact_task_inner(
|
||||
}
|
||||
}
|
||||
|
||||
let history_snapshot = sess.clone_history().await;
|
||||
let history_items = history_snapshot.raw_items();
|
||||
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
|
||||
let summary_suffix = {
|
||||
let history_snapshot = sess.clone_history().await;
|
||||
get_last_assistant_message_from_turn(history_snapshot.raw_items()).unwrap_or_default()
|
||||
};
|
||||
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
|
||||
let user_messages = collect_user_messages(history_items);
|
||||
|
||||
// Build replacement history from persisted session history, not the retry-local
|
||||
// prompt buffer. Retries may trim oldest prompt items to fit context limits, but
|
||||
// replacement history must preserve prior real user messages and ghost snapshots.
|
||||
let user_messages = collect_user_messages(&persisted_history_items);
|
||||
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);
|
||||
|
||||
if matches!(
|
||||
@@ -204,7 +208,7 @@ async fn run_compact_task_inner(
|
||||
new_history =
|
||||
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
|
||||
}
|
||||
let ghost_snapshots: Vec<ResponseItem> = history_items
|
||||
let ghost_snapshots: Vec<ResponseItem> = persisted_history_items
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned()
|
||||
|
||||
@@ -310,6 +310,8 @@ pub(crate) async fn handle_start(
|
||||
};
|
||||
|
||||
info!("realtime conversation started");
|
||||
sess.refresh_current_active_turn_context_from_realtime_state()
|
||||
.await;
|
||||
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id.clone(),
|
||||
@@ -354,6 +356,9 @@ pub(crate) async fn handle_start(
|
||||
}
|
||||
if realtime_active.swap(false, Ordering::Relaxed) {
|
||||
info!("realtime conversation transport closed");
|
||||
sess_clone
|
||||
.refresh_current_active_turn_context_from_realtime_state()
|
||||
.await;
|
||||
sess_clone
|
||||
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
|
||||
RealtimeConversationClosedEvent {
|
||||
@@ -455,6 +460,8 @@ pub(crate) async fn handle_text(
|
||||
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
|
||||
match sess.conversation.shutdown().await {
|
||||
Ok(()) => {
|
||||
sess.refresh_current_active_turn_context_from_realtime_state()
|
||||
.await;
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::tasks::SessionTask;
|
||||
|
||||
/// Metadata about the currently running turn.
|
||||
pub(crate) struct ActiveTurn {
|
||||
pub(crate) current_turn_context: Option<Arc<TurnContext>>,
|
||||
pub(crate) tasks: IndexMap<String, RunningTask>,
|
||||
pub(crate) turn_state: Arc<Mutex<TurnState>>,
|
||||
}
|
||||
@@ -27,6 +28,7 @@ pub(crate) struct ActiveTurn {
|
||||
impl Default for ActiveTurn {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
current_turn_context: None,
|
||||
tasks: IndexMap::new(),
|
||||
turn_state: Arc::new(Mutex::new(TurnState::default())),
|
||||
}
|
||||
@@ -46,14 +48,15 @@ pub(crate) struct RunningTask {
|
||||
pub(crate) task: Arc<dyn SessionTask>,
|
||||
pub(crate) cancellation_token: CancellationToken,
|
||||
pub(crate) handle: Arc<AbortOnDropHandle<()>>,
|
||||
pub(crate) turn_context: Arc<TurnContext>,
|
||||
pub(crate) initial_turn_context: Arc<TurnContext>,
|
||||
// Timer recorded when the task drops to capture the full turn duration.
|
||||
pub(crate) _timer: Option<codex_otel::Timer>,
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
pub(crate) fn add_task(&mut self, task: RunningTask) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
self.current_turn_context = Some(Arc::clone(&task.initial_turn_context));
|
||||
let sub_id = task.initial_turn_context.sub_id.clone();
|
||||
self.tasks.insert(sub_id, task);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,25 +24,26 @@ impl SessionTask for CompactTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let session = session.clone_session();
|
||||
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
|
||||
let _ = if crate::compact::should_use_remote_compact_task(&initial_turn_context.provider) {
|
||||
let _ = session.services.otel_manager.counter(
|
||||
"codex.task.compact",
|
||||
1,
|
||||
&[("type", "remote")],
|
||||
);
|
||||
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
|
||||
crate::compact_remote::run_remote_compact_task(session.clone(), initial_turn_context)
|
||||
.await
|
||||
} else {
|
||||
let _ = session.services.otel_manager.counter(
|
||||
"codex.task.compact",
|
||||
1,
|
||||
&[("type", "local")],
|
||||
);
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input).await
|
||||
crate::compact::run_compact_task(session.clone(), initial_turn_context, input).await
|
||||
};
|
||||
None
|
||||
}
|
||||
|
||||
@@ -39,18 +39,18 @@ impl SessionTask for GhostSnapshotTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
tokio::task::spawn(async move {
|
||||
let token = self.token;
|
||||
let warnings_enabled = !ctx.ghost_snapshot.disable_warnings;
|
||||
let warnings_enabled = !initial_turn_context.ghost_snapshot.disable_warnings;
|
||||
// Channel used to signal when the snapshot work has finished so the
|
||||
// timeout warning task can exit early without sending a warning.
|
||||
let (snapshot_done_tx, snapshot_done_rx) = oneshot::channel::<()>();
|
||||
if warnings_enabled {
|
||||
let ctx_for_warning = ctx.clone();
|
||||
let initial_turn_context_for_warning = initial_turn_context.clone();
|
||||
let cancellation_token_for_warning = cancellation_token.clone();
|
||||
let session_for_warning = session.clone();
|
||||
// Fire a generic warning if the snapshot is still running after
|
||||
@@ -61,7 +61,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
_ = tokio::time::sleep(SNAPSHOT_WARNING_THRESHOLD) => {
|
||||
session_for_warning.session
|
||||
.send_event(
|
||||
&ctx_for_warning,
|
||||
&initial_turn_context_for_warning,
|
||||
EventMsg::Warning(WarningEvent {
|
||||
message: "Repository snapshot is taking longer than expected. Large untracked or ignored files can slow snapshots; consider adding large files or directories to .gitignore or disabling `undo` in your config.".to_string()
|
||||
}),
|
||||
@@ -76,12 +76,12 @@ impl SessionTask for GhostSnapshotTask {
|
||||
drop(snapshot_done_rx);
|
||||
}
|
||||
|
||||
let ctx_for_task = ctx.clone();
|
||||
let initial_turn_context_for_task = initial_turn_context.clone();
|
||||
let cancelled = tokio::select! {
|
||||
_ = cancellation_token.cancelled() => true,
|
||||
_ = async {
|
||||
let repo_path = ctx_for_task.cwd.clone();
|
||||
let ghost_snapshot = ctx_for_task.ghost_snapshot.clone();
|
||||
let repo_path = initial_turn_context_for_task.cwd.clone();
|
||||
let ghost_snapshot = initial_turn_context_for_task.ghost_snapshot.clone();
|
||||
let ghost_snapshot_for_commit = ghost_snapshot.clone();
|
||||
// Required to run in a dedicated blocking pool.
|
||||
match tokio::task::spawn_blocking(move || {
|
||||
@@ -102,7 +102,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
session
|
||||
.session
|
||||
.send_event(
|
||||
&ctx_for_task,
|
||||
&initial_turn_context_for_task,
|
||||
EventMsg::Warning(WarningEvent { message }),
|
||||
)
|
||||
.await;
|
||||
@@ -110,7 +110,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
}
|
||||
session
|
||||
.session
|
||||
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot {
|
||||
.record_conversation_items(&initial_turn_context, &[ResponseItem::GhostSnapshot {
|
||||
ghost_commit: ghost_commit.clone(),
|
||||
}])
|
||||
.await;
|
||||
@@ -118,26 +118,26 @@ impl SessionTask for GhostSnapshotTask {
|
||||
}
|
||||
Ok(Err(err)) => match err {
|
||||
GitToolingError::NotAGitRepository { .. } => info!(
|
||||
sub_id = ctx_for_task.sub_id.as_str(),
|
||||
sub_id = initial_turn_context_for_task.sub_id.as_str(),
|
||||
"skipping ghost snapshot because current directory is not a Git repository"
|
||||
),
|
||||
_ => {
|
||||
warn!(
|
||||
sub_id = ctx_for_task.sub_id.as_str(),
|
||||
sub_id = initial_turn_context_for_task.sub_id.as_str(),
|
||||
"failed to capture ghost snapshot: {err}"
|
||||
);
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
warn!(
|
||||
sub_id = ctx_for_task.sub_id.as_str(),
|
||||
sub_id = initial_turn_context_for_task.sub_id.as_str(),
|
||||
"ghost snapshot task panicked: {err}"
|
||||
);
|
||||
let message =
|
||||
format!("Snapshots disabled after ghost snapshot panic: {err}.");
|
||||
session
|
||||
.session
|
||||
.notify_background_event(&ctx_for_task, message)
|
||||
.notify_background_event(&initial_turn_context_for_task, message)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -150,7 +150,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
info!("ghost snapshot task cancelled");
|
||||
}
|
||||
|
||||
match ctx.tool_call_gate.mark_ready(token).await {
|
||||
match initial_turn_context.tool_call_gate.mark_ready(token).await {
|
||||
Ok(true) => info!("ghost snapshot gate marked ready"),
|
||||
Ok(false) => warn!("ghost snapshot gate already ready"),
|
||||
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),
|
||||
|
||||
@@ -103,7 +103,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String>;
|
||||
@@ -113,15 +113,19 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
|
||||
/// The default implementation is a no-op; override this if additional
|
||||
/// teardown or notifications are required once
|
||||
/// [`Session::abort_all_tasks`] cancels the task.
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
let _ = (session, ctx);
|
||||
async fn abort(
|
||||
&self,
|
||||
session: Arc<SessionTaskContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
) {
|
||||
let _ = (session, initial_turn_context);
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub async fn spawn_task<T: SessionTask>(
|
||||
self: &Arc<Self>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
task: T,
|
||||
) {
|
||||
@@ -138,7 +142,7 @@ impl Session {
|
||||
let done_clone = Arc::clone(&done);
|
||||
let handle = {
|
||||
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
|
||||
let ctx = Arc::clone(&turn_context);
|
||||
let task_initial_turn_context = Arc::clone(&initial_turn_context);
|
||||
let task_for_run = Arc::clone(&task);
|
||||
let task_cancellation_token = cancellation_token.child_token();
|
||||
// Task-owned turn spans keep a core-owned span open for the
|
||||
@@ -147,16 +151,16 @@ impl Session {
|
||||
"turn",
|
||||
otel.name = span_name,
|
||||
thread.id = %self.conversation_id,
|
||||
turn.id = %turn_context.sub_id,
|
||||
model = %turn_context.model_info.slug,
|
||||
turn.id = %initial_turn_context.sub_id,
|
||||
model = %initial_turn_context.model_info.slug,
|
||||
);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let ctx_for_finish = Arc::clone(&ctx);
|
||||
let initial_turn_context_for_finish = Arc::clone(&task_initial_turn_context);
|
||||
let last_agent_message = task_for_run
|
||||
.run(
|
||||
Arc::clone(&session_ctx),
|
||||
ctx,
|
||||
task_initial_turn_context,
|
||||
input,
|
||||
task_cancellation_token.child_token(),
|
||||
)
|
||||
@@ -165,8 +169,11 @@ impl Session {
|
||||
sess.flush_rollout().await;
|
||||
if !task_cancellation_token.is_cancelled() {
|
||||
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
|
||||
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
|
||||
.await;
|
||||
sess.on_task_finished(
|
||||
Arc::clone(&initial_turn_context_for_finish),
|
||||
last_agent_message,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
done_clone.notify_waiters();
|
||||
}
|
||||
@@ -174,7 +181,7 @@ impl Session {
|
||||
)
|
||||
};
|
||||
|
||||
let timer = turn_context
|
||||
let timer = initial_turn_context
|
||||
.otel_manager
|
||||
.start_timer("codex.turn.e2e_duration_ms", &[])
|
||||
.ok();
|
||||
@@ -185,7 +192,7 @@ impl Session {
|
||||
kind: task_kind,
|
||||
task,
|
||||
cancellation_token,
|
||||
turn_context: Arc::clone(&turn_context),
|
||||
initial_turn_context: Arc::clone(&initial_turn_context),
|
||||
_timer: timer,
|
||||
};
|
||||
self.register_new_active_task(running_task).await;
|
||||
@@ -202,10 +209,10 @@ impl Session {
|
||||
|
||||
pub async fn on_task_finished(
|
||||
self: &Arc<Self>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
last_agent_message: Option<String>,
|
||||
) {
|
||||
turn_context
|
||||
initial_turn_context
|
||||
.turn_metadata_state
|
||||
.cancel_git_enrichment_task();
|
||||
|
||||
@@ -214,9 +221,14 @@ impl Session {
|
||||
let mut should_clear_active_turn = false;
|
||||
let mut token_usage_at_turn_start = None;
|
||||
let mut turn_tool_calls = 0_u64;
|
||||
let mut current_turn_metadata_state = None;
|
||||
if let Some(at) = active.as_mut()
|
||||
&& at.remove_task(&turn_context.sub_id)
|
||||
&& at.remove_task(&initial_turn_context.sub_id)
|
||||
{
|
||||
current_turn_metadata_state = at
|
||||
.current_turn_context
|
||||
.take()
|
||||
.map(|current_turn_context| Arc::clone(¤t_turn_context.turn_metadata_state));
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
pending_input = ts.take_pending_input();
|
||||
turn_tool_calls = ts.tool_calls;
|
||||
@@ -227,6 +239,9 @@ impl Session {
|
||||
*active = None;
|
||||
}
|
||||
drop(active);
|
||||
if let Some(current_turn_metadata_state) = current_turn_metadata_state {
|
||||
current_turn_metadata_state.cancel_git_enrichment_task();
|
||||
}
|
||||
if !pending_input.is_empty() {
|
||||
let pending_response_items = pending_input
|
||||
.into_iter()
|
||||
@@ -238,14 +253,14 @@ impl Session {
|
||||
// normal pre-sampling drain. This helper records the response item once, then
|
||||
// emits ItemStarted/UserMessage and ItemCompleted/UserMessage for clients.
|
||||
self.record_user_prompt_and_emit_turn_item(
|
||||
turn_context.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
&user_message.content,
|
||||
response_item,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
self.record_conversation_items(
|
||||
turn_context.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
std::slice::from_ref(&response_item),
|
||||
)
|
||||
.await;
|
||||
@@ -313,10 +328,10 @@ impl Session {
|
||||
);
|
||||
}
|
||||
let event = EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
turn_id: initial_turn_context.sub_id.clone(),
|
||||
last_agent_message,
|
||||
});
|
||||
self.send_event(turn_context.as_ref(), event).await;
|
||||
self.send_event(initial_turn_context.as_ref(), event).await;
|
||||
}
|
||||
|
||||
async fn register_new_active_task(&self, task: RunningTask) {
|
||||
@@ -332,6 +347,11 @@ impl Session {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
if let Some(current_turn_context) = at.current_turn_context.take() {
|
||||
current_turn_context
|
||||
.turn_metadata_state
|
||||
.cancel_git_enrichment_task();
|
||||
}
|
||||
at.clear_pending().await;
|
||||
|
||||
at.drain_tasks()
|
||||
@@ -348,14 +368,14 @@ impl Session {
|
||||
}
|
||||
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
let sub_id = task.initial_turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
|
||||
task.cancellation_token.cancel();
|
||||
task.turn_context
|
||||
task.initial_turn_context
|
||||
.turn_metadata_state
|
||||
.cancel_git_enrichment_task();
|
||||
let session_task = task.task;
|
||||
@@ -372,7 +392,7 @@ impl Session {
|
||||
|
||||
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
|
||||
session_task
|
||||
.abort(session_ctx, Arc::clone(&task.turn_context))
|
||||
.abort(session_ctx, Arc::clone(&task.initial_turn_context))
|
||||
.await;
|
||||
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
@@ -387,8 +407,11 @@ impl Session {
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
};
|
||||
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
|
||||
.await;
|
||||
self.record_into_history(
|
||||
std::slice::from_ref(&marker),
|
||||
task.initial_turn_context.as_ref(),
|
||||
)
|
||||
.await;
|
||||
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
|
||||
.await;
|
||||
// Ensure the marker is durably visible before emitting TurnAborted: some clients
|
||||
@@ -397,10 +420,11 @@ impl Session {
|
||||
}
|
||||
|
||||
let event = EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(task.turn_context.sub_id.clone()),
|
||||
turn_id: Some(task.initial_turn_context.sub_id.clone()),
|
||||
reason,
|
||||
});
|
||||
self.send_event(task.turn_context.as_ref(), event).await;
|
||||
self.send_event(task.initial_turn_context.as_ref(), event)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ impl SessionTask for RegularTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
@@ -85,7 +85,7 @@ impl SessionTask for RegularTask {
|
||||
let prewarmed_client_session = self.take_prewarmed_session().await;
|
||||
run_turn(
|
||||
sess,
|
||||
ctx,
|
||||
initial_turn_context,
|
||||
input,
|
||||
prewarmed_client_session,
|
||||
cancellation_token,
|
||||
|
||||
@@ -50,7 +50,7 @@ impl SessionTask for ReviewTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
@@ -63,33 +63,44 @@ impl SessionTask for ReviewTask {
|
||||
// Start sub-codex conversation and get the receiver for events.
|
||||
let output = match start_review_conversation(
|
||||
session.clone(),
|
||||
ctx.clone(),
|
||||
initial_turn_context.clone(),
|
||||
input,
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await,
|
||||
Some(receiver) => {
|
||||
process_review_events(session.clone(), initial_turn_context.clone(), receiver).await
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
if !cancellation_token.is_cancelled() {
|
||||
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
|
||||
exit_review_mode(
|
||||
session.clone_session(),
|
||||
output.clone(),
|
||||
initial_turn_context.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
exit_review_mode(session.clone_session(), None, ctx).await;
|
||||
async fn abort(
|
||||
&self,
|
||||
session: Arc<SessionTaskContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
) {
|
||||
exit_review_mode(session.clone_session(), None, initial_turn_context).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_review_conversation(
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<async_channel::Receiver<Event>> {
|
||||
let config = ctx.config.clone();
|
||||
let config = initial_turn_context.config.clone();
|
||||
let mut sub_agent_config = config.as_ref().clone();
|
||||
// Carry over review-only feature restrictions so the delegate cannot
|
||||
// re-enable blocked tools (web search, collab tools, view image).
|
||||
@@ -108,7 +119,7 @@ async fn start_review_conversation(
|
||||
let model = config
|
||||
.review_model
|
||||
.clone()
|
||||
.unwrap_or_else(|| ctx.model_info.slug.clone());
|
||||
.unwrap_or_else(|| initial_turn_context.model_info.slug.clone());
|
||||
sub_agent_config.model = Some(model);
|
||||
(run_codex_thread_one_shot(
|
||||
sub_agent_config,
|
||||
@@ -116,7 +127,7 @@ async fn start_review_conversation(
|
||||
session.models_manager(),
|
||||
input,
|
||||
session.clone_session(),
|
||||
ctx.clone(),
|
||||
initial_turn_context.clone(),
|
||||
cancellation_token,
|
||||
None,
|
||||
)
|
||||
@@ -127,7 +138,7 @@ async fn start_review_conversation(
|
||||
|
||||
async fn process_review_events(
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
receiver: async_channel::Receiver<Event>,
|
||||
) -> Option<ReviewOutputEvent> {
|
||||
let mut prev_agent_message: Option<Event> = None;
|
||||
@@ -137,7 +148,7 @@ async fn process_review_events(
|
||||
if let Some(prev) = prev_agent_message.take() {
|
||||
session
|
||||
.clone_session()
|
||||
.send_event(ctx.as_ref(), prev.msg)
|
||||
.send_event(initial_turn_context.as_ref(), prev.msg)
|
||||
.await;
|
||||
}
|
||||
prev_agent_message = Some(event);
|
||||
@@ -166,7 +177,7 @@ async fn process_review_events(
|
||||
other => {
|
||||
session
|
||||
.clone_session()
|
||||
.send_event(ctx.as_ref(), other)
|
||||
.send_event(initial_turn_context.as_ref(), other)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -202,7 +213,7 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
||||
pub(crate) async fn exit_review_mode(
|
||||
session: Arc<Session>,
|
||||
review_output: Option<ReviewOutputEvent>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
) {
|
||||
const REVIEW_USER_MESSAGE_ID: &str = "review_rollout_user";
|
||||
const REVIEW_ASSISTANT_MESSAGE_ID: &str = "review_rollout_assistant";
|
||||
@@ -230,7 +241,7 @@ pub(crate) async fn exit_review_mode(
|
||||
|
||||
session
|
||||
.record_conversation_items(
|
||||
&ctx,
|
||||
&initial_turn_context,
|
||||
&[ResponseItem::Message {
|
||||
id: Some(REVIEW_USER_MESSAGE_ID.to_string()),
|
||||
role: "user".to_string(),
|
||||
@@ -243,13 +254,13 @@ pub(crate) async fn exit_review_mode(
|
||||
|
||||
session
|
||||
.send_event(
|
||||
ctx.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }),
|
||||
)
|
||||
.await;
|
||||
session
|
||||
.record_response_item_and_emit_turn_item(
|
||||
ctx.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
ResponseItem::Message {
|
||||
id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()),
|
||||
role: "assistant".to_string(),
|
||||
|
||||
@@ -38,7 +38,7 @@ impl SessionTask for UndoTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
@@ -49,7 +49,7 @@ impl SessionTask for UndoTask {
|
||||
.counter("codex.task.undo", 1, &[]);
|
||||
let sess = session.clone_session();
|
||||
sess.send_event(
|
||||
ctx.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
EventMsg::UndoStarted(UndoStartedEvent {
|
||||
message: Some("Undo in progress...".to_string()),
|
||||
}),
|
||||
@@ -58,7 +58,7 @@ impl SessionTask for UndoTask {
|
||||
|
||||
if cancellation_token.is_cancelled() {
|
||||
sess.send_event(
|
||||
ctx.as_ref(),
|
||||
initial_turn_context.as_ref(),
|
||||
EventMsg::UndoCompleted(UndoCompletedEvent {
|
||||
success: false,
|
||||
message: Some("Undo cancelled.".to_string()),
|
||||
@@ -88,14 +88,17 @@ impl SessionTask for UndoTask {
|
||||
})
|
||||
else {
|
||||
completed.message = Some("No ghost snapshot available to undo.".to_string());
|
||||
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
|
||||
.await;
|
||||
sess.send_event(
|
||||
initial_turn_context.as_ref(),
|
||||
EventMsg::UndoCompleted(completed),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
};
|
||||
|
||||
let commit_id = ghost_commit.id().to_string();
|
||||
let repo_path = ctx.cwd.clone();
|
||||
let ghost_snapshot = ctx.ghost_snapshot.clone();
|
||||
let repo_path = initial_turn_context.cwd.clone();
|
||||
let ghost_snapshot = initial_turn_context.ghost_snapshot.clone();
|
||||
let restore_result = tokio::task::spawn_blocking(move || {
|
||||
let options = RestoreGhostCommitOptions::new(&repo_path).ghost_snapshot(ghost_snapshot);
|
||||
restore_ghost_commit_with_options(&options, &ghost_commit)
|
||||
@@ -124,8 +127,11 @@ impl SessionTask for UndoTask {
|
||||
}
|
||||
}
|
||||
|
||||
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
|
||||
.await;
|
||||
sess.send_event(
|
||||
initial_turn_context.as_ref(),
|
||||
EventMsg::UndoCompleted(completed),
|
||||
)
|
||||
.await;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,13 +73,13 @@ impl SessionTask for UserShellCommandTask {
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_turn_context: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
execute_user_shell_command(
|
||||
session.clone_session(),
|
||||
turn_context,
|
||||
initial_turn_context,
|
||||
self.command.clone(),
|
||||
cancellation_token,
|
||||
UserShellCommandMode::StandaloneTurn,
|
||||
|
||||
@@ -248,14 +248,6 @@ impl NetworkApprovalService {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn active_turn_context(session: &Session) -> Option<Arc<crate::codex::TurnContext>> {
|
||||
let active_turn = session.active_turn.lock().await;
|
||||
active_turn
|
||||
.as_ref()
|
||||
.and_then(|turn| turn.tasks.first())
|
||||
.map(|(_, task)| Arc::clone(&task.turn_context))
|
||||
}
|
||||
|
||||
/// Renders a network destination as `protocol://host:port`.
fn format_network_target(protocol: &str, host: &str, port: u16) -> String {
    let port = port.to_string();
    [protocol, "://", host, ":", port.as_str()].concat()
}
|
||||
@@ -303,7 +295,7 @@ impl NetworkApprovalService {
|
||||
format!("Network access to \"{target}\" was blocked by policy.");
|
||||
let prompt_reason = format!("{} is not in the allowed_domains", request.host);
|
||||
|
||||
let Some(turn_context) = Self::active_turn_context(session).await else {
|
||||
let Some(turn_context) = session.current_active_turn_context().await else {
|
||||
pending.set_decision(PendingApprovalDecision::Deny).await;
|
||||
let mut pending_approvals = self.pending_host_approvals.lock().await;
|
||||
pending_approvals.remove(&key);
|
||||
@@ -555,9 +547,43 @@ pub(crate) async fn finish_deferred_network_approval(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context_with_rx;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
use crate::tasks::SessionTaskContext;
|
||||
use codex_network_proxy::BlockedRequestArgs;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::task::AbortOnDropHandle;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct NoopTask;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionTask for NoopTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
TaskKind::Regular
|
||||
}
|
||||
|
||||
fn span_name(&self) -> &'static str {
|
||||
"network_approval_test_noop_task"
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_initial_turn_context: Arc<crate::codex::TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pending_approvals_are_deduped_per_host_protocol_and_port() {
|
||||
@@ -697,4 +723,44 @@ mod tests {
|
||||
assert_eq!(service.take_call_outcome("registration-1").await, None);
|
||||
assert_eq!(service.take_call_outcome("registration-2").await, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn active_turn_context_prefers_refreshed_current_turn_context_over_task_snapshot() {
|
||||
let (session, original_turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
let next_model = if original_turn_context.model_info.slug == "gpt-5.1" {
|
||||
"gpt-5"
|
||||
} else {
|
||||
"gpt-5.1"
|
||||
};
|
||||
let refreshed_turn_context = Arc::new(
|
||||
original_turn_context
|
||||
.with_model(next_model.to_string(), &session.services.models_manager)
|
||||
.await,
|
||||
);
|
||||
let handle = tokio::spawn(async {});
|
||||
|
||||
*session.active_turn.lock().await = Some(ActiveTurn {
|
||||
current_turn_context: Some(Arc::clone(&refreshed_turn_context)),
|
||||
tasks: IndexMap::from([(
|
||||
original_turn_context.sub_id.clone(),
|
||||
RunningTask {
|
||||
done: Arc::new(Notify::new()),
|
||||
kind: TaskKind::Regular,
|
||||
task: Arc::new(NoopTask),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
handle: Arc::new(AbortOnDropHandle::new(handle)),
|
||||
initial_turn_context: Arc::clone(&original_turn_context),
|
||||
_timer: None,
|
||||
},
|
||||
)]),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let active_turn_context = session
|
||||
.current_active_turn_context()
|
||||
.await
|
||||
.expect("active turn context");
|
||||
|
||||
assert!(Arc::ptr_eq(&active_turn_context, &refreshed_turn_context));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,6 +220,25 @@ impl TurnMetadataState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Test-only hook: stores `handle` as the enrichment task and aborts the task it
/// replaces, if there was one.
///
/// A poisoned lock is recovered instead of panicking.
#[cfg(test)]
pub(crate) fn replace_enrichment_task_for_test(&self, handle: JoinHandle<()>) {
    let mut slot = match self.enrichment_task.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let previous = std::mem::replace(&mut *slot, Some(handle));
    if let Some(stale) = previous {
        stale.abort();
    }
}
|
||||
|
||||
/// Test-only hook: reports whether an enrichment task handle is currently stored.
///
/// A poisoned lock is recovered instead of panicking.
#[cfg(test)]
pub(crate) fn has_enrichment_task_for_test(&self) -> bool {
    let slot = match self.enrichment_task.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    slot.is_some()
}
|
||||
|
||||
async fn fetch_workspace_git_metadata(&self) -> WorkspaceGitMetadata {
|
||||
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
|
||||
get_head_commit_hash(&self.cwd),
|
||||
|
||||
@@ -494,6 +494,91 @@ async fn manual_compact_uses_custom_prompt() {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn manual_compact_keeps_real_user_message_matching_compact_prompt() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let first_turn = sse(vec![
|
||||
ev_assistant_message("m0", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r0", 80),
|
||||
]);
|
||||
let compact_turn = sse(vec![
|
||||
ev_assistant_message("m1", SUMMARY_TEXT),
|
||||
ev_completed_with_tokens("r1", 100),
|
||||
]);
|
||||
let follow_up_turn = sse(vec![
|
||||
ev_assistant_message("m2", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r2", 120),
|
||||
]);
|
||||
let request_log =
|
||||
mount_sse_sequence(&server, vec![first_turn, compact_turn, follow_up_turn]).await;
|
||||
|
||||
let compact_prompt = "real user message that matches compact prompt";
|
||||
let follow_up_message = "after compact";
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
config.compact_prompt = Some(compact_prompt.to_string());
|
||||
});
|
||||
let codex = builder
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("create conversation")
|
||||
.codex;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: compact_prompt.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.expect("submit first user turn");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex.submit(Op::Compact).await.expect("trigger compact");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: follow_up_message.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.expect("submit follow-up turn");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"expected first turn, compact, and follow-up requests"
|
||||
);
|
||||
|
||||
let follow_up_user_texts = requests[2].message_input_texts("user");
|
||||
let compact_prompt_occurrences = follow_up_user_texts
|
||||
.iter()
|
||||
.filter(|text| text.as_str() == compact_prompt)
|
||||
.count();
|
||||
assert_eq!(
|
||||
compact_prompt_occurrences, 1,
|
||||
"post-compaction history should preserve the real user message that matches the compact prompt exactly once"
|
||||
);
|
||||
assert!(
|
||||
follow_up_user_texts
|
||||
.iter()
|
||||
.any(|text| text.as_str() == follow_up_message),
|
||||
"follow-up request should include the new user message"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn manual_compact_emits_api_and_local_token_usage_events() {
|
||||
skip_if_no_network!();
|
||||
@@ -1391,6 +1476,103 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
||||
);
|
||||
}
|
||||
|
||||
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
|
||||
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
|
||||
async fn auto_compact_keeps_last_real_user_message_when_prompt_is_summary_shaped() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let sse1 = sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed_with_tokens("r1", 70_000),
|
||||
]);
|
||||
let sse2 = sse(vec![
|
||||
ev_assistant_message("m2", "SECOND_REPLY"),
|
||||
ev_completed_with_tokens("r2", 330_000),
|
||||
]);
|
||||
let sse3 = sse(vec![
|
||||
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
|
||||
ev_completed_with_tokens("r3", 200),
|
||||
]);
|
||||
let sse4 = sse(vec![
|
||||
ev_assistant_message("m4", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r4", 120),
|
||||
]);
|
||||
|
||||
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
|
||||
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
let summary_shaped_prompt = format!("{SUMMARY_PREFIX}\ncustom compact prompt");
|
||||
let compact_prompt = summary_shaped_prompt.clone();
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
config.compact_prompt = Some(summary_shaped_prompt);
|
||||
config.model_auto_compact_token_limit = Some(200_000);
|
||||
});
|
||||
let codex = builder.build(&server).await.unwrap().codex;
|
||||
|
||||
for user in [FIRST_AUTO_MSG, SECOND_AUTO_MSG, POST_AUTO_USER_MSG] {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: user.into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
}
|
||||
|
||||
let requests = request_log.requests();
|
||||
let follow_up = requests
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|request| {
|
||||
let body = request.body_json().to_string();
|
||||
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(&body, &compact_prompt)
|
||||
})
|
||||
.expect("follow-up request missing");
|
||||
|
||||
let follow_up_body = follow_up.body_json();
|
||||
let input_follow_up = follow_up_body
|
||||
.get("input")
|
||||
.and_then(|v| v.as_array())
|
||||
.expect("follow-up input");
|
||||
let user_texts: Vec<String> = input_follow_up
|
||||
.iter()
|
||||
.filter(|item| item.get("type").and_then(|v| v.as_str()) == Some("message"))
|
||||
.filter(|item| item.get("role").and_then(|v| v.as_str()) == Some("user"))
|
||||
.filter_map(|item| {
|
||||
item.get("content")
|
||||
.and_then(|v| v.as_array())
|
||||
.and_then(|arr| arr.first())
|
||||
.and_then(|entry| entry.get("text"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(std::string::ToString::to_string)
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert!(
|
||||
user_texts.iter().any(|text| text == SECOND_AUTO_MSG),
|
||||
"summary-shaped compact prompts must not drop the last real user message"
|
||||
);
|
||||
assert!(
|
||||
user_texts.iter().any(|text| text == POST_AUTO_USER_MSG),
|
||||
"follow-up request should include the new user message"
|
||||
);
|
||||
assert!(
|
||||
user_texts
|
||||
.iter()
|
||||
.any(|text| text.contains(AUTO_SUMMARY_TEXT)),
|
||||
"follow-up request should include the compacted summary"
|
||||
);
|
||||
}
|
||||
|
||||
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
|
||||
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
|
||||
@@ -2201,6 +2383,106 @@ async fn manual_compact_retries_after_context_window_error() {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn manual_compact_retry_trimming_preserves_full_persisted_user_messages() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let first_user_message = "first turn should survive replacement history";
|
||||
let follow_up_user_message = "follow up after compact";
|
||||
|
||||
let user_turn = sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed("r1"),
|
||||
]);
|
||||
let compact_succeeds = sse(vec![
|
||||
ev_assistant_message("m2", SUMMARY_TEXT),
|
||||
ev_completed("r2"),
|
||||
]);
|
||||
let follow_up_turn = sse(vec![ev_completed("r3")]);
|
||||
|
||||
let mut responses = vec![user_turn];
|
||||
for attempt in 0..5 {
|
||||
responses.push(sse_failed(
|
||||
&format!("resp-fail-{attempt}"),
|
||||
"context_length_exceeded",
|
||||
CONTEXT_LIMIT_MESSAGE,
|
||||
));
|
||||
}
|
||||
responses.push(compact_succeeds);
|
||||
responses.push(follow_up_turn);
|
||||
|
||||
let request_log = mount_sse_sequence(&server, responses).await;
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
|
||||
let codex = test_codex()
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(config);
|
||||
config.model_auto_compact_token_limit = Some(200_000);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("build codex")
|
||||
.codex;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: first_user_message.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.expect("submit first user turn");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex.submit(Op::Compact).await.expect("submit compact");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: follow_up_user_message.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.expect("submit follow-up turn");
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
8,
|
||||
"expected first turn, compact retries, and follow-up turn"
|
||||
);
|
||||
|
||||
let compact_attempt_bodies: Vec<String> = requests[1..7]
|
||||
.iter()
|
||||
.map(|request| request.body_json().to_string())
|
||||
.collect();
|
||||
let saw_retry_trim_message = compact_attempt_bodies
|
||||
.iter()
|
||||
.any(|body| !body_contains_text(body, first_user_message));
|
||||
assert!(
|
||||
saw_retry_trim_message,
|
||||
"expected at least one compact retry request to trim the oldest user message from prompt history"
|
||||
);
|
||||
|
||||
let follow_up_body = requests[7].body_json().to_string();
|
||||
assert!(
|
||||
body_contains_text(&follow_up_body, first_user_message),
|
||||
"follow-up request should keep the full persisted user messages in compact replacement history"
|
||||
);
|
||||
assert!(
|
||||
body_contains_text(&follow_up_body, follow_up_user_message),
|
||||
"follow-up request should include the incoming user message"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands.
|
||||
// Current main behavior around non-context manual /compact failures is known-incorrect.
|
||||
|
||||
@@ -1,24 +1,32 @@
|
||||
#![allow(clippy::expect_used)]
|
||||
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::config::Constrained;
|
||||
use codex_core::features::Feature;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::ItemStartedEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::context_snapshot;
|
||||
use core_test_support::context_snapshot::ContextSnapshotOptions;
|
||||
@@ -75,6 +83,22 @@ fn format_labeled_requests_snapshot(
|
||||
)
|
||||
}
|
||||
|
||||
fn format_labeled_requests_snapshot_with_normalized_exec_output(
|
||||
scenario: &str,
|
||||
sections: &[(&str, &responses::ResponsesRequest)],
|
||||
) -> String {
|
||||
format_labeled_requests_snapshot(scenario, sections)
|
||||
.lines()
|
||||
.map(|line| {
|
||||
line.split_once("function_call_output:").map_or_else(
|
||||
|| line.to_string(),
|
||||
|(prefix, _)| format!("{prefix}function_call_output:<UNIFIED_EXEC_OUTPUT>"),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
}
|
||||
|
||||
fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
|
||||
vec![ResponseItem::Compaction {
|
||||
encrypted_content: summary_with_prefix(summary),
|
||||
@@ -93,7 +117,7 @@ fn remote_realtime_test_codex_builder(
|
||||
}
|
||||
|
||||
async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
|
||||
start_websocket_server(vec![vec![
|
||||
let scripted_connection = vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_remote_compact", "instructions": "backend prompt" }
|
||||
@@ -108,7 +132,12 @@ async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
]])
|
||||
];
|
||||
start_websocket_server(vec![
|
||||
scripted_connection.clone(),
|
||||
scripted_connection.clone(),
|
||||
scripted_connection,
|
||||
])
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -149,6 +178,61 @@ async fn close_realtime_conversation(codex: &codex_core::CodexThread) -> Result<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_rollout_lines(path: &Path) -> Result<Vec<RolloutLine>> {
|
||||
Ok(fs::read_to_string(path)?
|
||||
.lines()
|
||||
.filter_map(|line| serde_json::from_str::<RolloutLine>(line).ok())
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn first_turn_context_items(path: &Path) -> Result<Vec<TurnContextItem>> {
|
||||
let rollout_lines = read_rollout_lines(path)?;
|
||||
let turn_id = rollout_lines
|
||||
.iter()
|
||||
.find_map(|line| match &line.item {
|
||||
RolloutItem::TurnContext(ctx) => ctx.turn_id.clone(),
|
||||
_ => None,
|
||||
})
|
||||
.expect("rollout should include a turn context item");
|
||||
|
||||
Ok(rollout_lines
|
||||
.into_iter()
|
||||
.filter_map(|line| match line.item {
|
||||
RolloutItem::TurnContext(ctx) if ctx.turn_id.as_deref() == Some(turn_id.as_str()) => {
|
||||
Some(ctx)
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn expect_exec_approval(
|
||||
codex: &codex_core::CodexThread,
|
||||
expected_command: &str,
|
||||
) -> ExecApprovalRequestEvent {
|
||||
let event = wait_for_event(codex, |event| {
|
||||
matches!(
|
||||
event,
|
||||
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
match event {
|
||||
EventMsg::ExecApprovalRequest(approval) => {
|
||||
let last_arg = approval
|
||||
.command
|
||||
.last()
|
||||
.map(String::as_str)
|
||||
.unwrap_or_default();
|
||||
assert_eq!(last_arg, expected_command);
|
||||
approval
|
||||
}
|
||||
EventMsg::TurnComplete(_) => panic!("expected approval request before completion"),
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_request_contains_realtime_start(request: &responses::ResponsesRequest) {
|
||||
let body = request.body_json().to_string();
|
||||
assert!(
|
||||
@@ -173,6 +257,36 @@ fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_request_omits_realtime_update(request: &responses::ResponsesRequest) {
|
||||
let body = request.body_json().to_string();
|
||||
assert!(
|
||||
!body.contains("<realtime_conversation>"),
|
||||
"did not expect request to contain realtime instructions"
|
||||
);
|
||||
}
|
||||
|
||||
fn realtime_end_message_count(request: &responses::ResponsesRequest) -> usize {
|
||||
request
|
||||
.message_input_texts("developer")
|
||||
.iter()
|
||||
.filter(|text| text.contains("Reason: inactive"))
|
||||
.count()
|
||||
}
|
||||
|
||||
fn ev_exec_command_call_requiring_approval(call_id: &str, command: &str) -> serde_json::Value {
|
||||
responses::ev_function_call(
|
||||
call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&json!({
|
||||
"cmd": command,
|
||||
"yield_time_ms": 1_000_u64,
|
||||
"sandbox_permissions": codex_protocol::models::SandboxPermissions::RequireEscalated,
|
||||
"justification": "Need to continue the paused turn"
|
||||
}))
|
||||
.expect("serialize exec_command arguments"),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -1874,6 +1988,393 @@ async fn snapshot_request_shape_remote_compact_resume_restates_realtime_end() ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn snapshot_request_shape_remote_mid_turn_realtime_end() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = wiremock::MockServer::start().await;
|
||||
let realtime_server = start_remote_realtime_server().await;
|
||||
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
|
||||
let _ = config.features.enable(Feature::RequestPermissions);
|
||||
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
config.permissions.sandbox_policy =
|
||||
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let rollout_path = test
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("rollout path");
|
||||
|
||||
let shell_command = "printf mid-turn-realtime-end";
|
||||
let responses_mock = responses::mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
ev_exec_command_call_requiring_approval(
|
||||
"call-remote-mid-turn-realtime-end",
|
||||
shell_command,
|
||||
),
|
||||
responses::ev_completed("r1"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
|
||||
responses::ev_completed("r2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
start_realtime_conversation(test.codex.as_ref()).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "USER_ONE".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let approval = expect_exec_approval(test.codex.as_ref(), shell_command).await;
|
||||
close_realtime_conversation(test.codex.as_ref()).await?;
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
test.codex.submit(Op::Shutdown).await?;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
let turn_context_items = first_turn_context_items(&rollout_path)?;
|
||||
assert_eq!(
|
||||
turn_context_items.len(),
|
||||
2,
|
||||
"expected two same-turn context snapshots"
|
||||
);
|
||||
assert_eq!(
|
||||
turn_context_items
|
||||
.last()
|
||||
.and_then(|item| item.realtime_active),
|
||||
Some(false)
|
||||
);
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(requests.len(), 2, "expected two model requests");
|
||||
|
||||
let first_request = &requests[0];
|
||||
let second_request = &requests[1];
|
||||
assert_request_contains_realtime_end(second_request);
|
||||
|
||||
insta::assert_snapshot!(
|
||||
"remote_mid_turn_realtime_end_shapes",
|
||||
format_labeled_requests_snapshot_with_normalized_exec_output(
|
||||
"Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.",
|
||||
&[
|
||||
("First Request Before Realtime Close", first_request),
|
||||
(
|
||||
"Same-Turn Continuation After Realtime Close",
|
||||
second_request
|
||||
),
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn snapshot_request_shape_remote_resume_after_same_turn_realtime_end_uses_latest_turn_context_item()
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = wiremock::MockServer::start().await;
|
||||
let realtime_server = start_remote_realtime_server().await;
|
||||
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
|
||||
let _ = config.features.enable(Feature::RequestPermissions);
|
||||
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
config.permissions.sandbox_policy =
|
||||
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
|
||||
});
|
||||
let initial = builder.build(&server).await?;
|
||||
let home = initial.home.clone();
|
||||
let rollout_path = initial
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("rollout path");
|
||||
|
||||
let shell_command = "printf mid-turn-realtime-end";
|
||||
let responses_mock = responses::mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
ev_exec_command_call_requiring_approval(
|
||||
"call-remote-resume-mid-turn-realtime-end",
|
||||
shell_command,
|
||||
),
|
||||
responses::ev_completed("r1"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
|
||||
responses::ev_completed("r2"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m3", "REMOTE_AFTER_RESUME_REPLY"),
|
||||
responses::ev_completed("r3"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
start_realtime_conversation(initial.codex.as_ref()).await?;
|
||||
|
||||
initial
|
||||
.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "USER_ONE".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let approval = expect_exec_approval(initial.codex.as_ref(), shell_command).await;
|
||||
close_realtime_conversation(initial.codex.as_ref()).await?;
|
||||
initial
|
||||
.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
initial.codex.submit(Op::Shutdown).await?;
|
||||
wait_for_event(&initial.codex, |ev| {
|
||||
matches!(ev, EventMsg::ShutdownComplete)
|
||||
})
|
||||
.await;
|
||||
|
||||
let turn_context_items = first_turn_context_items(&rollout_path)?;
|
||||
assert_eq!(
|
||||
turn_context_items.len(),
|
||||
2,
|
||||
"expected two same-turn context snapshots"
|
||||
);
|
||||
assert_eq!(
|
||||
turn_context_items
|
||||
.last()
|
||||
.and_then(|item| item.realtime_active),
|
||||
Some(false)
|
||||
);
|
||||
|
||||
let mut resume_builder =
|
||||
remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
|
||||
let _ = config.features.enable(Feature::RequestPermissions);
|
||||
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
config.permissions.sandbox_policy =
|
||||
Constrained::allow_any(SandboxPolicy::new_read_only_policy());
|
||||
});
|
||||
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
|
||||
|
||||
resumed
|
||||
.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "USER_TWO".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = responses_mock.requests();
|
||||
assert_eq!(requests.len(), 3, "expected three model requests");
|
||||
|
||||
let second_request = &requests[1];
|
||||
let after_resume_request = &requests[2];
|
||||
assert_request_contains_realtime_end(second_request);
|
||||
assert_eq!(
|
||||
realtime_end_message_count(after_resume_request),
|
||||
1,
|
||||
"expected resume to preserve exactly one historical realtime-ended message without emitting a duplicate update"
|
||||
);
|
||||
|
||||
insta::assert_snapshot!(
|
||||
"remote_resume_after_same_turn_realtime_end_uses_latest_turn_context_item_shapes",
|
||||
format_labeled_requests_snapshot_with_normalized_exec_output(
|
||||
"A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.",
|
||||
&[
|
||||
(
|
||||
"Same-Turn Continuation After Realtime Close",
|
||||
second_request
|
||||
),
|
||||
("Remote Post-Resume History Layout", after_resume_request),
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Snapshot test for resume after a same-turn realtime close followed by a manual compact.
///
/// Flow exercised here: realtime is started, the first user turn pauses on an exec
/// approval, realtime is closed while the turn is paused, the command is approved and the
/// turn completes, `Op::Compact` runs, the session shuts down, and a new session is resumed
/// from the rollout and given a second user input.
///
/// The test asserts that the rollout helper returns two turn-context items whose last entry
/// has `realtime_active == Some(false)`, that exactly one compact request and three model
/// requests were made, and that (per the assertion helper names) the same-turn continuation
/// request contains the realtime-ended update while the post-resume request carries no
/// realtime update. The two request shapes are then pinned with an `insta` snapshot.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn snapshot_request_shape_remote_compact_resume_after_same_turn_realtime_end_uses_latest_turn_context_item()
|
||||
-> Result<()> {
|
||||
    skip_if_no_network!(Ok(()));
|
||||
|
||||
    let server = wiremock::MockServer::start().await;
|
||||
    let realtime_server = start_remote_realtime_server().await;
|
||||
    // Session config: RequestPermissions feature on, approvals asked on request, and a
    // read-only sandbox policy. The resumed session below is configured the same way.
    let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
|
||||
        let _ = config.features.enable(Feature::RequestPermissions);
|
||||
        config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
        config.permissions.sandbox_policy =
|
||||
            Constrained::allow_any(SandboxPolicy::new_read_only_policy());
|
||||
    });
|
||||
    let initial = builder.build(&server).await?;
|
||||
    // Keep the home directory and rollout path so the session can be resumed later.
    let home = initial.home.clone();
|
||||
    let rollout_path = initial
|
||||
        .session_configured
|
||||
        .rollout_path
|
||||
        .clone()
|
||||
        .expect("rollout path");
|
||||
|
||||
    let shell_command = "printf mid-turn-realtime-end";
|
||||
    // Three scripted model responses, in order: (1) an exec command call that requires
    // approval, (2) the assistant reply that finishes the first turn, (3) the assistant
    // reply after resume.
    let responses_mock = responses::mount_sse_sequence(
|
||||
        &server,
|
||||
        vec![
|
||||
            responses::sse(vec![
|
||||
                ev_exec_command_call_requiring_approval(
|
||||
                    "call-remote-compact-resume-mid-turn-realtime-end",
|
||||
                    shell_command,
|
||||
                ),
|
||||
                responses::ev_completed("r1"),
|
||||
            ]),
|
||||
            responses::sse(vec![
|
||||
                responses::ev_assistant_message("m2", "REMOTE_MID_TURN_REALTIME_ENDED_REPLY"),
|
||||
                responses::ev_completed("r2"),
|
||||
            ]),
|
||||
            responses::sse(vec![
|
||||
                responses::ev_assistant_message("m3", "REMOTE_AFTER_RESUME_REPLY"),
|
||||
                responses::ev_completed("r3"),
|
||||
            ]),
|
||||
        ],
|
||||
    )
|
||||
    .await;
|
||||
    // A single mocked compact response whose output is a summary only.
    let compact_mock = responses::mount_compact_json_once(
|
||||
        &server,
|
||||
        serde_json::json!({
|
||||
            "output": compacted_summary_only_output(
|
||||
                "REMOTE_RESUME_LATEST_TURN_CONTEXT_SUMMARY"
|
||||
            )
|
||||
        }),
|
||||
    )
|
||||
    .await;
|
||||
|
||||
    // Realtime is started before the first user input is submitted.
    start_realtime_conversation(initial.codex.as_ref()).await?;
|
||||
|
||||
    initial
|
||||
        .codex
|
||||
        .submit(Op::UserInput {
|
||||
            items: vec![UserInput::Text {
|
||||
                text: "USER_ONE".to_string(),
|
||||
                text_elements: Vec::new(),
|
||||
            }],
|
||||
            final_output_json_schema: None,
|
||||
        })
|
||||
        .await?;
|
||||
|
||||
    // Wait for the exec approval request, close realtime while the turn is still waiting
    // on that approval, then approve so the same turn continues with realtime closed.
    let approval = expect_exec_approval(initial.codex.as_ref(), shell_command).await;
|
||||
    close_realtime_conversation(initial.codex.as_ref()).await?;
|
||||
    initial
|
||||
        .codex
|
||||
        .submit(Op::ExecApproval {
|
||||
            id: approval.effective_approval_id(),
|
||||
            turn_id: None,
|
||||
            decision: ReviewDecision::Approved,
|
||||
        })
|
||||
        .await?;
|
||||
    wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
    // Manual compact after the first turn; it is awaited via its own TurnComplete event
    // before the session is shut down.
    initial.codex.submit(Op::Compact).await?;
|
||||
    wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
    initial.codex.submit(Op::Shutdown).await?;
|
||||
    wait_for_event(&initial.codex, |ev| {
|
||||
        matches!(ev, EventMsg::ShutdownComplete)
|
||||
    })
|
||||
    .await;
|
||||
|
||||
    // Check the persisted rollout: two turn-context items are expected, and the last one
    // must record realtime as inactive.
    let turn_context_items = first_turn_context_items(&rollout_path)?;
|
||||
    assert_eq!(
|
||||
        turn_context_items.len(),
|
||||
        2,
|
||||
        "expected two same-turn context snapshots"
|
||||
    );
|
||||
    assert_eq!(
|
||||
        turn_context_items
|
||||
            .last()
|
||||
            .and_then(|item| item.realtime_active),
|
||||
        Some(false)
|
||||
    );
|
||||
|
||||
    // Resume from the rollout with the same configuration as the initial session.
    let mut resume_builder =
|
||||
        remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
|
||||
            let _ = config.features.enable(Feature::RequestPermissions);
|
||||
            config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
            config.permissions.sandbox_policy =
|
||||
                Constrained::allow_any(SandboxPolicy::new_read_only_policy());
|
||||
        });
|
||||
    let resumed = resume_builder.resume(&server, home, rollout_path).await?;
|
||||
|
||||
    resumed
|
||||
        .codex
|
||||
        .submit(Op::UserInput {
|
||||
            items: vec![UserInput::Text {
|
||||
                text: "USER_TWO".to_string(),
|
||||
                text_elements: Vec::new(),
|
||||
            }],
|
||||
            final_output_json_schema: None,
|
||||
        })
|
||||
        .await?;
|
||||
    wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
    assert_eq!(compact_mock.requests().len(), 1);
|
||||
    let requests = responses_mock.requests();
|
||||
    assert_eq!(requests.len(), 3, "expected three model requests");
|
||||
|
||||
    // requests[1] is the same-turn continuation sent after realtime was closed;
    // requests[2] is the first request sent by the resumed session.
    let second_request = &requests[1];
|
||||
    let after_resume_request = &requests[2];
|
||||
    assert_request_contains_realtime_end(second_request);
|
||||
    assert_request_omits_realtime_update(after_resume_request);
|
||||
|
||||
    // Pin both request shapes under a named snapshot.
    insta::assert_snapshot!(
|
||||
        "remote_compact_resume_after_same_turn_realtime_end_uses_latest_turn_context_item_shapes",
|
||||
        format_labeled_requests_snapshot_with_normalized_exec_output(
|
||||
            "A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.",
|
||||
            &[
|
||||
                (
|
||||
                    "Same-Turn Continuation After Realtime Close",
|
||||
                    second_request
|
||||
                ),
|
||||
                ("Remote Post-Resume History Layout", after_resume_request),
|
||||
            ]
|
||||
        )
|
||||
    );
|
||||
|
||||
    realtime_server.shutdown().await;
|
||||
    Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input.
|
||||
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
---
|
||||
source: core/tests/suite/compact_remote.rs
|
||||
assertion_line: 2334
|
||||
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.\",\n&[(\"Same-Turn Continuation After Realtime Close\", second_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])"
|
||||
---
|
||||
Scenario: A same-turn realtime-ended update is recorded before manual /compact. After compact and resume, replay should still use the later inactive TurnContext snapshot from that same turn and avoid re-emitting a stale realtime-ended developer update.
|
||||
|
||||
## Same-Turn Continuation After Realtime Close
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
|
||||
03:message/user:USER_ONE
|
||||
04:function_call/exec_command
|
||||
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
|
||||
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
|
||||
|
||||
## Remote Post-Resume History Layout
|
||||
00:compaction:encrypted=true
|
||||
01:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
02:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
03:message/user:USER_TWO
|
||||
@@ -0,0 +1,25 @@
|
||||
---
|
||||
source: core/tests/suite/compact_remote.rs
|
||||
assertion_line: 2070
|
||||
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.\",\n&[(\"First Request Before Realtime Close\", first_request),\n(\"Same-Turn Continuation After Realtime Close\", second_request),])"
|
||||
---
|
||||
Scenario: Realtime closes while the turn is paused on exec approval. The continuation request in the same turn emits the realtime-ended developer update, and the rollout persists a later same-turn TurnContext snapshot with realtime inactive.
|
||||
|
||||
## First Request Before Realtime Close
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
|
||||
03:message/user:USER_ONE
|
||||
|
||||
## Same-Turn Continuation After Realtime Close
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
|
||||
03:message/user:USER_ONE
|
||||
04:function_call/exec_command
|
||||
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
|
||||
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
|
||||
@@ -0,0 +1,31 @@
|
||||
---
|
||||
source: core/tests/suite/compact_remote.rs
|
||||
assertion_line: 2198
|
||||
expression: "format_labeled_requests_snapshot_with_normalized_exec_output(\"A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.\",\n&[(\"Same-Turn Continuation After Realtime Close\", second_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])"
|
||||
---
|
||||
Scenario: A same-turn realtime-ended update is recorded before the turn completes. On resume, replay should use the later inactive TurnContext snapshot from that same turn and preserve that historical ended message exactly once instead of emitting a duplicate update.
|
||||
|
||||
## Same-Turn Continuation After Realtime Close
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
|
||||
03:message/user:USER_ONE
|
||||
04:function_call/exec_command
|
||||
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
|
||||
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
|
||||
|
||||
## Remote Post-Resume History Layout
|
||||
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
|
||||
01:message/user[2]:
|
||||
[01] <AGENTS_MD>
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
|
||||
03:message/user:USER_ONE
|
||||
04:function_call/exec_command
|
||||
05:function_call_output:<UNIFIED_EXEC_OUTPUT>
|
||||
06:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nSubsequ...
|
||||
07:message/assistant:REMOTE_MID_TURN_REALTIME_ENDED_REPLY
|
||||
08:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
09:message/user:USER_TWO
|
||||
@@ -2212,11 +2212,10 @@ pub struct TurnContextNetworkItem {
|
||||
pub denied_domains: Vec<String>,
|
||||
}
|
||||
|
||||
/// Persist once per real user turn after computing that turn's model-visible
|
||||
/// context updates, and again after mid-turn compaction when replacement
|
||||
/// history re-establishes full context, so resume/fork replay can recover the
|
||||
/// latest durable baseline.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)]
|
||||
/// Persist after each committed model-visible context update, and again after
|
||||
/// mid-turn compaction when replacement history re-establishes full context, so
|
||||
/// resume/fork replay can recover the latest durable baseline.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, TS)]
|
||||
pub struct TurnContextItem {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub turn_id: Option<String>,
|
||||
|
||||
Reference in New Issue
Block a user