chore: clean rollout extraction in memories (#11471)

This commit is contained in:
jif-oai
2026-02-11 18:25:45 +00:00
committed by GitHub
parent d4b2c230f1
commit 1170ffeeae
4 changed files with 48 additions and 197 deletions

View File

@@ -5,7 +5,6 @@
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.
mod prompts;
mod rollout;
mod stage_one;
mod startup;
mod storage;

View File

@@ -1,84 +0,0 @@
use crate::error::CodexErr;
use crate::error::Result;
use crate::rollout::policy::should_persist_response_item;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
/// Controls which rollout item kinds are retained for stage-1 memory extraction.
#[derive(Debug, Clone, Copy)]
pub(super) struct StageOneRolloutFilter {
/// Keep `RolloutItem::ResponseItem` entries.
pub(super) keep_response_items: bool,
/// Keep `RolloutItem::Compacted` entries (converted to assistant messages).
pub(super) keep_compacted_items: bool,
/// Restricts kept `ResponseItem` entries. Defaults to rollout persistence policy.
pub(super) response_item_filter: fn(&ResponseItem) -> bool,
/// Optional cap on retained items after filtering.
pub(super) max_items: Option<usize>,
}
impl StageOneRolloutFilter {
pub(super) const fn response_and_compacted_items() -> Self {
Self {
keep_response_items: true,
keep_compacted_items: true,
response_item_filter: should_persist_response_item,
max_items: None,
}
}
}
impl Default for StageOneRolloutFilter {
fn default() -> Self {
Self::response_and_compacted_items()
}
}
/// Extracts stage-1 memory items from rollout JSONL entries.
///
/// `RolloutItem::Compacted` entries are converted to assistant messages so the
/// model sees the same response-item shape as normal transcript content.
pub(super) fn filter_rollout_response_items(
items: &[RolloutItem],
filter: StageOneRolloutFilter,
) -> Vec<ResponseItem> {
let mut out = Vec::new();
for item in items {
match item {
RolloutItem::ResponseItem(response_item)
if filter.keep_response_items && (filter.response_item_filter)(response_item) =>
{
out.push(response_item.clone());
}
RolloutItem::Compacted(compacted) if filter.keep_compacted_items => {
let compacted_as_message = ResponseItem::from(compacted.clone());
if (filter.response_item_filter)(&compacted_as_message) {
out.push(compacted_as_message);
}
}
RolloutItem::SessionMeta(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_) => {}
}
if let Some(limit) = filter.max_items
&& out.len() >= limit
{
break;
}
}
out
}
/// Serializes filtered stage-1 memory items for prompt inclusion.
pub(super) fn serialize_filtered_rollout_response_items(
items: &[RolloutItem],
filter: StageOneRolloutFilter,
) -> Result<String> {
let filtered = filter_rollout_response_items(items, filter);
serde_json::to_string(&filtered).map_err(|err| {
CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
})
}

View File

@@ -14,11 +14,11 @@ use tracing::warn;
use super::StageOneRequestContext;
use crate::memories::StageOneOutput;
use crate::memories::prompts::build_stage_one_input_message;
use crate::memories::rollout::StageOneRolloutFilter;
use crate::memories::rollout::serialize_filtered_rollout_response_items;
use crate::memories::stage_one::RAW_MEMORY_PROMPT;
use crate::memories::stage_one::parse_stage_one_output;
use crate::memories::stage_one::stage_one_output_schema;
use crate::rollout::policy::should_persist_response_item;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;
pub(super) async fn extract_stage_one_output(
@@ -45,10 +45,7 @@ pub(super) async fn extract_stage_one_output(
);
}
let rollout_contents = match serialize_filtered_rollout_response_items(
&rollout_items,
StageOneRolloutFilter::default(),
) {
let rollout_contents = match serialize_filtered_rollout_response_items(&rollout_items) {
Ok(contents) => contents,
Err(err) => {
warn!(
@@ -149,3 +146,48 @@ async fn collect_response_text_until_completed(stream: &mut ResponseStream) -> C
}
}
}
/// Serializes filtered stage-1 memory items for prompt inclusion.
fn serialize_filtered_rollout_response_items(
items: &[RolloutItem],
) -> crate::error::Result<String> {
let filtered = items
.iter()
.filter_map(|item| {
if let RolloutItem::ResponseItem(item) = item
&& should_persist_response_item(item)
{
Some(item.clone())
} else {
None
}
})
.collect::<Vec<_>>();
serde_json::to_string(&filtered).map_err(|err| {
CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serialize_filtered_rollout_response_items_keeps_response_items_only() {
let input = vec![RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "user input".to_string(),
}],
end_turn: None,
phase: None,
})];
let serialized = serialize_filtered_rollout_response_items(&input).expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");
pretty_assertions::assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0], ResponseItem::Message { .. }));
}
}

View File

@@ -1,5 +1,3 @@
use super::rollout::StageOneRolloutFilter;
use super::rollout::serialize_filtered_rollout_response_items;
use super::stage_one::parse_stage_one_output;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
@@ -10,10 +8,6 @@ use crate::memories::rollout_summaries_dir;
use chrono::TimeZone;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::RolloutItem;
use codex_state::Stage1Output;
use pretty_assertions::assert_eq;
use serde_json::Value;
@@ -81,106 +75,6 @@ fn stage_one_output_schema_requires_all_declared_properties() {
assert_eq!(required_keys, property_keys);
}
#[test]
fn serialize_filtered_rollout_response_items_keeps_response_and_compacted() {
let input = vec![
RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "user input".to_string(),
}],
end_turn: None,
phase: None,
}),
RolloutItem::Compacted(CompactedItem {
message: "compacted summary".to_string(),
replacement_history: None,
}),
];
let serialized = serialize_filtered_rollout_response_items(
&input,
StageOneRolloutFilter::response_and_compacted_items(),
)
.expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");
assert_eq!(parsed.len(), 2);
assert!(matches!(parsed[0], ResponseItem::Message { .. }));
assert!(matches!(parsed[1], ResponseItem::Message { .. }));
}
#[test]
fn serialize_filtered_rollout_response_items_supports_response_only_filter() {
let input = vec![
RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "user input".to_string(),
}],
end_turn: None,
phase: None,
}),
RolloutItem::Compacted(CompactedItem {
message: "compacted summary".to_string(),
replacement_history: None,
}),
];
let serialized = serialize_filtered_rollout_response_items(
&input,
StageOneRolloutFilter {
keep_response_items: true,
keep_compacted_items: false,
response_item_filter: crate::rollout::policy::should_persist_response_item,
max_items: None,
},
)
.expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");
assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0], ResponseItem::Message { .. }));
}
#[test]
fn serialize_filtered_rollout_response_items_filters_by_response_item_kind() {
let input = vec![
RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "user input".to_string(),
}],
end_turn: None,
phase: None,
}),
RolloutItem::ResponseItem(ResponseItem::FunctionCall {
id: None,
name: "shell".to_string(),
arguments: "{\"cmd\":\"pwd\"}".to_string(),
call_id: "call-1".to_string(),
}),
];
let serialized = serialize_filtered_rollout_response_items(
&input,
StageOneRolloutFilter {
keep_response_items: true,
keep_compacted_items: false,
response_item_filter: |item| matches!(item, ResponseItem::Message { .. }),
max_items: None,
},
)
.expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");
assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0], ResponseItem::Message { .. }));
}
#[tokio::test]
async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only() {
let dir = tempdir().expect("tempdir");