Compare commits

...

5 Commits

Author SHA1 Message Date
jif-oai
21a4f8753a comment 2026-04-21 15:16:35 +01:00
jif-oai
200afcd72d Merge remote-tracking branch 'origin/main' into jif/diff-based-phase-2
# Conflicts:
#	codex-rs/Cargo.lock
2026-04-21 15:16:06 +01:00
jif-oai
44e29840dc process comment 2026-04-21 15:12:09 +01:00
jif-oai
3cc268c947 make it better 2026-04-20 17:37:02 +01:00
jif-oai
738b915021 feat: move phase 2 to a diff based system 2026-04-20 17:11:42 +01:00
16 changed files with 2492 additions and 344 deletions

76
MODULE.bazel.lock generated

File diff suppressed because one or more lines are too long

1079
codex-rs/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -244,6 +244,7 @@ env_logger = "0.11.9"
eventsource-stream = "0.2.3"
futures = { version = "0.3", default-features = false }
gethostname = "1.1.0"
gix = { version = "0.81.0", default-features = false, features = ["sha1"] }
glob = "0.3"
globset = "0.4"
hmac = "0.12.1"

View File

@@ -81,6 +81,7 @@ dunce = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }
gix = { workspace = true }
http = { workspace = true }
iana-time-zone = { workspace = true }
image = { workspace = true, features = ["jpeg", "png", "webp"] }

View File

@@ -83,26 +83,31 @@ What it does:
- syncs local memory artifacts under the memories root:
- `raw_memories.md` (merged raw memories, latest first)
- `rollout_summaries/` (one summary file per retained rollout)
- keeps the memories root itself as a git repository, initialized under
`~/.codex/memories/.git`
- keeps `phase2_workspace_diff.md` in the memories root `.gitignore`
- prunes stale rollout summaries that are no longer retained
- finds old resource files from memory extensions under
`memories_extensions/<extension>/resources/` for extension directories that
`memories/extensions/<extension>/resources/` for extension directories that
have an `instructions.md`, using the memory module retention window
- if there are no Phase 1 inputs or old extension resources, marks the job
- removes old extension resources before the consolidation agent runs, so the
deletion appears in the workspace diff
- writes `phase2_workspace_diff.md` in the memories root with the git-style diff
from the previous successful Phase 2 commit to the current worktree; this
generated file is ignored by the memories git workspace and commits
- if the memory workspace has no git changes after artifact sync/pruning, marks the job
successful and exits
If there is input, it then:
- spawns an internal consolidation sub-agent
- builds the Phase 2 prompt with a diff of the current Phase 1 input
selection versus the last successful Phase 2 selection (`added`,
`retained`, `removed`)
- includes old extension resource paths in the prompt diff
- builds the Phase 2 prompt with the path to the generated workspace diff
- points the agent at `phase2_workspace_diff.md` for the detailed diff context
- runs it with no approvals, no network, and local write access only
- disables collab for that agent (to prevent recursive delegation)
- watches the agent status and heartbeats the global job lease while it runs
- commits all memory workspace changes after the agent completes successfully
- marks the phase-2 job success/failure in the state DB when the agent finishes
- prunes old extension resource files after the consolidation agent completes
and the successful Phase 2 job is recorded
Selection diff behavior:
@@ -118,8 +123,10 @@ Selection diff behavior:
- rows that were previously selected but still exist outside the current top-N
selection are surfaced as `removed`
- before the agent starts, local `rollout_summaries/` and `raw_memories.md`
keep the union of the current selection and the previous successful
selection, so removed-thread evidence stays available during forgetting
reflect the current selection so removed-thread summaries show up as
workspace deletions during forgetting; when removals are pending, existing
consolidated artifacts such as `MEMORY.md`, `memory_summary.md`, and
`skills/` are preserved for the agent's targeted cleanup
Watermark behavior:

View File

@@ -1,14 +1,15 @@
use std::path::Path;
pub async fn clear_memory_roots_contents(codex_home: &Path) -> std::io::Result<()> {
for memory_root in [
codex_home.join("memories"),
codex_home.join("memories_extensions"),
] {
clear_memory_root_contents(memory_root.as_path()).await?;
}
let memory_root = codex_home.join("memories");
clear_memory_root_contents(&memory_root).await?;
Ok(())
let legacy_extensions_root = codex_home.join("memories_extensions");
match tokio::fs::symlink_metadata(&legacy_extensions_root).await {
Ok(_) => clear_memory_root_contents(&legacy_extensions_root).await,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err),
}
}
pub(crate) async fn clear_memory_root_contents(memory_root: &Path) -> std::io::Result<()> {

View File

@@ -14,6 +14,7 @@ mod storage;
#[cfg(test)]
mod tests;
pub(crate) mod usage;
mod workspace;
use codex_protocol::openai_models::ReasoningEffort;
@@ -25,7 +26,7 @@ pub use control::clear_memory_roots_contents;
pub(crate) use start::start_memories_startup_task;
mod artifacts {
pub(super) const EXTENSIONS_SUBDIR: &str = "memories_extensions";
pub(super) const EXTENSIONS_SUBDIR: &str = "extensions";
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
}
@@ -111,7 +112,7 @@ fn rollout_summaries_dir(root: &Path) -> PathBuf {
}
fn memory_extensions_root(root: &Path) -> PathBuf {
root.with_file_name(artifacts::EXTENSIONS_SUBDIR)
root.join(artifacts::EXTENSIONS_SUBDIR)
}
fn raw_memories_file(root: &Path) -> PathBuf {

View File

@@ -1,16 +1,20 @@
use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::config::Config;
use crate::memories::extensions::PendingExtensionResourceRemoval;
use crate::memories::extensions::find_old_extension_resources;
use crate::memories::extensions::remove_extension_resources;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::EmptyArtifactCleanup;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::rollout_summary_file_stem;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::commit_all;
use crate::memories::workspace::has_changes;
use crate::memories::workspace::prepare_git_repo;
use crate::memories::workspace::remove_workspace_diff;
use crate::memories::workspace::write_workspace_diff;
use crate::session::emit_subagent_session_started;
use crate::session::session::Session;
use codex_config::Constrained;
@@ -24,9 +28,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::Stage1Output;
use codex_state::StateRuntime;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
@@ -73,6 +75,12 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
if let Err(err) = prepare_git_repo(&root).await {
tracing::error!("failed preparing memory workspace: {err}");
job::failed(session, db, &claim, "failed_prepare_workspace").await;
return;
}
// 2. Get the config for the agent
let Some(agent_config) = agent::get_config(config.clone()) else {
// If we can't get the config, we can't consolidate.
@@ -94,15 +102,24 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
let raw_memories = selection.selected.to_vec();
let artifact_memories = artifact_memories_for_phase2(&selection);
let artifact_memories = selection.selected.clone();
let empty_artifact_cleanup = if selection.removed.is_empty() {
EmptyArtifactCleanup::RemoveConsolidated
} else {
EmptyArtifactCleanup::PreserveConsolidated
};
let new_watermark = get_watermark(claim.watermark, &raw_memories);
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
// step 3
// [`rollout_summaries/`]
if let Err(err) =
sync_rollout_summaries_from_memories(&root, &artifact_memories, artifact_memories.len())
.await
if let Err(err) = sync_rollout_summaries_from_memories(
&root,
&artifact_memories,
artifact_memories.len(),
empty_artifact_cleanup,
)
.await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
@@ -118,26 +135,40 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
return;
}
let pending_extension_resource_removals = find_old_extension_resources(&root).await;
let removed_extension_resources = pending_extension_resource_removals
.iter()
.map(|resource| resource.removed.clone())
.collect::<Vec<_>>();
if raw_memories.is_empty() && pending_extension_resource_removals.is_empty() {
remove_extension_resources(&pending_extension_resource_removals).await;
let workspace_has_changes = match has_changes(&root).await {
Ok(has_changes) => has_changes,
Err(err) => {
tracing::error!("failed checking memory workspace changes: {err}");
job::failed(session, db, &claim, "failed_workspace_status").await;
return;
}
};
if !workspace_has_changes {
if let Err(err) = remove_workspace_diff(&root).await {
tracing::warn!("failed removing stale memory workspace diff file: {err}");
}
// We check only after sync of the file system.
job::succeed(
session,
db,
&claim,
new_watermark,
&[],
&raw_memories,
"succeeded_no_input",
)
.await;
return;
}
if let Err(err) = write_workspace_diff(&root).await {
tracing::error!("failed writing memory workspace diff file: {err}");
job::failed(session, db, &claim, "failed_workspace_diff_file").await;
return;
}
// 5. Spawn the agent
let prompt = agent::get_prompt(config, &selection, &removed_extension_resources);
let prompt = agent::get_prompt(config);
let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation);
let agent_control = session.services.agent_control.detached_registry();
let thread_id = match agent_control
@@ -179,7 +210,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
claim,
new_watermark,
raw_memories.clone(),
pending_extension_resource_removals,
root,
thread_id,
agent_control,
phase_two_e2e_timer,
@@ -192,22 +223,6 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
emit_metrics(session, counters);
}
fn artifact_memories_for_phase2(
selection: &codex_state::Phase2InputSelection,
) -> Vec<Stage1Output> {
let mut seen = HashSet::new();
let mut memories = selection.selected.clone();
for memory in &selection.selected {
seen.insert(rollout_summary_file_stem(memory));
}
for memory in &selection.previous_selected {
if seen.insert(rollout_summary_file_stem(memory)) {
memories.push(memory.clone());
}
}
memories
}
mod job {
use super::*;
@@ -348,13 +363,9 @@ mod agent {
Some(agent_config)
}
pub(super) fn get_prompt(
config: Arc<Config>,
selection: &codex_state::Phase2InputSelection,
removed_extension_resources: &[crate::memories::extensions::RemovedExtensionResource],
) -> Vec<UserInput> {
pub(super) fn get_prompt(config: Arc<Config>) -> Vec<UserInput> {
let root = memory_root(&config.codex_home);
let prompt = build_consolidation_prompt(&root, selection, removed_extension_resources);
let prompt = build_consolidation_prompt(&root);
vec![UserInput::Text {
text: prompt,
text_elements: vec![],
@@ -368,7 +379,7 @@ mod agent {
claim: Claim,
new_watermark: i64,
selected_outputs: Vec<codex_state::Stage1Output>,
pending_extension_resource_removals: Vec<PendingExtensionResourceRemoval>,
memory_root: codex_utils_absolute_path::AbsolutePathBuf,
thread_id: ThreadId,
agent_control: crate::agent::AgentControl,
phase_two_e2e_timer: Option<codex_otel::Timer>,
@@ -405,7 +416,32 @@ mod agent {
if let Some(token_usage) = agent_control.get_total_token_usage(thread_id).await {
emit_token_usage_metrics(&session, &token_usage);
}
if job::succeed(
// Do not commit if we lost the lock
match db
.heartbeat_global_phase2_job(&claim.token, phase_two::JOB_LEASE_SECONDS)
.await
{
Ok(true) => {}
Ok(false) => {
tracing::error!(
"lost global memory consolidation ownership before committing workspace"
);
return;
}
Err(err) => {
tracing::error!(
"failed confirming global memory consolidation ownership before committing workspace: {err}"
);
job::failed(&session, &db, &claim, "failed_confirm_ownership").await;
return;
}
}
if let Err(err) = commit_all(&memory_root).await {
tracing::error!("failed committing memory workspace: {err}");
job::failed(&session, &db, &claim, "failed_workspace_commit").await;
return;
}
if !job::succeed(
&session,
&db,
&claim,
@@ -415,7 +451,9 @@ mod agent {
)
.await
{
remove_extension_resources(&pending_extension_resource_removals).await;
tracing::error!(
"failed marking global memory consolidation job succeeded after committing workspace"
);
}
} else {
job::failed(&session, &db, &claim, "failed_agent").await;

View File

@@ -1,18 +1,12 @@
use crate::memories::extensions::EXTENSION_RESOURCE_RETENTION_DAYS;
use crate::memories::extensions::RemovedExtensionResource;
use crate::memories::memory_extensions_root;
use crate::memories::memory_root;
use crate::memories::phase_one;
use crate::memories::storage::rollout_summary_file_stem_from_parts;
use crate::memories::workspace::WORKSPACE_DIFF_FILENAME;
use codex_protocol::openai_models::ModelInfo;
use codex_state::Phase2InputSelection;
use codex_state::Stage1Output;
use codex_state::Stage1OutputRef;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use codex_utils_template::Template;
use std::fmt::Write as _;
use std::path::Path;
use std::sync::LazyLock;
use tokio::fs;
@@ -65,9 +59,9 @@ Memory extensions (under {{ memory_extensions_root }}/):
source.
If the user has any memory extensions, you MUST read the instructions for each extension to
determine how to use the memory source. If the Phase 2 diff lists removed memory extension
resources, use that extension-specific deletion diff to remove stale memories derived only from
those resources. If it has no extension folders, continue with the standard memory inputs only.
determine how to use the memory source. If the workspace diff shows deleted extension resource files,
remove stale memories derived only from those resources. If it has no extension folders, continue
with the standard memory inputs only.
"#;
const MEMORY_EXTENSIONS_PRIMARY_INPUTS: &str = r#"
@@ -78,20 +72,17 @@ Under `{{ memory_extensions_root }}/`:
- If extension folders exist, read each instructions.md first and follow it when interpreting
that extension's memory source.
If the Phase 2 diff lists removed memory extension resources, use that extension-specific deletion
diff to remove stale memories derived only from those resources.
If the workspace diff shows deleted memory extension resources, use that extension-specific deletion
signal to remove stale memories derived only from those resources.
"#;
/// Builds the consolidation subagent prompt for a specific memory root.
pub(super) fn build_consolidation_prompt(
memory_root: &Path,
selection: &Phase2InputSelection,
removed_extension_resources: &[RemovedExtensionResource],
) -> String {
pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
let memory_extensions_root = memory_extensions_root(memory_root);
let memory_extensions_exist = memory_extensions_root.is_dir();
let memory_root = memory_root.display().to_string();
let memory_extensions_root = memory_extensions_root.display().to_string();
let phase2_workspace_diff_file = WORKSPACE_DIFF_FILENAME.to_string();
let memory_extensions_folder_structure = if memory_extensions_exist {
render_memory_extensions_block(
&MEMORY_EXTENSIONS_FOLDER_STRUCTURE_TEMPLATE,
@@ -108,8 +99,6 @@ pub(super) fn build_consolidation_prompt(
} else {
String::new()
};
let phase2_input_selection =
render_phase2_input_selection(selection, removed_extension_resources);
CONSOLIDATION_PROMPT_TEMPLATE
.render([
("memory_root", memory_root.as_str()),
@@ -121,12 +110,15 @@ pub(super) fn build_consolidation_prompt(
"memory_extensions_primary_inputs",
memory_extensions_primary_inputs.as_str(),
),
("phase2_input_selection", phase2_input_selection.as_str()),
(
"phase2_workspace_diff_file",
phase2_workspace_diff_file.as_str(),
),
])
.unwrap_or_else(|err| {
warn!("failed to render memories consolidation prompt template: {err}");
format!(
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\n{phase2_input_selection}"
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\nRead {phase2_workspace_diff_file} first."
)
})
}
@@ -140,94 +132,6 @@ fn render_memory_extensions_block(template: &Template, memory_extensions_root: &
})
}
/// Renders the "Phase 2 input selection" section of the consolidation prompt.
///
/// Produces a count summary (selected/added/retained/removed), a bullet list
/// of the currently selected Phase 1 inputs, a bullet list of inputs removed
/// since the last successful Phase 2 run, and — when retention pruning
/// dropped extension resources — a per-extension list of the pruned paths.
fn render_phase2_input_selection(
    selection: &Phase2InputSelection,
    removed_extension_resources: &[RemovedExtensionResource],
) -> String {
    // `retained` counts inputs carried over from the previous successful run;
    // everything else in `selected` is newly added.
    let retained = selection.retained_thread_ids.len();
    let added = selection.selected.len().saturating_sub(retained);
    let selected = if selection.selected.is_empty() {
        "- none".to_string()
    } else {
        selection
            .selected
            .iter()
            .map(|item| {
                render_selected_input_line(
                    item,
                    selection.retained_thread_ids.contains(&item.thread_id),
                )
            })
            .collect::<Vec<_>>()
            .join("\n")
    };
    let removed = if selection.removed.is_empty() {
        "- none".to_string()
    } else {
        selection
            .removed
            .iter()
            .map(render_removed_input_line)
            .collect::<Vec<_>>()
            .join("\n")
    };
    let mut rendered = format!(
        "- selected inputs this run: {}\n- newly added since the last successful Phase 2 run: {added}\n- retained from the last successful Phase 2 run: {retained}\n- removed from the last successful Phase 2 run: {}\n\nCurrent selected Phase 1 inputs:\n{selected}\n\nRemoved from the last successful Phase 2 selection:\n{removed}\n",
        selection.selected.len(),
        selection.removed.len(),
    );
    if !removed_extension_resources.is_empty() {
        rendered.push_str("\nMemory extension resources removed by retention pruning:\n");
        // Results of `writeln!` into a `String` are ignored: `fmt::Write` for
        // `String` cannot fail.
        let _ = writeln!(
            rendered,
            "- retention window: {EXTENSION_RESOURCE_RETENTION_DAYS} days"
        );
        // Emits one "- extension:" heading per run of equal extension names;
        // assumes the input slice is grouped by extension — TODO confirm
        // against the producer of `removed_extension_resources`.
        let mut current_extension = "";
        for removed_resource in removed_extension_resources {
            if removed_resource.extension != current_extension {
                current_extension = &removed_resource.extension;
                let _ = writeln!(rendered, "- extension: {current_extension}");
            }
            let _ = writeln!(rendered, " - {}", removed_resource.resource_path);
        }
    }
    rendered
}
/// Renders one currently-selected Phase 1 input as a prompt bullet line,
/// tagged `[retained]` when the thread was also part of the previous
/// successful Phase 2 selection and `[added]` otherwise.
fn render_selected_input_line(item: &Stage1Output, retained: bool) -> String {
    let status = match retained {
        true => "retained",
        false => "added",
    };
    let stem = rollout_summary_file_stem_from_parts(
        item.thread_id,
        item.source_updated_at,
        item.rollout_slug.as_deref(),
    );
    let rollout_summary_file = format!("rollout_summaries/{}.md", stem);
    format!(
        "- [{status}] thread_id={}, rollout_summary_file={rollout_summary_file}",
        item.thread_id
    )
}
/// Renders one Phase 1 input that dropped out of the previous successful
/// Phase 2 selection as a prompt bullet line.
fn render_removed_input_line(item: &Stage1OutputRef) -> String {
    let stem = rollout_summary_file_stem_from_parts(
        item.thread_id,
        item.source_updated_at,
        item.rollout_slug.as_deref(),
    );
    let rollout_summary_file = format!("rollout_summaries/{}.md", stem);
    format!(
        "- thread_id={}, rollout_summary_file={rollout_summary_file}",
        item.thread_id
    )
}
/// Builds the stage-1 user message containing rollout metadata and content.
///
/// Large rollout payloads are truncated to 70% of the active model's effective

View File

@@ -1,7 +1,5 @@
use super::*;
use crate::memories::extensions::RemovedExtensionResource;
use codex_models_manager::model_info::model_info_from_slug;
use codex_state::Phase2InputSelection;
use core_test_support::PathExt;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
@@ -58,33 +56,17 @@ fn build_stage_one_input_message_uses_default_limit_when_model_context_window_mi
}
#[test]
fn build_consolidation_prompt_includes_removed_extension_resources() {
fn build_consolidation_prompt_includes_workspace_changes() {
let temp = tempdir().unwrap();
let memory_root = temp.path().join("memories");
std::fs::create_dir_all(temp.path().join("memories_extensions")).unwrap();
let removed_extension_resources = vec![
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-06T11-59-59-abcd-10min-old.md".to_string(),
},
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-07T12-00-00-abcd-10min-cutoff.md".to_string(),
},
];
std::fs::create_dir_all(memory_root.join("extensions")).unwrap();
let prompt = build_consolidation_prompt(
&memory_root,
&Phase2InputSelection::default(),
&removed_extension_resources,
);
let prompt = build_consolidation_prompt(&memory_root);
assert!(prompt.contains("Memory extension resources removed by retention pruning:"));
assert!(prompt.contains("- retention window: 7 days"));
assert!(prompt.contains("- extension: chronicle"));
assert!(prompt.contains(" - resources/2026-04-06T11-59-59-abcd-10min-old.md"));
assert!(prompt.contains(" - resources/2026-04-07T12-00-00-abcd-10min-cutoff.md"));
assert!(prompt.contains("extension-specific deletion diff"));
assert!(prompt.contains("Memory workspace diff:"));
assert!(prompt.contains("phase2_workspace_diff.md"));
assert!(prompt.contains("previous successful Phase 2 commit"));
assert!(prompt.contains("extension-specific deletion"));
}
#[tokio::test]

View File

@@ -9,6 +9,12 @@ use crate::memories::ensure_layout;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
/// Controls whether a sync that retains no rollout summaries also deletes the
/// consolidated artifacts (`MEMORY.md`, `memory_summary.md`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum EmptyArtifactCleanup {
    /// Remove the consolidated artifact files when nothing is retained.
    RemoveConsolidated,
    /// Keep the consolidated artifact files in place — used when removals are
    /// pending so the consolidation agent can do targeted cleanup.
    PreserveConsolidated,
}
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
@@ -24,6 +30,7 @@ pub(super) async fn sync_rollout_summaries_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_consolidation: usize,
empty_artifact_cleanup: EmptyArtifactCleanup,
) -> std::io::Result<()> {
ensure_layout(root).await?;
@@ -38,7 +45,7 @@ pub(super) async fn sync_rollout_summaries_from_memories(
write_rollout_summary_for_thread(root, memory).await?;
}
if retained.is_empty() {
if retained.is_empty() && empty_artifact_cleanup == EmptyArtifactCleanup::RemoveConsolidated {
for file_name in ["MEMORY.md", "memory_summary.md"] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(path).await

View File

@@ -1,4 +1,6 @@
use super::control::clear_memory_root_contents;
use super::control::clear_memory_roots_contents;
use super::storage::EmptyArtifactCleanup;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::memories::ensure_layout;
@@ -102,6 +104,59 @@ async fn clear_memory_root_contents_preserves_root_directory() {
);
}
#[tokio::test]
async fn clear_memory_roots_contents_clears_legacy_extensions_root() {
    // Seed both the new extensions dir (`memories/extensions`) and the legacy
    // sibling dir (`memories_extensions`) with a stale artifact each, then
    // verify a single clear call empties both roots.
    let home = tempdir().expect("tempdir");
    let memories_root = home.path().join("memories");
    let extensions_dir = memories_root.join("extensions");
    let legacy_dir = home.path().join("memories_extensions");

    tokio::fs::create_dir_all(&legacy_dir)
        .await
        .expect("create legacy extensions dir");
    tokio::fs::create_dir_all(&extensions_dir)
        .await
        .expect("create new extensions dir");
    tokio::fs::write(legacy_dir.join("stale.md"), "legacy stale extension\n")
        .await
        .expect("write legacy extension artifact");
    tokio::fs::write(extensions_dir.join("stale.md"), "new stale extension\n")
        .await
        .expect("write new extension artifact");

    clear_memory_roots_contents(home.path())
        .await
        .expect("clear memory roots contents");

    let mut remaining = tokio::fs::read_dir(&memories_root)
        .await
        .expect("read memory root after clear");
    let first_entry = remaining.next_entry().await.expect("read memory root entry");
    assert!(
        first_entry.is_none(),
        "memory root should be empty after clearing contents"
    );

    let mut legacy_remaining = tokio::fs::read_dir(&legacy_dir)
        .await
        .expect("read legacy extensions root after clear");
    let first_legacy_entry = legacy_remaining
        .next_entry()
        .await
        .expect("read legacy extensions root entry");
    assert!(
        first_legacy_entry.is_none(),
        "legacy extensions root should be empty after clearing contents"
    );
}
#[cfg(unix)]
#[tokio::test]
async fn clear_memory_root_contents_rejects_symlinked_root() {
@@ -163,6 +218,7 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -236,6 +292,70 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(rollout_path_pos < file_pos);
}
#[tokio::test]
async fn sync_rollout_summaries_preserves_consolidated_artifacts_for_removed_inputs() {
    // With an empty selection and `PreserveConsolidated`, the sync must still
    // prune stale rollout summaries while leaving the consolidated artifacts
    // (MEMORY.md, memory_summary.md, skills/) untouched.
    let temp = tempdir().expect("tempdir");
    let workspace = temp.path().join("memory");
    let summaries = rollout_summaries_dir(&workspace);
    let demo_skill_dir = workspace.join("skills/demo");
    tokio::fs::create_dir_all(&summaries)
        .await
        .expect("create rollout summaries dir");
    tokio::fs::create_dir_all(&demo_skill_dir)
        .await
        .expect("create skills dir");

    let stale_summary = summaries.join("stale.md");
    let memory_file = workspace.join("MEMORY.md");
    let memory_summary_file = workspace.join("memory_summary.md");
    let skill_file = demo_skill_dir.join("SKILL.md");
    tokio::fs::write(&skill_file, "skill\n")
        .await
        .expect("write skill");
    tokio::fs::write(&memory_summary_file, "summary\n")
        .await
        .expect("write memory summary");
    tokio::fs::write(&memory_file, "consolidated memory\n")
        .await
        .expect("write memory");
    tokio::fs::write(&stale_summary, "stale rollout summary\n")
        .await
        .expect("write stale rollout summary");

    sync_rollout_summaries_from_memories(
        &workspace,
        &[],
        DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
        EmptyArtifactCleanup::PreserveConsolidated,
    )
    .await
    .expect("sync rollout summaries");

    let stale_still_there = tokio::fs::try_exists(&stale_summary)
        .await
        .expect("check stale summary path");
    assert!(
        !stale_still_there,
        "removed-only sync should still prune stale rollout summaries"
    );
    assert!(
        tokio::fs::try_exists(&memory_file)
            .await
            .expect("check memory path"),
        "removed-only sync should preserve MEMORY.md for surgical cleanup"
    );
    assert!(
        tokio::fs::try_exists(&memory_summary_file)
            .await
            .expect("check memory summary path"),
        "removed-only sync should preserve memory_summary.md for surgical cleanup"
    );
    assert!(
        tokio::fs::try_exists(&skill_file)
            .await
            .expect("check skill path"),
        "removed-only sync should preserve skills for surgical cleanup"
    );
}
#[tokio::test]
async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename() {
let dir = tempdir().expect("tempdir");
@@ -269,6 +389,7 @@ async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -372,6 +493,7 @@ task_outcome: success
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -422,6 +544,10 @@ mod phase2 {
use crate::memories::phase2;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use crate::memories::storage::EmptyArtifactCleanup;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::prepare_git_repo;
use crate::session::session::Session;
use crate::session::tests::make_session_and_context;
use chrono::Duration as ChronoDuration;
@@ -508,7 +634,7 @@ mod phase2 {
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) {
async fn seed_stage1_output(&self, source_updated_at: i64) -> ThreadId {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
@@ -557,6 +683,7 @@ mod phase2 {
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
thread_id
}
async fn shutdown_threads(&self) {
@@ -776,7 +903,7 @@ mod phase2 {
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
async fn dispatch_with_empty_stage1_outputs_spawns_for_workspace_changes() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
@@ -858,15 +985,162 @@ mod phase2 {
.state_db
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after empty consolidation success");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
.expect("claim global job after empty consolidation dispatch");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedRunning);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
pretty_assertions::assert_eq!(thread_ids.len(), 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_removed_only_inputs_preserves_consolidated_artifacts() {
    // End-to-end dispatch check: when the pending Phase 2 change is a removed
    // (polluted) input, the run must spawn the consolidation agent and keep
    // the consolidated artifacts on disk for targeted cleanup instead of
    // deleting them wholesale.
    let harness = DispatchHarness::new().await;
    let source_updated_at = Utc::now().timestamp();
    let thread_id = harness.seed_stage1_output(source_updated_at).await;
    let root = memory_root(&harness.config.codex_home);
    // Load the current selection and materialize its local artifacts.
    let selection = harness
        .state_db
        .get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
        .await
        .expect("load phase2 input selection");
    let selected = selection.selected.clone();
    sync_rollout_summaries_from_memories(
        &root,
        &selected,
        selected.len(),
        EmptyArtifactCleanup::RemoveConsolidated,
    )
    .await
    .expect("sync selected rollout summaries");
    rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
        .await
        .expect("sync selected raw memories");
    // Seed consolidated artifacts as a previous Phase 2 run would have left
    // them.
    let memory_index_path = root.join("MEMORY.md");
    let memory_summary_path = root.join("memory_summary.md");
    let skill_path = root.join("skills/demo/SKILL.md");
    tokio::fs::write(&memory_index_path, "consolidated memory index\n")
        .await
        .expect("write memory index");
    tokio::fs::write(&memory_summary_path, "consolidated memory summary\n")
        .await
        .expect("write memory summary");
    tokio::fs::create_dir_all(skill_path.parent().expect("skill parent"))
        .await
        .expect("create skill dir");
    tokio::fs::write(&skill_path, "consolidated skill\n")
        .await
        .expect("write skill");
    // Commit the workspace so the later run diffs against this baseline.
    prepare_git_repo(&root)
        .await
        .expect("commit current memory workspace as baseline");
    // Record a successful Phase 2 run for the current selection, then mark
    // the selected thread polluted so the next run sees a removed-only diff.
    let claim = harness
        .state_db
        .try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
        .await
        .expect("claim global phase2 job");
    let Phase2JobClaimOutcome::Claimed {
        ownership_token, ..
    } = claim
    else {
        panic!("unexpected phase2 claim outcome: {claim:?}");
    };
    assert!(
        harness
            .state_db
            .mark_global_phase2_job_succeeded(&ownership_token, source_updated_at, &selected,)
            .await
            .expect("mark phase2 succeeded"),
        "phase2 success should update selected baseline"
    );
    assert!(
        harness
            .state_db
            .mark_thread_memory_mode_polluted(thread_id)
            .await
            .expect("mark thread polluted"),
        "polluted selected thread should enqueue phase2 forgetting"
    );
    phase2::run(&harness.session, Arc::clone(&harness.config)).await;
    // One user-input op indicates the consolidation agent was spawned.
    pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
    assert!(
        tokio::fs::try_exists(&memory_index_path)
            .await
            .expect("check memory index existence"),
        "removed-only phase2 should preserve MEMORY.md before agent cleanup"
    );
    assert!(
        tokio::fs::try_exists(&memory_summary_path)
            .await
            .expect("check memory summary existence"),
        "removed-only phase2 should preserve memory_summary.md before agent cleanup"
    );
    assert!(
        tokio::fs::try_exists(&skill_path)
            .await
            .expect("check skill existence"),
        "removed-only phase2 should preserve skills before agent cleanup"
    );
    // The generated workspace diff should surface the deleted rollout summary
    // but not a wholesale deletion of the consolidated MEMORY.md.
    let workspace_diff = tokio::fs::read_to_string(root.join("phase2_workspace_diff.md"))
        .await
        .expect("read workspace diff");
    assert!(
        workspace_diff.contains("- D rollout_summaries/"),
        "removed-only phase2 should surface deleted rollout summaries: {workspace_diff}"
    );
    assert!(
        !workspace_diff.contains("- D MEMORY.md"),
        "removed-only phase2 should not delete consolidated memory wholesale: {workspace_diff}"
    );
    harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_clean_workspace_preserves_selected_phase2_baseline() {
    let harness = DispatchHarness::new().await;
    // Seed one stage-1 rollout so the selection below contains exactly one thread.
    let thread_id = harness.seed_stage1_output(Utc::now().timestamp()).await;
    let root = memory_root(&harness.config.codex_home);
    let selection = harness
        .state_db
        .get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
        .await
        .expect("load phase2 input selection");
    let selected = selection.selected.clone();
    // Materialize the workspace artifacts exactly as a phase2 run would see them...
    sync_rollout_summaries_from_memories(
        &root,
        &selected,
        selected.len(),
        EmptyArtifactCleanup::RemoveConsolidated,
    )
    .await
    .expect("sync selected rollout summaries");
    rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
        .await
        .expect("sync selected raw memories");
    // ...then commit them so the git baseline matches the worktree (no pending diff).
    prepare_git_repo(&root)
        .await
        .expect("commit current memory workspace as baseline");
    phase2::run(&harness.session, Arc::clone(&harness.config)).await;
    // With a clean workspace, phase2 must not dispatch any agent turn.
    pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
    let selection = harness
        .state_db
        .get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
        .await
        .expect("load phase2 input selection after clean workspace success");
    // The selected baseline is preserved: same single thread, nothing removed.
    pretty_assertions::assert_eq!(selection.selected.len(), 1);
    pretty_assertions::assert_eq!(selection.selected[0].thread_id, thread_id);
    pretty_assertions::assert_eq!(selection.retained_thread_ids, vec![thread_id]);
    pretty_assertions::assert_eq!(selection.removed, Vec::new());
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_sandbox_policy_cannot_be_overridden() {
let harness = DispatchHarness::new().await;
@@ -1007,14 +1281,14 @@ mod phase2 {
let chronicle_resources = config
.codex_home
.join("memories_extensions/chronicle/resources");
.join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
config
.codex_home
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await
@@ -1039,10 +1313,10 @@ mod phase2 {
"spawn failures should leave the job in retry backoff instead of running"
);
assert!(
tokio::fs::try_exists(&old_file)
!tokio::fs::try_exists(&old_file)
.await
.expect("check old extension resource"),
"spawn failures should not prune extension resources before retry"
"phase2 should prune old extension resources before spawn"
);
}
}

View File

@@ -0,0 +1,615 @@
use anyhow::Context;
use gix::hash::ObjectId;
use gix::objs::Tree;
use gix::objs::tree::Entry;
use gix::objs::tree::EntryKind;
use gix::objs::tree::EntryMode;
use similar::TextDiff;
use std::collections::BTreeMap;
use std::ffi::OsStr;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use tokio::task;
/// Generated diff file the Phase 2 consolidation agent reads before editing memories.
pub(super) const WORKSPACE_DIFF_FILENAME: &str = "phase2_workspace_diff.md";

/// Ignore file kept at the memories root; it lists the generated diff file.
const GITIGNORE_FILENAME: &str = ".gitignore";

/// Message for the first baseline commit of a workspace that already has files.
const INITIAL_COMMIT_MESSAGE: &str =
    "Initialize Codex memories git state\n\nCo-authored-by: Codex <noreply@openai.com>";
/// Message used when only the `.gitignore` housekeeping change is committed.
const GITIGNORE_COMMIT_MESSAGE: &str =
    "Ignore generated Codex memories diff\n\nCo-authored-by: Codex <noreply@openai.com>";
/// Message for a regular post-consolidation commit created by `commit_all`.
const CONSOLIDATION_COMMIT_MESSAGE: &str =
    "Consolidate Codex memories\n\nCo-authored-by: Codex <noreply@openai.com>";

/// Identity of one tracked file: content object id plus its git entry mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MemoryWorkspaceFileEntry {
    // Object id of the file's content (computed in memory; not necessarily stored).
    oid: ObjectId,
    // Git entry mode (regular blob, executable blob, or symlink).
    mode: EntryMode,
}

/// Status assigned to a path when comparing the HEAD tree against the worktree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkspaceChangeStatus {
    Added,
    Modified,
    Deleted,
}

/// One changed path in the HEAD-vs-worktree comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WorkspaceChange {
    status: WorkspaceChangeStatus,
    path: String,
}
/// Creates the memory root, initializes its git repository, and keeps the generated diff file in
/// `.gitignore`.
///
/// Commits an initial baseline when the repository has no `HEAD` yet. For existing clean
/// repositories that predate the generated diff file, the `.gitignore` update is committed by
/// itself so internal housekeeping does not wake the consolidation agent.
pub(super) async fn prepare_git_repo(root: &Path) -> anyhow::Result<()> {
    let root = root.to_path_buf();
    task::spawn_blocking(move || {
        fs::create_dir_all(&root)
            .with_context(|| format!("create memories root {}", root.display()))?;
        let repo = open_or_init(&root)?;
        let ignore_was_updated = ensure_gitignore_ignores_workspace_diff(&root)?;
        let has_head = repo.head_id().is_ok();
        if !has_head && has_workspace_files(&root)? {
            // Fresh repository with content: record the baseline commit.
            commit_current_tree(&repo, INITIAL_COMMIT_MESSAGE)?;
        } else if ignore_was_updated && only_gitignore_changed(&repo, &root)? {
            // Housekeeping-only change: commit it alone.
            commit_current_tree(&repo, GITIGNORE_COMMIT_MESSAGE)?;
        }
        anyhow::Ok(())
    })
    .await?
}
/// Returns true when the memory root differs from the current git `HEAD` tree.
pub(super) async fn has_changes(root: &Path) -> anyhow::Result<bool> {
    let root = root.to_path_buf();
    // Repository I/O is blocking, so run the scan off the async executor.
    task::spawn_blocking(move || has_changes_blocking(&open_or_init(&root)?, &root)).await?
}
/// Writes `phase2_workspace_diff.md` with a git-style diff from `HEAD` to the current worktree.
pub(super) async fn write_workspace_diff(root: &Path) -> anyhow::Result<()> {
    let root = root.to_path_buf();
    task::spawn_blocking(move || {
        let repo = open_or_init(&root)?;
        // Snapshot both sides of the comparison, then render the report.
        let baseline = head_file_entries(&repo)?;
        let worktree = current_file_entries(&repo, &root)?;
        let changes = diff_entries(&baseline, &worktree);
        let rendered =
            render_workspace_diff_file(&repo, &root, &baseline, &worktree, &changes)?;
        let target = root.join(WORKSPACE_DIFF_FILENAME);
        fs::write(&target, rendered)
            .with_context(|| format!("write memory workspace diff file {}", target.display()))?;
        anyhow::Ok(())
    })
    .await?
}
/// Commits the current memory root as the next normal git commit when it differs from `HEAD`.
pub(super) async fn commit_all(root: &Path) -> anyhow::Result<()> {
    let root = root.to_path_buf();
    task::spawn_blocking(move || {
        let repo = open_or_init(&root)?;
        // Whether a commit actually happened is intentionally discarded here.
        commit_current_tree(&repo, CONSOLIDATION_COMMIT_MESSAGE).map(|_| ())
    })
    .await?
}
/// Removes the generated workspace diff file when no consolidation agent needs it.
pub(super) async fn remove_workspace_diff(root: &Path) -> anyhow::Result<()> {
    let path = root.join(WORKSPACE_DIFF_FILENAME);
    if let Err(err) = tokio::fs::remove_file(&path).await {
        // A missing file is already the desired end state; anything else is an error.
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err).with_context(|| {
                format!("remove memory workspace diff file {}", path.display())
            });
        }
    }
    Ok(())
}
// Opens the repository at `root` when `.git` already exists, otherwise initializes one.
fn open_or_init(root: &Path) -> anyhow::Result<gix::Repository> {
    match root.join(".git").is_dir() {
        true => {
            gix::open(root).with_context(|| format!("open memories git repo {}", root.display()))
        }
        false => {
            gix::init(root).with_context(|| format!("init memories git repo {}", root.display()))
        }
    }
}
// Appends the generated diff filename to `.gitignore` unless it is already listed.
// Returns true when the ignore file was (re)written.
fn ensure_gitignore_ignores_workspace_diff(root: &Path) -> anyhow::Result<bool> {
    let path = root.join(GITIGNORE_FILENAME);
    // Treat a missing .gitignore as empty; surface any other read failure.
    let current = match fs::read_to_string(&path) {
        Ok(content) => content,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => String::new(),
        Err(err) => return Err(err).with_context(|| format!("read {}", path.display())),
    };
    let already_ignored = current
        .lines()
        .any(|line| line.trim() == WORKSPACE_DIFF_FILENAME);
    if already_ignored {
        return Ok(false);
    }
    let mut rewritten = current;
    // Keep existing content intact and make sure it ends with a newline first.
    if !rewritten.is_empty() && !rewritten.ends_with('\n') {
        rewritten.push('\n');
    }
    rewritten.push_str(WORKSPACE_DIFF_FILENAME);
    rewritten.push('\n');
    fs::write(&path, rewritten).with_context(|| format!("write {}", path.display()))?;
    Ok(true)
}
// True when the root contains at least one entry other than `.git`
// and the generated (ignored) diff file.
fn has_workspace_files(root: &Path) -> anyhow::Result<bool> {
    let listing = fs::read_dir(root).with_context(|| format!("read {}", root.display()))?;
    for entry in listing {
        let entry = entry?;
        let is_git_dir = entry.file_name() == OsStr::new(".git");
        if !is_git_dir && !should_ignore_workspace_path(root, &entry.path()) {
            return Ok(true);
        }
    }
    Ok(false)
}
// Compares the HEAD snapshot with the worktree snapshot; any difference counts.
fn has_changes_blocking(repo: &gix::Repository, root: &Path) -> anyhow::Result<bool> {
    let baseline = head_file_entries(repo)?;
    let worktree = current_file_entries(repo, root)?;
    Ok(baseline != worktree)
}
// True when the workspace is dirty but every changed path is the ignore file itself.
fn only_gitignore_changed(repo: &gix::Repository, root: &Path) -> anyhow::Result<bool> {
    let changes = diff_entries(&head_file_entries(repo)?, &current_file_entries(repo, root)?);
    let all_gitignore = changes
        .iter()
        .all(|change| change.path == GITIGNORE_FILENAME);
    Ok(!changes.is_empty() && all_gitignore)
}
// Writes the current worktree as a tree object and commits it to `HEAD` with `message`.
// Returns `Ok(false)` without committing when the new tree is identical to the parent
// commit's tree; returns `Ok(true)` after creating a commit.
fn commit_current_tree(repo: &gix::Repository, message: &str) -> anyhow::Result<bool> {
    let root = repo
        .workdir()
        .context("memories git repo must have a worktree")?;
    let tree_id = write_tree(repo, root, root)?;
    // A missing HEAD (unborn branch) simply means this commit has no parent.
    let parent = repo.head_id().ok().map(gix::Id::detach);
    if let Some(parent) = parent {
        let parent_tree = repo
            .find_commit(parent)
            .context("find memories HEAD commit")?
            .tree_id()
            .context("load memories HEAD tree id")?
            .detach();
        // Skip empty commits: nothing changed since the parent.
        if parent_tree == tree_id {
            return Ok(false);
        }
    }
    let signature = codex_signature();
    let mut time = gix::date::parse::TimeBuf::default();
    let signature_ref = signature.to_ref(&mut time);
    let parents = parent.into_iter().collect::<Vec<_>>();
    // Same signature is used for author and committer.
    repo.commit_as(
        signature_ref,
        signature_ref,
        "HEAD",
        message,
        tree_id,
        parents,
    )
    .context("commit memories git repo")?;
    Ok(true)
}
// Fixed "Codex" author identity stamped with the current UTC timestamp (offset 0).
fn codex_signature() -> gix::actor::Signature {
    let now_seconds = chrono::Utc::now().timestamp();
    gix::actor::Signature {
        name: "Codex".into(),
        email: "noreply@openai.com".into(),
        time: gix::date::Time {
            seconds: now_seconds,
            offset: 0,
        },
    }
}
// Recursively writes `dir` (under workspace `root`) as a git tree object and returns its
// id. `.git` and the generated diff file are excluded, empty subtrees are skipped, and
// symlinks are stored as link entries whose blob content is the target path.
fn write_tree(repo: &gix::Repository, root: &Path, dir: &Path) -> anyhow::Result<ObjectId> {
    let mut entries = Vec::new();
    for entry in fs::read_dir(dir).with_context(|| format!("read {}", dir.display()))? {
        let entry = entry?;
        let path = entry.path();
        let file_name = entry.file_name();
        if file_name == OsStr::new(".git") || should_ignore_workspace_path(root, &path) {
            continue;
        }
        let file_type = entry.file_type()?;
        if file_type.is_dir() {
            let oid = write_tree(repo, root, &path)?;
            let tree = repo
                .find_tree(oid)
                .with_context(|| format!("load tree {}", path.display()))?;
            // Skip directories that produced no entries so empty dirs are not tracked.
            if tree.decode()?.entries.is_empty() {
                continue;
            }
            entries.push(Entry {
                mode: EntryKind::Tree.into(),
                filename: os_str_to_bstring(&file_name),
                oid,
            });
        } else if file_type.is_file() {
            let bytes = fs::read(&path).with_context(|| format!("read {}", path.display()))?;
            let oid = repo
                .write_blob(bytes)
                .with_context(|| format!("write blob {}", path.display()))?
                .detach();
            entries.push(Entry {
                // `file_mode` upgrades to an executable blob when any execute bit is set.
                mode: file_mode(&path, EntryKind::Blob)?,
                filename: os_str_to_bstring(&file_name),
                oid,
            });
        } else if file_type.is_symlink() {
            // Symlink entries carry the link target as their blob content.
            let target =
                fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
            let oid = repo
                .write_blob(path_to_bytes(&target))
                .with_context(|| format!("write symlink blob {}", path.display()))?
                .detach();
            entries.push(Entry {
                mode: EntryKind::Link.into(),
                filename: os_str_to_bstring(&file_name),
                oid,
            });
        }
    }
    // Tree entries must be sorted (via gix `Entry`'s ordering) before writing the object.
    entries.sort();
    repo.write_object(&Tree { entries })
        .context("write tree object")
        .map(gix::Id::detach)
}
// Flattens the HEAD tree into `path -> (oid, mode)`; an unborn HEAD yields an empty map.
fn head_file_entries(
    repo: &gix::Repository,
) -> anyhow::Result<BTreeMap<String, MemoryWorkspaceFileEntry>> {
    let mut entries = BTreeMap::new();
    if let Ok(tree_id) = repo.head_tree_id() {
        let tree = repo.find_tree(tree_id.detach()).context("load HEAD tree")?;
        collect_tree_entries(repo, tree, PathBuf::new(), &mut entries)?;
    }
    Ok(entries)
}
// Walks one git tree, recursing into subtrees and recording leaf entries under
// their slash-separated path.
fn collect_tree_entries(
    repo: &gix::Repository,
    tree: gix::Tree<'_>,
    prefix: PathBuf,
    entries: &mut BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> anyhow::Result<()> {
    for item in tree.iter() {
        let item = item?;
        let child_path = prefix.join(bstr_to_path(item.inner.filename));
        if item.inner.mode.is_tree() {
            let child_tree = repo
                .find_tree(item.inner.oid.to_owned())
                .context("load child tree")?;
            collect_tree_entries(repo, child_tree, child_path, entries)?;
        } else {
            let record = MemoryWorkspaceFileEntry {
                oid: item.inner.oid.to_owned(),
                mode: item.inner.mode,
            };
            entries.insert(path_to_slash_string(&child_path), record);
        }
    }
    Ok(())
}
// Snapshot of the worktree as `path -> (oid, mode)`, hashed in memory.
fn current_file_entries(
    repo: &gix::Repository,
    root: &Path,
) -> anyhow::Result<BTreeMap<String, MemoryWorkspaceFileEntry>> {
    let mut collected = BTreeMap::new();
    collect_current_entries(repo, root, root, &mut collected)?;
    Ok(collected)
}
fn collect_current_entries(
repo: &gix::Repository,
root: &Path,
dir: &Path,
entries: &mut BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> anyhow::Result<()> {
for entry in fs::read_dir(dir).with_context(|| format!("read {}", dir.display()))? {
let entry = entry?;
let path = entry.path();
if path.file_name() == Some(OsStr::new(".git")) || should_ignore_workspace_path(root, &path)
{
continue;
}
let file_type = entry.file_type()?;
if file_type.is_dir() {
collect_current_entries(repo, root, &path, entries)?;
} else if file_type.is_file() {
let bytes = fs::read(&path).with_context(|| format!("read {}", path.display()))?;
entries.insert(
relative_slash_path(root, &path)?,
MemoryWorkspaceFileEntry {
oid: blob_oid(repo, &bytes)?,
mode: file_mode(&path, EntryKind::Blob)?,
},
);
} else if file_type.is_symlink() {
let target =
fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
entries.insert(
relative_slash_path(root, &path)?,
MemoryWorkspaceFileEntry {
oid: blob_oid(repo, &path_to_bytes(&target))?,
mode: EntryKind::Link.into(),
},
);
}
}
Ok(())
}
// Computes the blob object id for `bytes` without writing it to the object store.
fn blob_oid(repo: &gix::Repository, bytes: &[u8]) -> anyhow::Result<ObjectId> {
    let hash_kind = repo.object_hash();
    gix::objs::compute_hash(hash_kind, gix::objs::Kind::Blob, bytes)
        .context("compute memory workspace blob oid")
}
// Produces the HEAD-vs-worktree change list, sorted by path.
fn diff_entries(
    head: &BTreeMap<String, MemoryWorkspaceFileEntry>,
    current: &BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> Vec<WorkspaceChange> {
    // Added/modified come from the worktree side, deletions from the HEAD side.
    let added_or_modified = current.iter().filter_map(|(path, entry)| {
        let status = match head.get(path) {
            None => WorkspaceChangeStatus::Added,
            Some(head_entry) if head_entry != entry => WorkspaceChangeStatus::Modified,
            Some(_) => return None,
        };
        Some(WorkspaceChange {
            status,
            path: path.clone(),
        })
    });
    let deleted = head
        .keys()
        .filter(|path| !current.contains_key(*path))
        .map(|path| WorkspaceChange {
            status: WorkspaceChangeStatus::Deleted,
            path: path.clone(),
        });
    let mut changes: Vec<_> = added_or_modified.chain(deleted).collect();
    changes.sort_by(|left, right| left.path.cmp(&right.path));
    changes
}
// Renders the full markdown report: a status list followed by a fenced diff block.
fn render_workspace_diff_file(
    repo: &gix::Repository,
    root: &Path,
    head_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
    current_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
    changes: &[WorkspaceChange],
) -> anyhow::Result<String> {
    let mut output = String::from(
        "# Memory Workspace Diff\n\n\
         Generated by Codex before Phase 2 memory consolidation. Read this file first and do not edit it.\n\n\
         ## Status\n",
    );
    if changes.is_empty() {
        output.push_str("- none\n");
        return Ok(output);
    }
    for change in changes {
        output.push_str(&format!(
            "- {} {}\n",
            workspace_change_status_label(change.status),
            change.path
        ));
    }
    output.push_str("\n## Diff\n\n```diff\n");
    for change in changes {
        let section =
            render_workspace_change_diff(repo, root, head_entries, current_entries, change)?;
        output.push_str(&section);
    }
    output.push_str("```\n");
    Ok(output)
}
// Renders one file's section of the diff report: a `diff --git` header, git-style
// mode lines, and a unified text diff between HEAD and worktree content.
fn render_workspace_change_diff(
    repo: &gix::Repository,
    root: &Path,
    head_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
    current_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
    change: &WorkspaceChange,
) -> anyhow::Result<String> {
    let old_entry = head_entries.get(&change.path);
    let new_entry = current_entries.get(&change.path);
    // Old side comes from the HEAD blob, new side from the worktree file.
    let old_bytes = old_entry
        .map(|entry| read_head_blob(repo, entry))
        .transpose()
        .with_context(|| format!("read HEAD content for {}", change.path))?;
    let new_bytes = new_entry
        .map(|_| read_current_file_bytes(root, &change.path))
        .transpose()
        .with_context(|| format!("read current content for {}", change.path))?;
    // Non-UTF-8 content is rendered lossily; the report stays best-effort text.
    let old_text = String::from_utf8_lossy(old_bytes.as_deref().unwrap_or_default());
    let new_text = String::from_utf8_lossy(new_bytes.as_deref().unwrap_or_default());
    // Missing sides use git's `/dev/null` convention in the hunk header.
    let old_header = if old_bytes.is_some() {
        format!("a/{}", change.path)
    } else {
        "/dev/null".to_string()
    };
    let new_header = if new_bytes.is_some() {
        format!("b/{}", change.path)
    } else {
        "/dev/null".to_string()
    };
    let mut section = format!("diff --git a/{0} b/{0}\n", change.path);
    // Mode lines for adds, deletes, and mode-only changes, matching git's output.
    match (old_entry, new_entry) {
        (None, Some(entry)) => {
            section.push_str(&format!("new file mode {}\n", mode_label(entry.mode)));
        }
        (Some(entry), None) => {
            section.push_str(&format!("deleted file mode {}\n", mode_label(entry.mode)));
        }
        (Some(old), Some(new)) if old.mode != new.mode => {
            section.push_str(&format!(
                "old mode {}\nnew mode {}\n",
                mode_label(old.mode),
                mode_label(new.mode)
            ));
        }
        (Some(_), Some(_)) => {}
        // Callers pass changes derived from these maps, so both-missing is a no-op.
        (None, None) => return Ok(String::new()),
    }
    let diff = TextDiff::from_lines(&old_text, &new_text)
        .unified_diff()
        .context_radius(3)
        .header(&old_header, &new_header)
        .to_string();
    section.push_str(&diff);
    if !section.ends_with('\n') {
        section.push('\n');
    }
    Ok(section)
}
fn read_head_blob(
repo: &gix::Repository,
entry: &MemoryWorkspaceFileEntry,
) -> anyhow::Result<Vec<u8>> {
let mut blob = repo.find_blob(entry.oid)?;
Ok(blob.take_data())
}
// Reads the worktree content for `relative_path`: file bytes for regular files,
// the link target's path bytes for symlinks (matching how they are hashed).
fn read_current_file_bytes(root: &Path, relative_path: &str) -> anyhow::Result<Vec<u8>> {
    let path = root.join(relative_path);
    let metadata =
        fs::symlink_metadata(&path).with_context(|| format!("stat {}", path.display()))?;
    if !metadata.file_type().is_symlink() {
        return fs::read(&path).with_context(|| format!("read {}", path.display()));
    }
    let target =
        fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
    Ok(path_to_bytes(&target))
}
// One-letter status markers in the style of `git status --short`.
fn workspace_change_status_label(status: WorkspaceChangeStatus) -> &'static str {
    match status {
        WorkspaceChangeStatus::Deleted => "D",
        WorkspaceChangeStatus::Modified => "M",
        WorkspaceChangeStatus::Added => "A",
    }
}
// Octal mode strings as git prints them in diff mode lines.
fn mode_label(mode: EntryMode) -> &'static str {
    match mode.kind() {
        EntryKind::Tree => "040000",
        EntryKind::Blob => "100644",
        EntryKind::BlobExecutable => "100755",
        EntryKind::Link => "120000",
        EntryKind::Commit => "160000",
    }
}
// Only the generated diff file directly under the workspace root is ignored.
fn should_ignore_workspace_path(root: &Path, path: &Path) -> bool {
    match path.strip_prefix(root) {
        Ok(relative) => relative == Path::new(WORKSPACE_DIFF_FILENAME),
        Err(_) => false,
    }
}
#[cfg(unix)]
// Any execute bit on the file promotes the entry to an executable blob;
// otherwise the caller-provided default kind is used.
fn file_mode(path: &Path, default: EntryKind) -> anyhow::Result<EntryMode> {
    use std::os::unix::fs::PermissionsExt;
    let permissions = fs::metadata(path)?.permissions();
    let kind = if permissions.mode() & 0o111 != 0 {
        EntryKind::BlobExecutable
    } else {
        default
    };
    Ok(kind.into())
}
#[cfg(not(unix))]
// No execute-bit concept off unix: always use the caller-provided default kind.
fn file_mode(_path: &Path, default: EntryKind) -> anyhow::Result<EntryMode> {
    Ok(default.into())
}
#[cfg(unix)]
fn os_str_to_bstring(value: &OsStr) -> gix::bstr::BString {
use std::os::unix::ffi::OsStrExt;
value.as_bytes().into()
}
#[cfg(not(unix))]
// Off unix the name goes through UTF-8; non-UTF-8 sequences are replaced lossily.
fn os_str_to_bstring(value: &OsStr) -> gix::bstr::BString {
    gix::bstr::BString::from(value.to_string_lossy().as_bytes())
}
#[cfg(unix)]
// Raw byte view of the path; unix paths carry no UTF-8 requirement.
fn path_to_bytes(path: &Path) -> Vec<u8> {
    use std::os::unix::ffi::OsStrExt;
    Vec::from(path.as_os_str().as_bytes())
}
#[cfg(not(unix))]
// Lossy UTF-8 conversion; non-unix paths expose no stable byte encoding here.
fn path_to_bytes(path: &Path) -> Vec<u8> {
    path.to_string_lossy().as_bytes().to_vec()
}
// Converts a git path component (raw bytes) into an OS `PathBuf`.
fn bstr_to_path(value: &gix::bstr::BStr) -> PathBuf {
    #[cfg(unix)]
    {
        use std::os::unix::ffi::OsStrExt;
        // Lossless on unix: paths are arbitrary bytes.
        PathBuf::from(OsStr::from_bytes(value))
    }
    #[cfg(not(unix))]
    {
        // Off unix this goes through the `BStr` string form, which may be
        // lossy for non-UTF-8 bytes.
        PathBuf::from(value.to_string())
    }
}
fn relative_slash_path(root: &Path, path: &Path) -> anyhow::Result<String> {
path.strip_prefix(root)
.with_context(|| format!("strip {} from {}", root.display(), path.display()))
.map(path_to_slash_string)
}
// Joins path components with `/` regardless of the host separator.
fn path_to_slash_string(path: &Path) -> String {
    let parts: Vec<_> = path
        .components()
        .map(|part| part.as_os_str().to_string_lossy())
        .collect();
    parts.join("/")
}
#[cfg(test)]
#[path = "workspace_tests.rs"]
mod tests;

View File

@@ -0,0 +1,187 @@
use super::*;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::TempDir;
#[tokio::test]
async fn prepare_creates_repo_gitignore_and_initial_commit() {
    // A workspace with one file gets a repo, an ignore entry, and a clean baseline.
    let tmp = TempDir::new().expect("tempdir");
    let root = tmp.path().join("memories");
    fs::create_dir_all(&root).expect("create memories");
    fs::write(root.join("MEMORY.md"), "baseline").expect("write memory");
    prepare_git_repo(&root).await.expect("prepare repo");
    assert!(root.join(".git").is_dir());
    let gitignore = fs::read_to_string(root.join(GITIGNORE_FILENAME)).expect("read gitignore");
    assert_eq!(gitignore, format!("{WORKSPACE_DIFF_FILENAME}\n"));
    assert!(!has_changes(&root).await.expect("has changes"));
}
#[tokio::test]
async fn prepare_commits_gitignore_only_change_in_existing_repo() {
    // A pre-existing clean repo without the ignore entry should get the
    // `.gitignore` update committed by itself, leaving the worktree clean.
    let home = TempDir::new().expect("tempdir");
    let memory_root = home.path().join("memories");
    fs::create_dir_all(&memory_root).expect("create memories");
    fs::write(memory_root.join("MEMORY.md"), "baseline").expect("write memory");
    let repo = gix::init(&memory_root).expect("init repo");
    commit_current_tree(&repo, INITIAL_COMMIT_MESSAGE).expect("commit baseline");
    prepare_git_repo(&memory_root).await.expect("prepare repo");
    assert!(!has_changes(&memory_root).await.expect("has changes"));
    let repo = gix::open(&memory_root).expect("open repo");
    // The committed HEAD tree must now track the ignore file.
    assert!(
        head_file_entries(&repo)
            .expect("head entries")
            .contains_key(GITIGNORE_FILENAME)
    );
}
#[tokio::test]
async fn writes_diff_and_commits_workspace_changes() {
    // End-to-end flow: modify + add files, render the diff report, then commit.
    let home = TempDir::new().expect("tempdir");
    let memory_root = home.path().join("memories");
    fs::create_dir_all(&memory_root).expect("create memories");
    fs::write(memory_root.join("MEMORY.md"), "old").expect("write memory");
    prepare_git_repo(&memory_root).await.expect("prepare repo");
    fs::write(memory_root.join("MEMORY.md"), "new").expect("update memory");
    fs::write(memory_root.join("memory_summary.md"), "summary").expect("write summary");
    assert!(has_changes(&memory_root).await.expect("has changes"));
    write_workspace_diff(&memory_root)
        .await
        .expect("write workspace diff file");
    let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
        .expect("read workspace diff file");
    // Status section lists both changes; diff section carries the hunks.
    assert!(workspace_diff.contains("- M MEMORY.md"));
    assert!(workspace_diff.contains("- A memory_summary.md"));
    assert!(workspace_diff.contains("diff --git a/MEMORY.md b/MEMORY.md"));
    assert!(workspace_diff.contains("-old"));
    assert!(workspace_diff.contains("+new"));
    assert!(workspace_diff.contains("diff --git a/memory_summary.md b/memory_summary.md"));
    assert!(workspace_diff.contains("+summary"));
    assert!(
        has_changes(&memory_root).await.expect("has changes"),
        "generated diff file should not affect workspace status"
    );
    commit_all(&memory_root).await.expect("commit workspace");
    assert!(!has_changes(&memory_root).await.expect("has changes"));
    assert!(
        memory_root.join(WORKSPACE_DIFF_FILENAME).is_file(),
        "generated diff file remains available but ignored after commit"
    );
}
#[tokio::test]
async fn remove_workspace_diff_ignores_missing_file() {
    // Deleting a diff file that was never written must be a no-op, not an error.
    let tmp = TempDir::new().expect("tempdir");
    let root = tmp.path().join("memories");
    fs::create_dir_all(&root).expect("create memories");
    remove_workspace_diff(&root)
        .await
        .expect("remove missing workspace diff");
}
#[tokio::test]
async fn status_scan_does_not_write_added_file_blobs() {
    // `has_changes` must hash worktree files in memory only — no loose objects.
    let home = TempDir::new().expect("tempdir");
    let memory_root = home.path().join("memories");
    prepare_git_repo(&memory_root).await.expect("prepare repo");
    let added_content = b"new uncommitted memory";
    fs::write(memory_root.join("MEMORY.md"), added_content).expect("write memory");
    assert!(has_changes(&memory_root).await.expect("has changes"));
    let repo = gix::open(&memory_root).expect("open repo");
    let added_oid = blob_oid(&repo, added_content).expect("compute added oid");
    // The blob id is computable but must not be findable in the object store.
    assert!(
        repo.find_blob(added_oid).is_err(),
        "status scans should hash current files without writing loose git objects"
    );
}
#[cfg(unix)]
#[tokio::test]
async fn reports_executable_bit_changes_as_modified() {
    use std::os::unix::fs::PermissionsExt;
    // Flipping only the execute bit (same content) must count as a modification
    // and surface old/new mode lines in the rendered diff.
    let home = TempDir::new().expect("tempdir");
    let memory_root = home.path().join("memories");
    fs::create_dir_all(&memory_root).expect("create memories");
    let path = memory_root.join("MEMORY.md");
    fs::write(&path, "same content").expect("write memory");
    prepare_git_repo(&memory_root).await.expect("prepare repo");
    let mut permissions = fs::metadata(&path).expect("stat memory").permissions();
    permissions.set_mode(permissions.mode() | 0o111);
    fs::set_permissions(&path, permissions).expect("chmod memory");
    assert!(has_changes(&memory_root).await.expect("has changes"));
    write_workspace_diff(&memory_root)
        .await
        .expect("write workspace diff file");
    let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
        .expect("read workspace diff file");
    assert!(workspace_diff.contains("- M MEMORY.md"));
    assert!(workspace_diff.contains("old mode 100644"));
    assert!(workspace_diff.contains("new mode 100755"));
}
#[tokio::test]
async fn commit_all_creates_normal_parented_history() {
    let tmp = TempDir::new().expect("tempdir");
    let root = tmp.path().join("memories");
    fs::create_dir_all(&root).expect("create memories");
    fs::write(root.join("MEMORY.md"), "old").expect("write memory");
    prepare_git_repo(&root).await.expect("prepare repo");
    let baseline_head = gix::open(&root)
        .expect("open repo")
        .head_id()
        .expect("first head")
        .detach();
    fs::write(root.join("MEMORY.md"), "new").expect("update memory");
    commit_all(&root).await.expect("commit workspace");
    // The new commit must advance HEAD and list the baseline as its sole parent.
    let repo = gix::open(&root).expect("open repo");
    let new_head = repo.head_id().expect("second head").detach();
    assert_ne!(baseline_head, new_head);
    let commit = repo.find_commit(new_head).expect("find second commit");
    assert_eq!(
        commit.parent_ids().collect::<Vec<_>>(),
        vec![baseline_head]
    );
}
#[tokio::test]
async fn workspace_diff_file_includes_deleted_head_content() {
    // Deleted files must surface their old HEAD content in the diff so the
    // consolidation agent can see which evidence disappeared.
    let home = TempDir::new().expect("tempdir");
    let memory_root = home.path().join("memories");
    fs::create_dir_all(memory_root.join("rollout_summaries")).expect("create rollout summaries");
    let summary_path = memory_root.join("rollout_summaries/deleted.md");
    fs::write(
        &summary_path,
        "thread_id: 00000000-0000-4000-8000-000000000001\nimportant stale evidence\n",
    )
    .expect("write rollout summary");
    prepare_git_repo(&memory_root).await.expect("prepare repo");
    fs::remove_file(&summary_path).expect("delete rollout summary");
    write_workspace_diff(&memory_root)
        .await
        .expect("write workspace diff file");
    let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
        .expect("read workspace diff file");
    // The report must show the deletion status, mode line, and removed lines.
    assert!(workspace_diff.contains("- D rollout_summaries/deleted.md"));
    assert!(workspace_diff.contains("deleted file mode 100644"));
    assert!(workspace_diff.contains("-thread_id: 00000000-0000-4000-8000-000000000001"));
    assert!(workspace_diff.contains("-important stale evidence"));
}

View File

@@ -143,29 +143,31 @@ Mode selection:
- INCREMENTAL UPDATE: existing artifacts already exist and `raw_memories.md`
mostly contains new additions.
Incremental thread diff snapshot (computed before the current artifact sync rewrites local files):
Memory workspace diff:
**Diff since last consolidation:**
{{ phase2_input_selection }}
The folder `{{ memory_root }}/` is a git repository managed by Codex. Read
`{{ phase2_workspace_diff_file }}` in this same folder first. It contains the git-style diff from
the previous successful Phase 2 commit to the current worktree. It is generated by Codex for this
run and is not part of the committed memory artifacts.
Incremental update and forgetting mechanism:
- Use the diff provided
- Use the git-style diff in `{{ phase2_workspace_diff_file }}` to identify relevant changed
sections and deleted inputs.
- Do not open raw sessions / original rollout transcripts.
- For each added thread id, search it in `raw_memories.md`, read that raw-memory section, and
read the corresponding `rollout_summaries/*.md` file only when needed for stronger evidence,
task placement, or conflict resolution.
- For added or modified `raw_memories.md` and `rollout_summaries/*.md` files, read the changed
raw-memory sections and the corresponding rollout summaries only when needed for stronger
evidence, task placement, or conflict resolution.
- When scanning a raw-memory section, read the task-level `Preference signals:` subsections
first, then the rest of the task blocks.
- For each removed thread id, search it in `MEMORY.md` and delete only the memory supported by
that thread. Use `thread_id=<thread_id>` in `### rollout_summary_files` when available; if not,
fall back to rollout summary filenames plus the corresponding `rollout_summaries/*.md` files.
- If a `MEMORY.md` block contains both removed and undeleted threads, do not delete the whole
block. Remove only the removed thread's references and thread-local guidance, preserve shared
or still-supported content, and split or rewrite the block only if needed to keep the undeleted
threads intact.
- For deleted `rollout_summaries/*.md` or `extensions/*/resources/*.md` files, search their
filenames, paths, and thread ids (when present) in `MEMORY.md`. Delete only memory supported
by deleted inputs.
- If a `MEMORY.md` block contains both deleted and still-present evidence, do not delete the whole
block. Remove only stale references and stale local guidance, preserve shared or still-supported
content, and split or rewrite the block only if needed.
- After `MEMORY.md` cleanup is done, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that was only supported by removed thread ids.
summary/index content that was only supported by deleted files.
Outputs:
Under `{{ memory_root }}/`:
@@ -743,26 +745,28 @@ WORKFLOW
3. INCREMENTAL UPDATE behavior:
- Read existing `MEMORY.md` and `memory_summary.md` first for continuity and to locate
existing references that may need surgical cleanup.
- Use the injected thread-diff snapshot as the first routing pass:
- added thread ids = ingestion queue
- removed thread ids = forgetting / stale-cleanup queue
- Use the injected git-style workspace changes as the first routing pass:
- added/modified `raw_memories.md` and `rollout_summaries/*.md` = ingestion queue
- deleted `rollout_summaries/*.md` and `extensions/*/resources/*.md` = forgetting /
stale-cleanup queue
- Build an index of rollout references already present in existing `MEMORY.md` before
scanning raw memories so you can route net-new evidence into the right blocks.
- Work in this order:
1. For newly added thread ids, search them in `raw_memories.md`, read those sections, and
open the corresponding `rollout_summaries/*.md` files when necessary.
1. For added or modified rollout inputs, search their paths/thread ids in `raw_memories.md`,
read those sections, and open the corresponding `rollout_summaries/*.md` files when
necessary.
2. Route the new signal into existing `MEMORY.md` blocks or create new ones when needed.
3. For removed thread ids, search `MEMORY.md` and surgically delete or rewrite only the
unsupported thread-local memory.
4. If a block mixes removed and undeleted threads, preserve the undeleted-thread content;
split or rewrite the block if that is the cleanest way to delete only the removed part.
3. For deleted inputs, search `MEMORY.md` and surgically delete or rewrite only the
unsupported memory.
4. If a block mixes deleted and still-present evidence, preserve the still-supported content;
split or rewrite the block if that is the cleanest way to delete only the stale part.
5. After `MEMORY.md` is correct, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that no longer has undeleted support.
summary/index content that no longer has current support.
- Integrate new signal into existing artifacts by:
- scanning the newly added raw-memory entries in recency order and identifying which existing blocks they should update
- scanning added or modified raw-memory entries in recency order and identifying which existing blocks they should update
- updating existing knowledge with better/newer evidence
- updating stale or contradicting guidance
- pruning or downgrading memory whose only provenance comes from removed thread ids
- pruning or downgrading memory whose only provenance comes from deleted inputs
- expanding terse old blocks when new summaries/raw memories make the task family clearer
- doing light clustering and merging if needed
- refreshing `MEMORY.md` top-of-file ordering so recent high-utility task families stay easy to find
@@ -774,8 +778,8 @@ WORKFLOW
target, keep its wording, label, and relative order mostly stable. Rewrite/reorder/rename/
split/merge only when fixing a real problem (staleness, ambiguity, schema drift, wrong
boundaries) or when meaningful new evidence materially improves retrieval clarity/searchability.
- Spend most of your deep-dive budget on newly added thread ids and on mixed blocks touched by
removed thread ids. Do not re-read unchanged older threads unless you need them for
- Spend most of your deep-dive budget on added/modified inputs and on mixed blocks touched by
deleted inputs. Do not re-read unchanged older threads unless you need them for
conflict resolution, clustering, or provenance repair.
4. Evidence deep-dive rule (both modes):
@@ -793,8 +797,7 @@ WORKFLOW
evidence, procedural detail, validation signals, and user feedback before finalizing
`MEMORY.md`.
- When deleting stale memory from a mixed block, use the relevant rollout summaries to decide
which details are uniquely supported by removed threads versus still supported by undeleted
threads.
which details are uniquely supported by deleted inputs versus still-supported evidence.
- Use `updated_at` and validation strength together to resolve stale/conflicting notes.
- For user-profile or preference claims, recurrence matters: repeated evidence across
rollouts should generally outrank a single polished but isolated summary.
@@ -811,7 +814,7 @@ WORKFLOW
- remove duplication in memory_summary, skills/, and MEMORY.md
- remove stale or low-signal blocks that are less likely to be useful in the future
- remove or rewrite blocks/task sections whose supporting rollout references point only to
removed thread ids or missing rollout summary files
deleted inputs or missing rollout summary files
- run a global rollout-reference audit on final `MEMORY.md` and fix accidental duplicate
entries / redundant repetition, while preserving intentional multi-task or multi-block
reuse when it adds distinct task-local value

View File

@@ -55,26 +55,15 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
let first = build_test_codex(&server, home.clone()).await?;
let first_request = wait_for_single_request(&first_phase2).await;
let first_prompt = phase2_prompt_text(&first_request);
let _first_prompt = phase2_prompt_text(&first_request);
let first_workspace_diff = read_workspace_diff(&home).await?;
assert!(
first_prompt.contains("- selected inputs this run: 1"),
"expected selected count in first prompt: {first_prompt}"
first_workspace_diff.contains("- A raw_memories.md"),
"expected raw memories to be added in first workspace diff: {first_workspace_diff}"
);
assert!(
first_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains("- removed from the last successful Phase 2 run: 0"),
"expected removed count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains(&format!("- [added] thread_id={thread_a},")),
"expected thread A to be marked added: {first_prompt}"
);
assert!(
first_prompt.contains("Removed from the last successful Phase 2 selection:\n- none"),
"expected no removed items in first prompt: {first_prompt}"
first_workspace_diff.contains("rollout_a.md"),
"expected rollout A summary to be added: {first_workspace_diff}"
);
wait_for_phase2_success(db.as_ref(), thread_a).await?;
@@ -111,34 +100,27 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
let second = build_test_codex(&server, home.clone()).await?;
let second_request = wait_for_single_request(&second_phase2).await;
let second_prompt = phase2_prompt_text(&second_request);
let _second_prompt = phase2_prompt_text(&second_request);
let second_workspace_diff = read_workspace_diff(&home).await?;
assert!(
second_prompt.contains("- selected inputs this run: 1"),
"expected selected count in second prompt: {second_prompt}"
second_workspace_diff.contains("- M raw_memories.md"),
"expected raw memories to be modified in second workspace diff: {second_workspace_diff}"
);
assert!(
second_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in second prompt: {second_prompt}"
second_workspace_diff.contains("rollout_b.md"),
"expected rollout B summary to be added: {second_workspace_diff}"
);
assert!(
second_prompt.contains("- removed from the last successful Phase 2 run: 1"),
"expected removed count in second prompt: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- [added] thread_id={thread_b},")),
"expected thread B to be marked added: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- thread_id={thread_a},")),
"expected thread A to be marked removed: {second_prompt}"
second_workspace_diff.contains("- D rollout_summaries/"),
"expected rollout A summary to be deleted: {second_workspace_diff}"
);
wait_for_phase2_success(db.as_ref(), thread_b).await?;
let raw_memories = tokio::fs::read_to_string(memory_root.join("raw_memories.md")).await?;
assert!(raw_memories.contains("raw memory B"));
assert!(raw_memories.contains("raw memory A"));
assert!(!raw_memories.contains("raw memory A"));
let rollout_summaries = read_rollout_summary_bodies(&memory_root).await?;
assert_eq!(rollout_summaries.len(), 2);
assert_eq!(rollout_summaries.len(), 1);
assert!(
rollout_summaries
.iter()
@@ -150,7 +132,7 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
.any(|summary| summary.contains("git_branch: branch-rollout-b"))
);
assert!(
rollout_summaries
!rollout_summaries
.iter()
.any(|summary| summary.contains("rollout summary A"))
);
@@ -175,11 +157,11 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
)
.await?;
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -207,33 +189,32 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
let codex = build_test_codex(&server, home.clone()).await?;
let request = wait_for_single_request(&phase2).await;
let prompt = phase2_prompt_text(&request);
let _prompt = phase2_prompt_text(&request);
let workspace_diff = read_workspace_diff(&home).await?;
assert!(
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains("- retention window: 7 days"),
"expected retention window in prompt: {prompt}"
);
assert!(
prompt.contains("- extension: chronicle"),
"expected extension name in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
workspace_diff.contains(&format!(
"- D extensions/chronicle/resources/{old_file_name}"
)),
"expected old resource deletion in workspace diff: {workspace_diff}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
let old_file = home.path().join(format!(
"memories/extensions/chronicle/resources/{old_file_name}"
));
wait_for_file_removed(&old_file).await?;
assert!(
!tokio::fs::try_exists(&old_file).await?,
"old extension resource should be pruned"
);
assert!(
tokio::fs::try_exists(&recent_file).await?,
tokio::fs::try_exists(
home.path()
.join("memories/extensions/chronicle/resources")
.join(recent_file.file_name().expect("recent file name"))
)
.await?,
"recent extension resource should be retained"
);
@@ -251,11 +232,11 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
.await?;
let now = Utc::now();
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -278,20 +259,18 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
let codex = build_test_codex(&server, home.clone()).await?;
let request = wait_for_single_request(&phase2).await;
let prompt = phase2_prompt_text(&request);
let _prompt = phase2_prompt_text(&request);
let workspace_diff = read_workspace_diff(&home).await?;
assert!(
prompt.contains("- selected inputs this run: 0"),
"expected no selected raw inputs in prompt: {prompt}"
);
assert!(
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
workspace_diff.contains(&format!(
"- D extensions/chronicle/resources/{old_file_name}"
)),
"expected old resource deletion in workspace diff: {workspace_diff}"
);
let old_file = home.path().join(format!(
"memories/extensions/chronicle/resources/{old_file_name}"
));
wait_for_file_removed(&old_file).await?;
shutdown_test_codex(&codex).await?;
@@ -395,18 +374,11 @@ async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs()
let first_phase2_request = wait_for_request(&responses, /*expected_count*/ 1)
.await
.remove(0);
let first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
let _first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
let first_workspace_diff = read_workspace_diff(&home).await?;
assert!(
first_phase2_prompt.contains("- selected inputs this run: 1"),
"expected seeded thread to be selected before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected seeded thread to be added before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains(&format!("- [added] thread_id={thread_id},")),
"expected selected thread in first phase2 prompt: {first_phase2_prompt}"
first_workspace_diff.contains("- A raw_memories.md"),
"expected raw memories to be added before pollution: {first_workspace_diff}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
@@ -461,6 +433,39 @@ async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs()
assert_eq!(selection.removed[0].thread_id, thread_id);
shutdown_test_codex(&resumed).await?;
let removed_phase2 = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-phase2-removed"),
ev_assistant_message("msg-phase2-removed", "phase2 removed complete"),
ev_completed("resp-phase2-removed"),
]),
)
.await;
let cleanup = build_test_codex(&server, home.clone()).await?;
let removed_request = wait_for_single_request(&removed_phase2).await;
let _removed_prompt = phase2_prompt_text(&removed_request);
let workspace_diff = read_workspace_diff(&home).await?;
assert!(
workspace_diff.contains("- D rollout_summaries/"),
"expected polluted thread rollout summary to be deleted: {workspace_diff}"
);
assert!(
workspace_diff.contains("deleted file mode 100644"),
"expected deleted file section in workspace diff: {workspace_diff}"
);
assert!(
workspace_diff.contains(&format!("-thread_id: {thread_id}")),
"expected deleted rollout summary metadata in workspace diff: {workspace_diff}"
);
assert!(
workspace_diff.contains("-rollout summary seeded for web search pollution"),
"expected deleted rollout summary content in workspace diff: {workspace_diff}"
);
wait_for_phase2_no_pending_inputs(db.as_ref()).await?;
shutdown_test_codex(&cleanup).await?;
Ok(())
}
@@ -525,6 +530,10 @@ async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
wait_for_request(mock, /*expected_count*/ 1).await.remove(0)
}
async fn read_workspace_diff(home: &TempDir) -> Result<String> {
Ok(tokio::fs::read_to_string(home.path().join("memories/phase2_workspace_diff.md")).await?)
}
async fn wait_for_file_removed(path: &Path) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
@@ -560,7 +569,7 @@ fn phase2_prompt_text(request: &ResponsesRequest) -> String {
request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains("Current selected Phase 1 inputs:"))
.find(|text| text.contains("Memory workspace diff:"))
.expect("phase2 prompt text")
}
@@ -589,6 +598,27 @@ async fn wait_for_phase2_success(
}
}
async fn wait_for_phase2_no_pending_inputs(db: &codex_state::StateRuntime) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let selection = db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await?;
if selection.selected.is_empty()
&& selection.retained_thread_ids.is_empty()
&& selection.removed.is_empty()
{
return Ok(());
}
assert!(
Instant::now() < deadline,
"timed out waiting for phase2 to clear pending inputs: {selection:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
async fn seed_stage1_output_for_existing_thread(
db: &codex_state::StateRuntime,
thread_id: ThreadId,