Handle compaction in turn context rebuild

This commit is contained in:
Charles Cunningham
2026-01-31 23:46:30 -08:00
parent ef4ed65fee
commit d1dbd31284

View File

@@ -1809,34 +1809,113 @@ impl Session {
fn reconstruct_turn_context_history_from_rollout(
rollout_items: &[RolloutItem],
) -> Vec<Option<TurnContextItem>> {
let mut history = Vec::new();
let mut history_items = Vec::new();
let mut awaiting_turn_context = false;
let mut turn_context_history = Vec::new();
for item in rollout_items {
match item {
RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) if role == "user" => {
history.push(None);
awaiting_turn_context = true;
RolloutItem::ResponseItem(response_item) => {
history_items.push(response_item.clone());
if matches!(response_item, ResponseItem::Message { role, .. } if role == "user")
{
turn_context_history.push(None);
awaiting_turn_context = true;
}
}
RolloutItem::TurnContext(ctx) => {
if awaiting_turn_context {
if let Some(last) = history.last_mut() {
if let Some(last) = turn_context_history.last_mut() {
*last = Some(ctx.clone());
} else {
history.push(Some(ctx.clone()));
turn_context_history.push(Some(ctx.clone()));
}
awaiting_turn_context = false;
}
}
RolloutItem::Compacted(compacted) => {
if let Some(replacement) = &compacted.replacement_history {
history_items = replacement.clone();
// Remote compaction can reorder messages, so drop contexts to avoid
// misalignment.
turn_context_history = vec![None; Session::user_turn_count(&history_items)];
} else {
let user_messages = collect_user_messages(&history_items);
history_items = compact::build_compacted_history(
Vec::new(),
&user_messages,
&compacted.message,
);
turn_context_history = Self::rebuild_turn_context_history_after_compaction(
turn_context_history,
&history_items,
);
}
awaiting_turn_context = false;
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
Self::drop_last_n_user_turns_from_items(&mut history_items, rollback.num_turns);
let drop = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
let new_len = history.len().saturating_sub(drop);
history.truncate(new_len);
let new_len = turn_context_history.len().saturating_sub(drop);
turn_context_history.truncate(new_len);
awaiting_turn_context = false;
}
_ => {}
}
}
history
turn_context_history
}
fn rebuild_turn_context_history_after_compaction(
previous: Vec<Option<TurnContextItem>>,
compacted_history: &[ResponseItem],
) -> Vec<Option<TurnContextItem>> {
let user_messages = collect_user_messages(compacted_history);
let retained = user_messages.len();
let mut updated = Self::take_last_with_padding(previous, retained);
updated.push(None);
updated
}
fn take_last_with_padding(
mut history: Vec<Option<TurnContextItem>>,
retained: usize,
) -> Vec<Option<TurnContextItem>> {
if retained == 0 {
return Vec::new();
}
if history.len() >= retained {
history.split_off(history.len() - retained)
} else {
let mut padded = Vec::with_capacity(retained);
padded.resize_with(retained - history.len(), || None);
padded.append(&mut history);
padded
}
}
fn drop_last_n_user_turns_from_items(items: &mut Vec<ResponseItem>, num_turns: u32) {
if num_turns == 0 {
return;
}
let user_positions: Vec<usize> = items
.iter()
.enumerate()
.filter_map(|(idx, item)| {
matches!(item, ResponseItem::Message { role, .. } if role == "user").then_some(idx)
})
.collect();
let Some(&first_user_idx) = user_positions.first() else {
return;
};
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
let cut_idx = if n_from_end >= user_positions.len() {
first_user_idx
} else {
user_positions[user_positions.len() - n_from_end]
};
items.truncate(cut_idx);
}
fn user_turn_count(items: &[ResponseItem]) -> usize {
@@ -5131,6 +5210,67 @@ mod tests {
assert!(update_items.contains(&expected_item));
}
#[test]
fn reconstruct_turn_context_history_handles_compaction() {
let collaboration_mode = CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: "gpt-test".to_string(),
reasoning_effort: None,
developer_instructions: None,
},
};
let turn_context = TurnContextItem {
cwd: PathBuf::from("/tmp"),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
model: collaboration_mode.settings.model.clone(),
personality: None,
collaboration_mode: Some(collaboration_mode.clone()),
effort: None,
summary: ReasoningSummaryConfig::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
};
let user = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "first user".to_string(),
}],
end_turn: None,
};
let assistant = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "assistant reply".to_string(),
}],
end_turn: None,
};
let compacted = RolloutItem::Compacted(CompactedItem {
message: format!("{}\nsummary", compact::SUMMARY_PREFIX),
replacement_history: None,
});
let rollout_items = vec![
RolloutItem::ResponseItem(user),
RolloutItem::TurnContext(turn_context),
RolloutItem::ResponseItem(assistant),
compacted,
];
let history = Session::reconstruct_turn_context_history_from_rollout(&rollout_items);
let modes: Vec<Option<CollaborationMode>> = history
.iter()
.map(|item| item.as_ref().and_then(|ctx| ctx.collaboration_mode.clone()))
.collect();
assert_eq!(history.len(), 2);
assert_eq!(modes, vec![Some(collaboration_mode), None]);
}
#[tokio::test]
async fn thread_rollback_ignores_stale_turn_context() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;