fix(app-server, core): defer initial context write to rollout file until first turn (#9950)

### Overview
Currently calling `thread/resume` will always bump the thread's
`updated_at` timestamp. This PR makes it the `updated_at` timestamp
changes only if a turn is triggered.

### Additonal context
What we typically do on resuming a thread is **always** writing “initial
context” to the rollout file immediately. This initial context includes:
- Developer instructions derived from sandbox/approval policy + cwd
- Optional developer instructions (if provided)
- Optional collaboration-mode instructions
- Optional user instructions (if provided)
- Environment context (cwd, shell, etc.)

This PR defers writing the “initial context” to the rollout file until
the first `turn/start`, so we don't inadvertently bump the thread's
`updated_at` timestamp until a turn is actually triggered.

This works even though both `thread/resume` and `turn/start` accept
overrides (such as `model`, `cwd`, etc.) because the initial context is
seeded from the effective `TurnContext` in memory, computed at
`turn/start` time, after both sets of overrides have been applied.

**NOTE**: This is a very short-lived solution until we introduce sqlite.
Then we can remove this.
This commit is contained in:
Owen Lin
2026-01-27 10:41:54 -08:00
committed by GitHub
parent 877b76bb9d
commit fc0fd85349
4 changed files with 269 additions and 17 deletions

View File

@@ -933,23 +933,28 @@ impl Session {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
let persist = matches!(conversation_history, InitialHistory::Forked(_));
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = false;
}
// If resuming, warn when the last recorded model differs from the current one.
if let InitialHistory::Resumed(_) = conversation_history
&& let Some(prev) = rollout_items.iter().rev().find_map(|it| {
if let RolloutItem::TurnContext(ctx) = it {
Some(ctx.model.as_str())
} else {
None
}
})
{
if let Some(prev) = rollout_items.iter().rev().find_map(|it| {
if let RolloutItem::TurnContext(ctx) = it {
Some(ctx.model.as_str())
} else {
None
}
}) {
let curr = turn_context.client.get_model();
if prev != curr {
warn!(
@@ -984,8 +989,29 @@ impl Session {
state.set_token_info(Some(info));
}
// Defer seeding the session's initial context until the first turn starts so
// turn/start overrides can be merged before we write to the rollout.
self.flush_rollout().await;
}
InitialHistory::Forked(rollout_items) => {
// Always add response items to conversation history
let reconstructed_history = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
if !reconstructed_history.is_empty() {
self.record_into_history(&reconstructed_history, &turn_context)
.await;
}
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
// If persisting, persist all rollout items as-is (recorder filters)
if persist && !rollout_items.is_empty() {
if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
@@ -993,6 +1019,10 @@ impl Session {
let initial_context = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &initial_context)
.await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
@@ -1643,6 +1673,21 @@ impl Session {
state.replace_history(items);
}
pub(crate) async fn seed_initial_context_if_needed(&self, turn_context: &TurnContext) {
{
let mut state = self.state.lock().await;
if state.initial_context_seeded {
return;
}
state.initial_context_seeded = true;
}
let initial_context = self.build_initial_context(turn_context).await;
self.record_conversation_items(turn_context, &initial_context)
.await;
self.flush_rollout().await;
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
@@ -2332,6 +2377,11 @@ mod handlers {
return;
}
let initial_context_seeded = sess.state.lock().await.initial_context_seeded;
if !initial_context_seeded {
return;
}
let current_context = sess.new_default_turn_with_sub_id(sub_id).await;
let update_items = sess.build_settings_update_items(
Some(&previous_context),
@@ -2419,6 +2469,7 @@ mod handlers {
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
sess.seed_initial_context_if_needed(&current_context).await;
let update_items = sess.build_settings_update_items(
previous_context.as_ref(),
&current_context,
@@ -3719,6 +3770,23 @@ mod tests {
#[tokio::test]
async fn record_initial_history_reconstructs_resumed_transcript() {
let (session, turn_context) = make_session_and_context().await;
let (rollout_items, expected) = sample_rollout(&session, &turn_context).await;
session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
conversation_id: ThreadId::default(),
history: rollout_items,
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
}))
.await;
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
let (session, turn_context) = make_session_and_context().await;
let (rollout_items, mut expected) = sample_rollout(&session, &turn_context).await;
@@ -3730,9 +3798,17 @@ mod tests {
}))
.await;
let history_before_seed = session.state.lock().await.clone_history();
assert_eq!(expected, history_before_seed.raw_items());
session.seed_initial_context_if_needed(&turn_context).await;
expected.extend(session.build_initial_context(&turn_context).await);
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
let history_after_seed = session.clone_history().await;
assert_eq!(expected, history_after_seed.raw_items());
session.seed_initial_context_if_needed(&turn_context).await;
let history_after_second_seed = session.clone_history().await;
assert_eq!(expected, history_after_second_seed.raw_items());
}
#[tokio::test]
@@ -4347,7 +4423,8 @@ mod tests {
session_configuration.session_source.clone(),
);
let state = SessionState::new(session_configuration.clone());
let mut state = SessionState::new(session_configuration.clone());
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
let services = SessionServices {
@@ -4456,7 +4533,8 @@ mod tests {
session_configuration.session_source.clone(),
);
let state = SessionState::new(session_configuration.clone());
let mut state = SessionState::new(session_configuration.clone());
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
let services = SessionServices {
@@ -4502,6 +4580,10 @@ mod tests {
(session, turn_context, rx_event)
}
fn mark_state_initial_context_seeded(state: &mut SessionState) {
state.initial_context_seeded = true;
}
#[tokio::test]
async fn refresh_mcp_servers_is_deferred_until_next_turn() {
let (session, turn_context) = make_session_and_context().await;