clean: all history cloning (#8916)

This commit is contained in:
jif-oai
2026-01-08 18:17:18 +00:00
committed by GitHub
parent c4af304c77
commit 69898e3dba
6 changed files with 84 additions and 89 deletions

View File

@@ -1256,12 +1256,10 @@ impl Session {
);
}
RolloutItem::Compacted(compacted) => {
let snapshot = history.get_history();
// TODO(jif) clean
if let Some(replacement) = &compacted.replacement_history {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(&snapshot);
let user_messages = collect_user_messages(history.raw_items());
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context),
&user_messages,
@@ -1276,7 +1274,7 @@ impl Session {
_ => {}
}
}
history.get_history()
history.raw_items().to_vec()
}
/// Append ResponseItems to the in-memory conversation history only.
@@ -2107,7 +2105,10 @@ mod handlers {
let mut history = sess.clone_history().await;
history.drop_last_n_user_turns(num_turns);
sess.replace_history(history.get_history()).await;
// Replace with the raw items. We don't want to replace with a normalized
// version of the history.
sess.replace_history(history.raw_items().to_vec()).await;
sess.recompute_token_usage(turn_context.as_ref()).await;
sess.send_event_raw_flushed(Event {
@@ -2124,10 +2125,9 @@ mod handlers {
.terminate_all_processes()
.await;
info!("Shutting down Codex instance");
let turn_count = sess
.clone_history()
.await
.get_history()
let history = sess.clone_history().await;
let turn_count = history
.raw_items()
.iter()
.filter(|item| is_user_turn_boundary(item))
.count();
@@ -2393,7 +2393,7 @@ pub(crate) async fn run_task(
let turn_input: Vec<ResponseItem> = {
sess.record_conversation_items(&turn_context, &pending_input)
.await;
sess.clone_history().await.get_history_for_prompt()
sess.clone_history().await.for_prompt()
};
let turn_input_messages = turn_input
@@ -2934,8 +2934,8 @@ mod tests {
}))
.await;
let actual = session.state.lock().await.clone_history().get_history();
assert_eq!(expected, actual);
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
@@ -3024,8 +3024,8 @@ mod tests {
.record_initial_history(InitialHistory::Forked(rollout_items))
.await;
let actual = session.state.lock().await.clone_history().get_history();
assert_eq!(expected, actual);
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
@@ -3081,8 +3081,8 @@ mod tests {
expected.extend(initial_context);
expected.extend(turn_1);
let actual = sess.clone_history().await.get_history();
assert_eq!(expected, actual);
let history = sess.clone_history().await;
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
@@ -3107,8 +3107,8 @@ mod tests {
let rollback_event = wait_for_thread_rolled_back(&rx).await;
assert_eq!(rollback_event.num_turns, 99);
let actual = sess.clone_history().await.get_history();
assert_eq!(initial_context, actual);
let history = sess.clone_history().await;
assert_eq!(initial_context, history.raw_items());
}
#[tokio::test]
@@ -3128,8 +3128,8 @@ mod tests {
Some(CodexErrorInfo::ThreadRollbackFailed)
);
let actual = sess.clone_history().await.get_history();
assert_eq!(initial_context, actual);
let history = sess.clone_history().await;
assert_eq!(initial_context, history.raw_items());
}
#[tokio::test]
@@ -3149,8 +3149,8 @@ mod tests {
Some(CodexErrorInfo::ThreadRollbackFailed)
);
let actual = sess.clone_history().await.get_history();
assert_eq!(initial_context, actual);
let history = sess.clone_history().await;
assert_eq!(initial_context, history.raw_items());
}
#[tokio::test]
@@ -3651,8 +3651,8 @@ mod tests {
.record_model_warning("too many unified exec processes", &turn_context)
.await;
let mut history = session.clone_history().await;
let history_items = history.get_history();
let history = session.clone_history().await;
let history_items = history.raw_items();
let last = history_items.last().expect("warning recorded");
match last {
@@ -3798,8 +3798,9 @@ mod tests {
}
}
let history = sess.clone_history().await.get_history();
let _ = history;
// TODO(jif) investigate what is this?
let history = sess.clone_history().await;
let _ = history.raw_items();
}
#[tokio::test]
@@ -3888,7 +3889,7 @@ mod tests {
rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));
let summary1 = "summary one";
let snapshot1 = live_history.get_history();
let snapshot1 = live_history.clone().for_prompt();
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 = compact::build_compacted_history(
session.build_initial_context(turn_context),
@@ -3922,7 +3923,7 @@ mod tests {
rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));
let summary2 = "summary two";
let snapshot2 = live_history.get_history();
let snapshot2 = live_history.clone().for_prompt();
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 = compact::build_compacted_history(
session.build_initial_context(turn_context),
@@ -3955,7 +3956,7 @@ mod tests {
live_history.record_items(std::iter::once(&assistant3), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant3.clone()));
(rollout_items, live_history.get_history())
(rollout_items, live_history.for_prompt())
}
#[tokio::test]

View File

@@ -95,9 +95,11 @@ async fn run_compact_task_inner(
sess.persist_rollout_items(&[rollout_item]).await;
loop {
let turn_input = history.get_history_for_prompt();
// Clone is required because of the loop
let turn_input = history.clone().for_prompt();
let turn_input_len = turn_input.len();
let prompt = Prompt {
input: turn_input.clone(),
input: turn_input,
..Default::default()
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
@@ -119,7 +121,7 @@ async fn run_compact_task_inner(
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
@@ -155,15 +157,15 @@ async fn run_compact_task_inner(
}
}
let history_snapshot = sess.clone_history().await.get_history();
let summary_suffix =
get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let history_snapshot = sess.clone_history().await;
let history_items = history_snapshot.raw_items();
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(&history_snapshot);
let user_messages = collect_user_messages(history_items);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
let ghost_snapshots: Vec<ResponseItem> = history_items
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()

View File

@@ -40,9 +40,18 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let mut history = sess.clone_history().await;
let history = sess.clone_history().await;
// Required to keep `/undo` available after compaction
let ghost_snapshots: Vec<ResponseItem> = history
.raw_items()
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();
let prompt = Prompt {
input: history.get_history_for_prompt(),
input: history.for_prompt(),
tools: vec![],
parallel_tool_calls: false,
base_instructions_override: turn_context.base_instructions.clone(),
@@ -53,13 +62,6 @@ async fn run_remote_compact_task_inner_impl(
.client
.compact_conversation_history(&prompt)
.await?;
// Required to keep `/undo` available after compaction
let ghost_snapshots: Vec<ResponseItem> = history
.get_history()
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();
if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);

View File

@@ -67,17 +67,18 @@ impl ContextManager {
}
}
pub(crate) fn get_history(&mut self) -> Vec<ResponseItem> {
/// Returns the history prepared for sending to the model. This applies a proper
/// normalization and drop un-suited items.
pub(crate) fn for_prompt(mut self) -> Vec<ResponseItem> {
self.normalize_history();
self.contents()
self.items
.retain(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }));
self.items
}
// Returns the history prepared for sending to the model.
// With extra response items filtered out and GhostCommits removed.
pub(crate) fn get_history_for_prompt(&mut self) -> Vec<ResponseItem> {
let mut history = self.get_history();
Self::remove_ghost_snapshots(&mut history);
history
/// Returns raw items in the history.
pub(crate) fn raw_items(&self) -> &[ResponseItem] {
&self.items
}
// Estimate token usage using byte-based heuristics from the truncation helpers.
@@ -168,9 +169,7 @@ impl ContextManager {
return;
}
// Keep behavior consistent with call sites that previously operated on `get_history()`:
// normalize first (call/output invariants), then truncate based on the normalized view.
let snapshot = self.get_history();
let snapshot = self.items.clone();
let user_positions = user_message_positions(&snapshot);
let Some(&first_user_idx) = user_positions.first() else {
self.replace(snapshot);
@@ -250,15 +249,6 @@ impl ContextManager {
normalize::remove_orphan_outputs(&mut self.items);
}
/// Returns a clone of the contents in the transcript.
fn contents(&self) -> Vec<ResponseItem> {
self.items.clone()
}
fn remove_ghost_snapshots(items: &mut Vec<ResponseItem>) {
items.retain(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }));
}
fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
let policy_with_serialization_budget = policy.mul(1.2);
match item {

View File

@@ -101,7 +101,7 @@ fn filters_non_api_messages() {
let a = assistant_msg("hello");
h.record_items([&u, &a], policy);
let items = h.contents();
let items = h.raw_items();
assert_eq!(
items,
vec![
@@ -160,8 +160,8 @@ fn get_history_for_prompt_drops_ghost_commits() {
let items = vec![ResponseItem::GhostSnapshot {
ghost_commit: GhostCommit::new("ghost-1".to_string(), None, Vec::new(), Vec::new()),
}];
let mut history = create_history_with_items(items);
let filtered = history.get_history_for_prompt();
let history = create_history_with_items(items);
let filtered = history.for_prompt();
assert_eq!(filtered, vec![]);
}
@@ -184,7 +184,7 @@ fn remove_first_item_removes_matching_output_for_function_call() {
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[test]
@@ -206,7 +206,7 @@ fn remove_first_item_removes_matching_call_for_output() {
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[test]
@@ -234,7 +234,7 @@ fn remove_first_item_handles_local_shell_pair() {
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[test]
@@ -250,7 +250,7 @@ fn drop_last_n_user_turns_preserves_prefix() {
let mut history = create_history_with_items(items);
history.drop_last_n_user_turns(1);
assert_eq!(
history.get_history(),
history.for_prompt(),
vec![
assistant_msg("session prefix item"),
user_msg("u1"),
@@ -267,7 +267,7 @@ fn drop_last_n_user_turns_preserves_prefix() {
]);
history.drop_last_n_user_turns(99);
assert_eq!(
history.get_history(),
history.for_prompt(),
vec![assistant_msg("session prefix item")]
);
}
@@ -307,7 +307,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
assistant_msg("turn 1 assistant"),
];
assert_eq!(history.get_history(), expected_prefix_and_first_turn);
assert_eq!(history.for_prompt(), expected_prefix_and_first_turn);
let expected_prefix_only = vec![
user_input_text_msg("<environment_context>ctx</environment_context>"),
@@ -337,7 +337,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
assistant_msg("turn 2 assistant"),
]);
history.drop_last_n_user_turns(2);
assert_eq!(history.get_history(), expected_prefix_only);
assert_eq!(history.for_prompt(), expected_prefix_only);
let mut history = create_history_with_items(vec![
user_input_text_msg("<environment_context>ctx</environment_context>"),
@@ -355,7 +355,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
assistant_msg("turn 2 assistant"),
]);
history.drop_last_n_user_turns(3);
assert_eq!(history.get_history(), expected_prefix_only);
assert_eq!(history.for_prompt(), expected_prefix_only);
}
#[test]
@@ -375,7 +375,7 @@ fn remove_first_item_handles_custom_tool_pair() {
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[test]
@@ -402,8 +402,8 @@ fn normalization_retains_local_shell_outputs() {
},
];
let mut history = create_history_with_items(items.clone());
let normalized = history.get_history();
let history = create_history_with_items(items.clone());
let normalized = history.for_prompt();
assert_eq!(normalized, items);
}
@@ -607,7 +607,7 @@ fn normalize_adds_missing_output_for_function_call() {
h.normalize_history();
assert_eq!(
h.contents(),
h.raw_items(),
vec![
ResponseItem::FunctionCall {
id: None,
@@ -641,7 +641,7 @@ fn normalize_adds_missing_output_for_custom_tool_call() {
h.normalize_history();
assert_eq!(
h.contents(),
h.raw_items(),
vec![
ResponseItem::CustomToolCall {
id: None,
@@ -678,7 +678,7 @@ fn normalize_adds_missing_output_for_local_shell_call_with_id() {
h.normalize_history();
assert_eq!(
h.contents(),
h.raw_items(),
vec![
ResponseItem::LocalShellCall {
id: None,
@@ -717,7 +717,7 @@ fn normalize_removes_orphan_function_call_output() {
h.normalize_history();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[cfg(not(debug_assertions))]
@@ -731,7 +731,7 @@ fn normalize_removes_orphan_custom_tool_call_output() {
h.normalize_history();
assert_eq!(h.contents(), vec![]);
assert_eq!(h.raw_items(), vec![]);
}
#[cfg(not(debug_assertions))]
@@ -780,7 +780,7 @@ fn normalize_mixed_inserts_and_removals() {
h.normalize_history();
assert_eq!(
h.contents(),
h.raw_items(),
vec![
ResponseItem::FunctionCall {
id: None,
@@ -840,7 +840,7 @@ fn normalize_adds_missing_output_for_function_call_inserts_output() {
let mut h = create_history_with_items(items);
h.normalize_history();
assert_eq!(
h.contents(),
h.raw_items(),
vec![
ResponseItem::FunctionCall {
id: None,

View File

@@ -64,8 +64,8 @@ impl SessionTask for UndoTask {
return None;
}
let mut history = sess.clone_history().await;
let mut items = history.get_history();
let history = sess.clone_history().await;
let mut items = history.raw_items().to_vec();
let mut completed = UndoCompletedEvent {
success: false,
message: None,