mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
feat: mem v2 - PR3 (#11366)
# Memories migration plan (simplified global workflow)

## Target behavior

- One shared memory root only: `~/.codex/memories/`.
- No per-cwd memory buckets, no cwd hash handling.
- Phase 1 candidate rules:
  - Not currently being processed unless the job lease is stale.
  - Rollout updated within the max-age window (currently 30 days).
  - Rollout idle for at least 12 hours (new constant).
- Global cap: at most 64 stage-1 jobs in `running` state at any time (new invariant).
- Stage-1 model output shape (new):
  - `rollout_slug` (accepted but ignored for now).
  - `rollout_summary`.
  - `raw_memory`.
- Phase-1 artifacts written under the shared root:
  - `rollout_summaries/<thread_id>.md` for each rollout summary.
  - `raw_memories.md` containing appended/merged raw memory paragraphs.
- Phase 2 runs one consolidation agent for the shared `memories/` directory.
- Phase-2 lock is DB-backed with a 1-hour lease and heartbeat/expiry.

## Current code map

- Core startup pipeline: `core/src/memories/startup/mod.rs`.
- Stage-1 request+parse: `core/src/memories/startup/extract.rs`, `core/src/memories/stage_one.rs`, templates in `core/templates/memories/`.
- File materialization: `core/src/memories/storage.rs`, `core/src/memories/layout.rs`.
- Scope routing (cwd/user): `core/src/memories/scope.rs`, `core/src/memories/startup/mod.rs`.
- DB job lifecycle and scope queueing: `state/src/runtime/memory.rs`.

## PR plan

## PR 1: Correct phase-1 selection invariants (no behavior-breaking layout changes yet)

- Add `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12` in `core/src/memories/mod.rs`.
- Thread this into `state::claim_stage1_jobs_for_startup(...)`.
- Enforce the idle-time filter in DB selection logic (not only in-memory filtering after `scan_limit`) so eligible threads are not starved by very recent threads.
- Enforce the global running cap of 64 at claim time in DB logic:
  - Count fresh `memory_stage1` running jobs.
  - Only allow new claims while count < cap.
- Keep stale-lease takeover behavior intact.
- Add/adjust tests in `state/src/runtime.rs`:
  - Idle-filter inclusion/exclusion around the 12h boundary.
  - Global running-cap guarantee.
  - Existing stale/fresh ownership behavior still passes.

Acceptance criteria:

- Startup never creates more than 64 fresh `memory_stage1` running jobs.
- Threads updated <12h ago are skipped.
- Threads older than 30d are skipped.

## PR 2: Stage-1 output contract + storage artifacts (forward-compatible)

- Update parser/types to accept the new structured output while keeping backward compatibility:
  - Add `rollout_slug` (optional for now).
  - Add `rollout_summary`.
  - Keep alias support for legacy `summary` and `rawMemory` until the prompt swap completes.
- Update the stage-1 schema generator in `core/src/memories/stage_one.rs` to include the new keys.
- Update prompt templates:
  - `core/templates/memories/stage_one_system.md`.
  - `core/templates/memories/stage_one_input.md`.
- Replace the storage model in `core/src/memories/storage.rs`:
  - Introduce a `rollout_summaries/` directory writer (`<thread_id>.md` files).
  - Introduce a `raw_memories.md` aggregator writer fed from DB rows.
  - Keep deterministic rebuild behavior from DB outputs so files can always be regenerated.
- Update the consolidation prompt template to reference `rollout_summaries/` + `raw_memories.md` inputs.

Acceptance criteria:

- Stage-1 accepts both old and new output keys during migration.
- Phase-1 artifacts are generated in the new format from DB state.
- No dependence on per-thread files in `raw_memories/`.

## PR 3: Remove per-cwd memories and move to one global memory root

- Simplify the layout in `core/src/memories/layout.rs`:
  - Single root: `codex_home/memories`.
  - Remove cwd-hash bucket helpers and normalization logic used only for memory pathing.
- Remove scope branching from the startup phase-2 dispatch path:
  - No cwd/user mapping in `core/src/memories/startup/mod.rs`.
  - One target root for consolidation.
- In `state/src/runtime/memory.rs`, stop enqueueing/handling the cwd consolidation scope.
- Keep one logical consolidation scope/job key (global/user) to avoid a risky schema rewrite in the same PR.
- Add a one-time migration helper (core side) to preserve current shared memory output:
  - If `~/.codex/memories/user/memory` exists and the new root is empty, move/copy its contents into `~/.codex/memories`.
  - Leave old hashed cwd buckets untouched for now (safe, non-destructive migration).

Acceptance criteria:

- New runs only read/write `~/.codex/memories`.
- No new cwd-scoped consolidation jobs are enqueued.
- Existing user-shared memory content is preserved.

## PR 4: Phase-2 global lock simplification and cleanup

- Replace multi-scope dispatch with a single global consolidation claim path:
  - Either reuse the jobs table with one fixed key, or add a tiny dedicated lock helper; keep the 1h lease.
  - Ensure at most one consolidation agent can run at once.
- Keep heartbeat + stale-lock recovery semantics in `core/src/memories/startup/watch.rs`.
- Remove dead scope code and legacy constants no longer used.
- Update tests:
  - One-agent-at-a-time behavior.
  - Lock expiry allows takeover after a stale lease.

Acceptance criteria:

- Exactly one phase-2 consolidation agent can be active cluster-wide (per local DB).
- A stale lock recovers automatically.

## PR 5: Final cleanup and docs

- Remove legacy artifacts and references:
  - `raw_memories/` and `memory_summary.md` assumptions from prompts/comments/tests.
  - Scope constants for cwd memory pathing in core/state if fully unused.
- Update docs under `docs/` for the memory workflow and directory layout.
- Add a brief operator note for rollout: the compatibility window for old stage-1 JSON keys and when to remove the aliases.

Acceptance criteria:

- Code and docs reflect only the simplified global workflow.
- No stale references to per-cwd memory buckets.

## Notes on sequencing

- PR 1 is safest first because it improves correctness without changing the external artifact layout.
- PR 2 keeps parser compatibility so prompt deployment can happen independently.
- PR 3 and PR 4 split filesystem/scope simplification from locking simplification to reduce blast radius.
- PR 5 is intentionally cleanup-only.
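The PR 1 candidate rules and running cap can be sketched as pure filtering logic. This is a hedged illustration, not the actual codex implementation: `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS` is the constant named in the plan, while the `Candidate` struct, `select_stage1_candidates`, and the 30-day constant name are hypothetical stand-ins for what the real code does in SQL.

```rust
// Sketch of the phase-1 candidate rules described above:
// - rollout updated within the 30-day max-age window,
// - rollout idle for at least 12 hours,
// - at most 64 fresh stage-1 jobs in `running` state at once.
const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
const PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
const PHASE_ONE_MAX_RUNNING_JOBS: usize = 64;

#[derive(Debug)]
struct Candidate {
    thread_id: u64,
    /// Hours since the thread's rollout was last updated.
    idle_hours: i64,
}

fn select_stage1_candidates(
    mut candidates: Vec<Candidate>,
    running_fresh_jobs: usize,
) -> Vec<Candidate> {
    // Respect the global running cap before claiming anything new.
    let budget = PHASE_ONE_MAX_RUNNING_JOBS.saturating_sub(running_fresh_jobs);
    // Idle long enough, but still inside the max-age window.
    candidates.retain(|c| {
        c.idle_hours >= PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS
            && c.idle_hours < PHASE_ONE_MAX_ROLLOUT_AGE_DAYS * 24
    });
    candidates.truncate(budget);
    candidates
}

fn main() {
    let picked = select_stage1_candidates(
        vec![
            Candidate { thread_id: 1, idle_hours: 2 },   // updated <12h ago: skipped
            Candidate { thread_id: 2, idle_hours: 13 },  // eligible
            Candidate { thread_id: 3, idle_hours: 800 }, // older than 30d: skipped
        ],
        0,
    );
    assert_eq!(picked.len(), 1);
    assert_eq!(picked[0].thread_id, 2);
    println!("eligible: {picked:?}");
}
```

The point of PR 1 is that this filtering must happen in the DB query itself, so the `scan_limit` window cannot be exhausted by threads that the in-memory filter would reject anyway.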
@@ -1,35 +1,18 @@
use crate::path_utils::normalize_for_path_comparison;
use sha2::Digest;
use sha2::Sha256;
use std::path::Path;
use std::path::PathBuf;

use super::scope::MEMORY_SCOPE_KEY_USER;

pub(super) const MEMORY_SUBDIR: &str = "memory";
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
pub(super) const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
pub(super) const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md";
pub(super) const SKILLS_SUBDIR: &str = "skills";
const LEGACY_MEMORY_SUBDIR: &str = "memory";

const CWD_MEMORY_BUCKET_HEX_LEN: usize = 16;

/// Returns the on-disk memory root directory for a given working directory.
///
/// The cwd is normalized and hashed into a deterministic bucket under
/// `<codex_home>/memories/<hash>/memory`.
pub(super) fn memory_root_for_cwd(codex_home: &Path, cwd: &Path) -> PathBuf {
    let bucket = memory_bucket_for_cwd(cwd);
    codex_home.join("memories").join(bucket).join(MEMORY_SUBDIR)
}

/// Returns the on-disk user-shared memory root directory.
pub(super) fn memory_root_for_user(codex_home: &Path) -> PathBuf {
    codex_home
        .join("memories")
        .join(MEMORY_SCOPE_KEY_USER)
        .join(MEMORY_SUBDIR)
/// Returns the shared on-disk memory root directory.
pub(super) fn memory_root(codex_home: &Path) -> PathBuf {
    codex_home.join("memories")
}

pub(super) fn rollout_summaries_dir(root: &Path) -> PathBuf {
@@ -40,20 +23,62 @@ pub(super) fn raw_memories_file(root: &Path) -> PathBuf {
    root.join(RAW_MEMORIES_FILENAME)
}

/// Migrates legacy user memory contents into the shared root when no shared-root
/// phase artifacts exist yet.
pub(super) async fn migrate_legacy_user_memory_root_if_needed(
    codex_home: &Path,
) -> std::io::Result<()> {
    let root = memory_root(codex_home);
    let legacy = legacy_user_memory_root(codex_home);

    if !tokio::fs::try_exists(&legacy).await? || global_root_has_phase_artifacts(&root).await? {
        return Ok(());
    }

    copy_dir_contents_if_missing(&legacy, &root).await
}

/// Ensures the phase-1 memory directory layout exists for the given root.
pub(super) async fn ensure_layout(root: &Path) -> std::io::Result<()> {
    tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
}

fn memory_bucket_for_cwd(cwd: &Path) -> String {
    let normalized = normalize_cwd_for_memory(cwd);
    let normalized = normalized.to_string_lossy();
    let mut hasher = Sha256::new();
    hasher.update(normalized.as_bytes());
    let full_hash = format!("{:x}", hasher.finalize());
    full_hash[..CWD_MEMORY_BUCKET_HEX_LEN].to_string()
fn legacy_user_memory_root(codex_home: &Path) -> PathBuf {
    codex_home
        .join("memories")
        .join(MEMORY_SCOPE_KEY_USER)
        .join(LEGACY_MEMORY_SUBDIR)
}

fn normalize_cwd_for_memory(cwd: &Path) -> PathBuf {
    normalize_for_path_comparison(cwd).unwrap_or_else(|_| cwd.to_path_buf())
async fn global_root_has_phase_artifacts(root: &Path) -> std::io::Result<bool> {
    if tokio::fs::try_exists(&rollout_summaries_dir(root)).await?
        || tokio::fs::try_exists(&raw_memories_file(root)).await?
        || tokio::fs::try_exists(&root.join(MEMORY_REGISTRY_FILENAME)).await?
        || tokio::fs::try_exists(&root.join(LEGACY_CONSOLIDATED_FILENAME)).await?
        || tokio::fs::try_exists(&root.join(SKILLS_SUBDIR)).await?
    {
        return Ok(true);
    }
    Ok(false)
}

fn copy_dir_contents_if_missing<'a>(
    src_dir: &'a Path,
    dst_dir: &'a Path,
) -> futures::future::BoxFuture<'a, std::io::Result<()>> {
    Box::pin(async move {
        tokio::fs::create_dir_all(dst_dir).await?;
        let mut dir = tokio::fs::read_dir(src_dir).await?;
        while let Some(entry) = dir.next_entry().await? {
            let src_path = entry.path();
            let dst_path = dst_dir.join(entry.file_name());
            let metadata = entry.metadata().await?;
            if metadata.is_dir() {
                copy_dir_contents_if_missing(&src_path, &dst_path).await?;
            } else if metadata.is_file() && !tokio::fs::try_exists(&dst_path).await? {
                tokio::fs::copy(&src_path, &dst_path).await?;
            }
        }
        Ok(())
    })
}

@@ -1,3 +1,2 @@
pub(super) const MEMORY_SCOPE_KIND_CWD: &str = "cwd";
pub(super) const MEMORY_SCOPE_KIND_USER: &str = "user";
pub(super) const MEMORY_SCOPE_KEY_USER: &str = "user";

@@ -7,10 +7,9 @@ use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::layout::memory_root_for_cwd;
use crate::memories::layout::memory_root_for_user;
use crate::memories::layout::memory_root;
use crate::memories::layout::migrate_legacy_user_memory_root_if_needed;
use crate::memories::scope::MEMORY_SCOPE_KEY_USER;
use crate::memories::scope::MEMORY_SCOPE_KIND_CWD;
use crate::memories::scope::MEMORY_SCOPE_KIND_USER;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
@@ -61,7 +60,7 @@ pub(super) struct MemoryScopeTarget {

/// Converts a pending scope consolidation row into a concrete filesystem target for phase 2.
///
/// Unsupported scope kinds or malformed user-scope keys are ignored.
/// Unsupported scope kinds or malformed keys are ignored.
pub(super) fn memory_scope_target_for_pending_scope(
    config: &Config,
    pending_scope: codex_state::PendingScopeConsolidation,
@@ -70,14 +69,6 @@ pub(super) fn memory_scope_target_for_pending_scope(
    let scope_key = pending_scope.scope_key;

    match scope_kind.as_str() {
        MEMORY_SCOPE_KIND_CWD => {
            let cwd = PathBuf::from(&scope_key);
            Some(MemoryScopeTarget {
                scope_kind: MEMORY_SCOPE_KIND_CWD,
                scope_key,
                memory_root: memory_root_for_cwd(&config.codex_home, &cwd),
            })
        }
        MEMORY_SCOPE_KIND_USER => {
            if scope_key != MEMORY_SCOPE_KEY_USER {
                warn!(
@@ -89,7 +80,7 @@ pub(super) fn memory_scope_target_for_pending_scope(
            Some(MemoryScopeTarget {
                scope_kind: MEMORY_SCOPE_KIND_USER,
                scope_key,
                memory_root: memory_root_for_user(&config.codex_home),
                memory_root: memory_root(&config.codex_home),
            })
        }
        _ => {
@@ -139,6 +130,10 @@ pub(super) async fn run_memories_startup_pipeline(
    session: &Arc<Session>,
    config: Arc<Config>,
) -> CodexResult<()> {
    if let Err(err) = migrate_legacy_user_memory_root_if_needed(&config.codex_home).await {
        warn!("failed migrating legacy shared memory root: {err}");
    }

    let Some(state_db) = session.services.state_db.as_deref() else {
        warn!("state db unavailable for memories startup pipeline; skipping");
        return Ok(());
@@ -302,23 +297,12 @@ async fn list_consolidation_scopes(
mod tests {
    use super::*;
    use crate::config::test_config;
    use std::path::PathBuf;

    /// Verifies that phase-2 pending scope rows are translated only for supported scopes.
    #[test]
    fn pending_scope_mapping_accepts_supported_scopes_only() {
        let mut config = test_config();
        config.codex_home = PathBuf::from("/tmp/memory-startup-test-home");

        let cwd_target = memory_scope_target_for_pending_scope(
            &config,
            codex_state::PendingScopeConsolidation {
                scope_kind: MEMORY_SCOPE_KIND_CWD.to_string(),
                scope_key: "/tmp/project-a".to_string(),
            },
        )
        .expect("cwd scope should map");
        assert_eq!(cwd_target.scope_kind, MEMORY_SCOPE_KIND_CWD);
        config.codex_home = "/tmp/memory-startup-test-home".into();

        let user_target = memory_scope_target_for_pending_scope(
            &config,
@@ -341,6 +325,17 @@ mod tests {
            .is_none()
        );

        assert!(
            memory_scope_target_for_pending_scope(
                &config,
                codex_state::PendingScopeConsolidation {
                    scope_kind: "cwd".to_string(),
                    scope_key: "/tmp/project-a".to_string(),
                },
            )
            .is_none()
        );

        assert!(
            memory_scope_target_for_pending_scope(
                &config,

@@ -6,7 +6,8 @@ use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use super::storage::wipe_consolidation_outputs;
use crate::memories::layout::ensure_layout;
use crate::memories::layout::memory_root_for_cwd;
use crate::memories::layout::memory_root;
use crate::memories::layout::migrate_legacy_user_memory_root_if_needed;
use crate::memories::layout::raw_memories_file;
use crate::memories::layout::rollout_summaries_dir;
use chrono::TimeZone;
@@ -21,46 +22,37 @@ use pretty_assertions::assert_eq;
use tempfile::tempdir;

#[test]
fn memory_root_varies_by_cwd() {
fn memory_root_uses_shared_global_path() {
    let dir = tempdir().expect("tempdir");
    let codex_home = dir.path().join("codex");
    let cwd_a = dir.path().join("workspace-a");
    let cwd_b = dir.path().join("workspace-b");

    std::fs::create_dir_all(&cwd_a).expect("mkdir a");
    std::fs::create_dir_all(&cwd_b).expect("mkdir b");

    let root_a = memory_root_for_cwd(&codex_home, &cwd_a);
    let root_b = memory_root_for_cwd(&codex_home, &cwd_b);
    assert!(root_a.starts_with(codex_home.join("memories")));
    assert!(root_b.starts_with(codex_home.join("memories")));
    assert!(root_a.ends_with("memory"));
    assert!(root_b.ends_with("memory"));
    assert_ne!(root_a, root_b);

    let bucket_a = root_a
        .parent()
        .and_then(std::path::Path::file_name)
        .and_then(std::ffi::OsStr::to_str)
        .expect("cwd bucket");
    assert_eq!(bucket_a.len(), 16);
    assert!(bucket_a.chars().all(|ch| ch.is_ascii_hexdigit()));
    assert_eq!(memory_root(&codex_home), codex_home.join("memories"));
}

#[test]
fn memory_root_encoding_avoids_component_collisions() {
#[tokio::test]
async fn migrate_legacy_user_memory_root_if_needed_copies_contents() {
    let dir = tempdir().expect("tempdir");
    let codex_home = dir.path().join("codex");
    let legacy_root = codex_home.join("memories").join("user").join("memory");
    tokio::fs::create_dir_all(legacy_root.join("rollout_summaries"))
        .await
        .expect("create legacy rollout summaries dir");
    tokio::fs::write(
        legacy_root.join("rollout_summaries").join("thread.md"),
        "summary",
    )
    .await
    .expect("write legacy rollout summary");
    tokio::fs::write(legacy_root.join("raw_memories.md"), "raw")
        .await
        .expect("write legacy raw memories");

    let cwd_question = dir.path().join("workspace?one");
    let cwd_hash = dir.path().join("workspace#one");
    migrate_legacy_user_memory_root_if_needed(&codex_home)
        .await
        .expect("migrate legacy memory root");

    let root_question = memory_root_for_cwd(&codex_home, &cwd_question);
    let root_hash = memory_root_for_cwd(&codex_home, &cwd_hash);

    assert_ne!(root_question, root_hash);
    assert!(!root_question.display().to_string().contains("workspace"));
    assert!(!root_hash.display().to_string().contains("workspace"));
    let root = memory_root(&codex_home);
    assert!(root.join("rollout_summaries").join("thread.md").is_file());
    assert!(root.join("raw_memories.md").is_file());
}

#[test]

@@ -39,7 +39,6 @@ use uuid::Uuid;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 4;

const MEMORY_SCOPE_KIND_CWD: &str = "cwd";
const MEMORY_SCOPE_KIND_USER: &str = "user";
const MEMORY_SCOPE_KEY_USER: &str = "user";

@@ -90,7 +89,7 @@ pub struct Stage1StartupClaimParams<'a> {
/// Scope row used to queue phase-2 consolidation work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingScopeConsolidation {
    /// Scope family (`cwd` or `user`).
    /// Scope family (`user`).
    pub scope_kind: String,
    /// Scope identifier keyed by `scope_kind`.
    pub scope_key: String,
@@ -1473,7 +1472,7 @@ WHERE kind = 'memory_stage1'
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");

runtime
    .enqueue_scope_consolidation("cwd", "/tmp/project-a", 100)
    .enqueue_scope_consolidation("user", "user", 100)
    .await
    .expect("enqueue scope");

@@ -1484,13 +1483,13 @@ WHERE kind = 'memory_stage1'
assert_eq!(
    scopes,
    vec![PendingScopeConsolidation {
        scope_kind: "cwd".to_string(),
        scope_key: "/tmp/project-a".to_string(),
        scope_kind: "user".to_string(),
        scope_key: "user".to_string(),
    }]
);

let claim = runtime
    .try_claim_phase2_job("cwd", "/tmp/project-a", owner, 3600)
    .try_claim_phase2_job("user", "user", owner, 3600)
    .await
    .expect("claim phase2");
let (ownership_token, input_watermark) = match claim {
@@ -1503,8 +1502,8 @@ WHERE kind = 'memory_stage1'
assert!(
    runtime
        .mark_phase2_job_succeeded(
            "cwd",
            "/tmp/project-a",
            "user",
            "user",
            ownership_token.as_str(),
            input_watermark,
        )
@@ -1514,18 +1513,18 @@ WHERE kind = 'memory_stage1'
);

let claim_up_to_date = runtime
    .try_claim_phase2_job("cwd", "/tmp/project-a", owner, 3600)
    .try_claim_phase2_job("user", "user", owner, 3600)
    .await
    .expect("claim phase2 up-to-date");
assert_eq!(claim_up_to_date, Phase2JobClaimOutcome::SkippedNotDirty);

runtime
    .enqueue_scope_consolidation("cwd", "/tmp/project-a", 101)
    .enqueue_scope_consolidation("user", "user", 101)
    .await
    .expect("enqueue scope again");

let claim_rerun = runtime
    .try_claim_phase2_job("cwd", "/tmp/project-a", owner, 3600)
    .try_claim_phase2_job("user", "user", owner, 3600)
    .await
    .expect("claim phase2 rerun");
assert!(
@@ -1537,38 +1536,36 @@ WHERE kind = 'memory_stage1'
}

#[tokio::test]
async fn list_stage1_outputs_for_cwd_scope_matches_canonical_equivalent_paths() {
async fn list_stage1_outputs_for_user_scope_returns_latest_outputs() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");

    let workspace = codex_home.join("workspace");
    tokio::fs::create_dir_all(&workspace)
        .await
        .expect("create workspace");
    let non_normalized_cwd = workspace.join("..").join("workspace");
    let canonical_scope_key = workspace
        .canonicalize()
        .expect("canonicalize workspace")
        .display()
        .to_string();

    let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let thread_id_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let thread_id_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
    runtime
        .upsert_thread(&test_thread_metadata(
            &codex_home,
            thread_id,
            non_normalized_cwd,
            thread_id_a,
            codex_home.join("workspace-a"),
        ))
        .await
        .expect("upsert thread");
        .expect("upsert thread a");
    runtime
        .upsert_thread(&test_thread_metadata(
            &codex_home,
            thread_id_b,
            codex_home.join("workspace-b"),
        ))
        .await
        .expect("upsert thread b");

    let claim = runtime
        .try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
        .try_claim_stage1_job(thread_id_a, owner, 100, 3600, 64)
        .await
        .expect("claim stage1");
        .expect("claim stage1 a");
    let ownership_token = match claim {
        Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
        other => panic!("unexpected stage1 claim outcome: {other:?}"),
@@ -1576,56 +1573,77 @@ WHERE kind = 'memory_stage1'
    assert!(
        runtime
            .mark_stage1_job_succeeded(
                thread_id,
                thread_id_a,
                ownership_token.as_str(),
                100,
                "raw memory",
                "summary",
                "raw memory a",
                "summary a",
            )
            .await
            .expect("mark stage1 succeeded"),
        "stage1 success should persist output"
            .expect("mark stage1 succeeded a"),
        "stage1 success should persist output a"
    );

    let claim = runtime
        .try_claim_stage1_job(thread_id_b, owner, 101, 3600, 64)
        .await
        .expect("claim stage1 b");
    let ownership_token = match claim {
        Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
        other => panic!("unexpected stage1 claim outcome: {other:?}"),
    };
    assert!(
        runtime
            .mark_stage1_job_succeeded(
                thread_id_b,
                ownership_token.as_str(),
                101,
                "raw memory b",
                "summary b",
            )
            .await
            .expect("mark stage1 succeeded b"),
        "stage1 success should persist output b"
    );

    let outputs = runtime
        .list_stage1_outputs_for_scope("cwd", canonical_scope_key.as_str(), 10)
        .list_stage1_outputs_for_scope("user", "user", 10)
        .await
        .expect("list stage1 outputs for canonical cwd scope");
    assert_eq!(outputs.len(), 1);
    assert_eq!(outputs[0].thread_id, thread_id);
    assert_eq!(outputs[0].summary, "summary");
        .expect("list stage1 outputs for user scope");
    assert_eq!(outputs.len(), 2);
    assert_eq!(outputs[0].thread_id, thread_id_b);
    assert_eq!(outputs[0].summary, "summary b");
    assert_eq!(outputs[1].thread_id, thread_id_a);
    assert_eq!(outputs[1].summary, "summary a");

    let _ = tokio::fs::remove_dir_all(codex_home).await;
}

#[tokio::test]
async fn mark_stage1_job_succeeded_normalizes_cwd_scope_job_key() {
async fn mark_stage1_job_succeeded_enqueues_single_user_scope() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");

    let workspace = codex_home.join("workspace");
    tokio::fs::create_dir_all(&workspace)
        .await
        .expect("create workspace");
    let canonical_scope_key = workspace
        .canonicalize()
        .expect("canonicalize workspace")
        .display()
        .to_string();
    let cwd_alias = workspace.join(".");

    let thread_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id a");
    let thread_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id b");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");

    runtime
        .upsert_thread(&test_thread_metadata(&codex_home, thread_a, workspace))
        .upsert_thread(&test_thread_metadata(
            &codex_home,
            thread_a,
            codex_home.join("workspace-a"),
        ))
        .await
        .expect("upsert thread a");
    runtime
        .upsert_thread(&test_thread_metadata(&codex_home, thread_b, cwd_alias))
        .upsert_thread(&test_thread_metadata(
            &codex_home,
            thread_b,
            codex_home.join("workspace-b"),
        ))
        .await
        .expect("upsert thread b");

@@ -1665,13 +1683,13 @@ WHERE kind = 'memory_stage1'
        .list_pending_scope_consolidations(10)
        .await
        .expect("list pending scopes");
    let cwd_scopes = pending_scopes
        .iter()
        .filter(|scope| scope.scope_kind == "cwd")
        .cloned()
        .collect::<Vec<_>>();
    assert_eq!(cwd_scopes.len(), 1);
    assert_eq!(cwd_scopes[0].scope_key, canonical_scope_key);
    assert_eq!(
        pending_scopes,
        vec![PendingScopeConsolidation {
            scope_kind: "user".to_string(),
            scope_key: "user".to_string(),
        }]
    );

    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -1685,28 +1703,28 @@ WHERE kind = 'memory_stage1'
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");

runtime
    .enqueue_scope_consolidation("cwd", "scope-running", 200)
    .enqueue_scope_consolidation("user", "scope-running", 200)
    .await
    .expect("enqueue running scope");
runtime
    .enqueue_scope_consolidation("cwd", "scope-backoff", 199)
    .enqueue_scope_consolidation("user", "scope-backoff", 199)
    .await
    .expect("enqueue backoff scope");
runtime
    .enqueue_scope_consolidation("cwd", "scope-exhausted", 198)
    .enqueue_scope_consolidation("user", "scope-exhausted", 198)
    .await
    .expect("enqueue exhausted scope");
runtime
    .enqueue_scope_consolidation("cwd", "scope-claimable-a", 90)
    .enqueue_scope_consolidation("user", "scope-claimable-a", 90)
    .await
    .expect("enqueue claimable scope a");
runtime
    .enqueue_scope_consolidation("cwd", "scope-claimable-b", 89)
    .enqueue_scope_consolidation("user", "scope-claimable-b", 89)
    .await
    .expect("enqueue claimable scope b");

let running_claim = runtime
    .try_claim_phase2_job("cwd", "scope-running", owner, 3600)
    .try_claim_phase2_job("user", "scope-running", owner, 3600)
    .await
    .expect("claim running scope");
assert!(
@@ -1715,7 +1733,7 @@ WHERE kind = 'memory_stage1'
);

let backoff_claim = runtime
    .try_claim_phase2_job("cwd", "scope-backoff", owner, 3600)
    .try_claim_phase2_job("user", "scope-backoff", owner, 3600)
    .await
    .expect("claim backoff scope");
let backoff_token = match backoff_claim {
@@ -1727,7 +1745,7 @@ WHERE kind = 'memory_stage1'
assert!(
    runtime
        .mark_phase2_job_failed(
            "cwd",
            "user",
            "scope-backoff",
            backoff_token.as_str(),
            "temporary failure",
@@ -1739,7 +1757,7 @@ WHERE kind = 'memory_stage1'
);

sqlx::query("UPDATE jobs SET retry_remaining = 0 WHERE kind = ? AND job_key = ?")
    .bind("memory_consolidate_cwd")
    .bind("memory_consolidate_user")
    .bind("scope-exhausted")
    .execute(runtime.pool.as_ref())
    .await
@@ -1753,11 +1771,11 @@ WHERE kind = 'memory_stage1'
    pending,
    vec![
        PendingScopeConsolidation {
            scope_kind: "cwd".to_string(),
            scope_kind: "user".to_string(),
            scope_key: "scope-claimable-a".to_string(),
        },
        PendingScopeConsolidation {
            scope_kind: "cwd".to_string(),
            scope_kind: "user".to_string(),
            scope_key: "scope-claimable-b".to_string(),
        },
    ]

@@ -6,36 +6,28 @@ use chrono::Duration;
|
||||
use sqlx::Executor;
|
||||
use sqlx::QueryBuilder;
|
||||
use sqlx::Sqlite;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
const JOB_KIND_MEMORY_STAGE1: &str = "memory_stage1";
|
||||
const JOB_KIND_MEMORY_CONSOLIDATE_CWD: &str = "memory_consolidate_cwd";
|
||||
const JOB_KIND_MEMORY_CONSOLIDATE_USER: &str = "memory_consolidate_user";
|
||||
|
||||
const DEFAULT_RETRY_REMAINING: i64 = 3;
|
||||
|
||||
fn job_kind_for_scope(scope_kind: &str) -> Option<&'static str> {
|
||||
match scope_kind {
|
||||
MEMORY_SCOPE_KIND_CWD => Some(JOB_KIND_MEMORY_CONSOLIDATE_CWD),
|
||||
MEMORY_SCOPE_KIND_USER => Some(JOB_KIND_MEMORY_CONSOLIDATE_USER),
|
||||
_ => None,
|
||||
if scope_kind == MEMORY_SCOPE_KIND_USER {
|
||||
Some(JOB_KIND_MEMORY_CONSOLIDATE_USER)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn scope_kind_for_job_kind(job_kind: &str) -> Option<&'static str> {
|
||||
match job_kind {
|
||||
JOB_KIND_MEMORY_CONSOLIDATE_CWD => Some(MEMORY_SCOPE_KIND_CWD),
|
||||
JOB_KIND_MEMORY_CONSOLIDATE_USER => Some(MEMORY_SCOPE_KIND_USER),
|
||||
_ => None,
|
||||
if job_kind == JOB_KIND_MEMORY_CONSOLIDATE_USER {
|
||||
Some(MEMORY_SCOPE_KIND_USER)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_cwd_for_scope_matching(cwd: &str) -> Option<PathBuf> {
|
||||
Path::new(cwd).canonicalize().ok()
|
||||
}
|
||||
|
||||
impl StateRuntime {
|
||||
pub async fn claim_stage1_jobs_for_startup(
|
||||
&self,
|
||||
@@ -156,7 +148,7 @@ WHERE thread_id = ?
     pub async fn list_stage1_outputs_for_scope(
         &self,
         scope_kind: &str,
-        scope_key: &str,
+        _scope_key: &str,
         n: usize,
     ) -> anyhow::Result<Vec<Stage1Output>> {
         if n == 0 {
@@ -164,58 +156,6 @@ WHERE thread_id = ?
         }

         let rows = match scope_kind {
-            MEMORY_SCOPE_KIND_CWD => {
-                let exact_rows = sqlx::query(
-                    r#"
-SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.summary, so.generated_at
-FROM stage1_outputs AS so
-JOIN threads AS t ON t.id = so.thread_id
-WHERE t.cwd = ?
-ORDER BY so.source_updated_at DESC, so.thread_id DESC
-LIMIT ?
-"#,
-                )
-                .bind(scope_key)
-                .bind(n as i64)
-                .fetch_all(self.pool.as_ref())
-                .await?;
-
-                if let Some(normalized_scope_key) = normalize_cwd_for_scope_matching(scope_key) {
-                    let mut rows = Vec::new();
-                    let mut selected_thread_ids = HashSet::new();
-                    let candidate_rows = sqlx::query(
-                        r#"
-SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.summary, so.generated_at, t.cwd AS thread_cwd
-FROM stage1_outputs AS so
-JOIN threads AS t ON t.id = so.thread_id
-ORDER BY so.source_updated_at DESC, so.thread_id DESC
-"#,
-                    )
-                    .fetch_all(self.pool.as_ref())
-                    .await?;
-
-                    for row in candidate_rows {
-                        if rows.len() >= n {
-                            break;
-                        }
-                        let thread_id: String = row.try_get("thread_id")?;
-                        if selected_thread_ids.contains(&thread_id) {
-                            continue;
-                        }
-                        let thread_cwd: String = row.try_get("thread_cwd")?;
-                        if let Some(normalized_thread_cwd) =
-                            normalize_cwd_for_scope_matching(&thread_cwd)
-                            && normalized_thread_cwd == normalized_scope_key
-                        {
-                            selected_thread_ids.insert(thread_id);
-                            rows.push(row);
-                        }
-                    }
-                    if rows.is_empty() { exact_rows } else { rows }
-                } else {
-                    exact_rows
-                }
-            }
             MEMORY_SCOPE_KIND_USER => {
                 sqlx::query(
                     r#"
@@ -224,7 +164,7 @@ FROM stage1_outputs AS so
 JOIN threads AS t ON t.id = so.thread_id
 ORDER BY so.source_updated_at DESC, so.thread_id DESC
 LIMIT ?
-"#,
+"#,
                 )
                 .bind(n as i64)
                 .fetch_all(self.pool.as_ref())
@@ -468,37 +408,13 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
         .execute(&mut *tx)
         .await?;

-        if let Some(thread_row) = sqlx::query(
-            r#"
-SELECT cwd
-FROM threads
-WHERE id = ?
-"#,
-        )
-        .bind(thread_id.as_str())
-        .fetch_optional(&mut *tx)
-        .await?
-        {
-            let cwd: String = thread_row.try_get("cwd")?;
-            let normalized_cwd = normalize_cwd_for_scope_matching(&cwd)
-                .unwrap_or_else(|| PathBuf::from(&cwd))
-                .display()
-                .to_string();
-            enqueue_scope_consolidation_with_executor(
-                &mut *tx,
-                MEMORY_SCOPE_KIND_CWD,
-                &normalized_cwd,
-                source_updated_at,
-            )
-            .await?;
-            enqueue_scope_consolidation_with_executor(
-                &mut *tx,
-                MEMORY_SCOPE_KIND_USER,
-                MEMORY_SCOPE_KEY_USER,
-                source_updated_at,
-            )
-            .await?;
-        }
+        enqueue_scope_consolidation_with_executor(
+            &mut *tx,
+            MEMORY_SCOPE_KIND_USER,
+            MEMORY_SCOPE_KEY_USER,
+            source_updated_at,
+        )
+        .await?;

         tx.commit().await?;
         Ok(true)
@@ -570,7 +486,7 @@ WHERE kind = ? AND job_key = ?
             r#"
 SELECT kind, job_key
 FROM jobs
-WHERE kind IN (?, ?)
+WHERE kind = ?
   AND input_watermark IS NOT NULL
   AND input_watermark > COALESCE(last_success_watermark, 0)
   AND retry_remaining > 0
@@ -580,7 +496,6 @@ ORDER BY input_watermark DESC, kind ASC, job_key ASC
 LIMIT ?
 "#,
         )
-        .bind(JOB_KIND_MEMORY_CONSOLIDATE_CWD)
         .bind(JOB_KIND_MEMORY_CONSOLIDATE_USER)
         .bind(now)
         .bind(now)
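After this change only the user scope participates in consolidation. The two simplified helpers from the diff can be exercised in isolation; below is a minimal standalone sketch with the constant values inlined (the `"user"` value for `MEMORY_SCOPE_KIND_USER` is an assumption based on the test fixtures above — in the real crate these constants live in `state/src/runtime/memory.rs` and the shared memories module):

```rust
// Standalone sketch of the user-only scope mapping after this PR.
// ASSUMPTION: MEMORY_SCOPE_KIND_USER is the string "user".
const JOB_KIND_MEMORY_CONSOLIDATE_USER: &str = "memory_consolidate_user";
const MEMORY_SCOPE_KIND_USER: &str = "user";

// With the cwd scope removed, only the user scope maps to a job kind.
fn job_kind_for_scope(scope_kind: &str) -> Option<&'static str> {
    if scope_kind == MEMORY_SCOPE_KIND_USER {
        Some(JOB_KIND_MEMORY_CONSOLIDATE_USER)
    } else {
        None
    }
}

fn scope_kind_for_job_kind(job_kind: &str) -> Option<&'static str> {
    if job_kind == JOB_KIND_MEMORY_CONSOLIDATE_USER {
        Some(MEMORY_SCOPE_KIND_USER)
    } else {
        None
    }
}

fn main() {
    // Round-trip: user scope <-> user consolidation job kind.
    assert_eq!(job_kind_for_scope("user"), Some("memory_consolidate_user"));
    assert_eq!(scope_kind_for_job_kind("memory_consolidate_user"), Some("user"));
    // The former cwd scope and job kind no longer map to anything.
    assert_eq!(job_kind_for_scope("cwd"), None);
    assert_eq!(scope_kind_for_job_kind("memory_consolidate_cwd"), None);
}
```

The `if`/`else` form in the diff is just the former `match` collapsed to its single remaining arm; any non-user scope now falls through to `None` and is never queued.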