feat: mem v2 - PR4 (#11369)

# Memories migration plan (simplified global workflow)

## Target behavior

- One shared memory root only: `~/.codex/memories/`.
- No per-cwd memory buckets, no cwd hash handling.
- Phase 1 candidate rules:
  - Not currently being processed, unless the job lease is stale.
  - Rollout updated within the max-age window (currently 30 days).
  - Rollout idle for at least 12 hours (new constant).
- Global cap: at most 64 stage-1 jobs in `running` state at any time
  (new invariant).
- Stage-1 model output shape (new):
  - `rollout_slug` (accepted but ignored for now).
  - `rollout_summary`.
  - `raw_memory`.
- Phase-1 artifacts written under the shared root:
  - `rollout_summaries/<thread_id>.md` for each rollout summary.
  - `raw_memories.md` containing appended/merged raw memory paragraphs.
- Phase 2 runs one consolidation agent for the shared `memories/`
  directory.
- Phase-2 lock is DB-backed with a 1-hour lease and heartbeat/expiry.
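
The candidate window above can be expressed as a pure predicate. This is a hypothetical sketch — the constant names follow the plan, but the real selection lives in DB queries, not in a function like this:

```rust
// Illustrative phase-1 eligibility check; names mirror the plan's constants.
const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
const PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;

/// A rollout qualifies for stage-1 extraction when it is not held by a
/// fresh lease and its last update falls inside the [12h, 30d] window.
fn is_phase1_candidate(now: i64, updated_at: i64, has_fresh_lease: bool) -> bool {
    if has_fresh_lease {
        // Another worker owns it; only stale leases may be taken over.
        return false;
    }
    let idle = now - updated_at;
    idle >= PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS * 3_600
        && idle <= PHASE_ONE_MAX_ROLLOUT_AGE_DAYS * 86_400
}

fn main() {
    let now = 100 * 86_400; // arbitrary clock, in seconds
    assert!(is_phase1_candidate(now, now - 86_400, false)); // 1 day idle: eligible
    assert!(!is_phase1_candidate(now, now - 3_600, false)); // 1 hour idle: too recent
    assert!(!is_phase1_candidate(now, now - 31 * 86_400, false)); // past max age
}
```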

## Current code map

- Core startup pipeline: `core/src/memories/startup/mod.rs`.
- Stage-1 request+parse: `core/src/memories/startup/extract.rs`,
`core/src/memories/stage_one.rs`, templates in
`core/templates/memories/`.
- File materialization: `core/src/memories/storage.rs`,
`core/src/memories/layout.rs`.
- Scope routing (cwd/user): `core/src/memories/scope.rs`,
`core/src/memories/startup/mod.rs`.
- DB job lifecycle and scope queueing: `state/src/runtime/memory.rs`.

## PR plan

## PR 1: Correct phase-1 selection invariants (no behavior-breaking layout changes yet)

- Add `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12` in
  `core/src/memories/mod.rs`.
- Thread this into `state::claim_stage1_jobs_for_startup(...)`.
- Enforce the idle-time filter in the DB selection logic (not only via
  in-memory filtering after `scan_limit`) so eligible threads are not
  starved by very recent ones.
- Enforce the global running cap of 64 at claim time in the DB logic:
  - Count fresh `memory_stage1` running jobs.
  - Only allow new claims while the count is below the cap.
  - Keep stale-lease takeover behavior intact.
- Add/adjust tests in `state/src/runtime.rs`:
  - Idle-filter inclusion/exclusion around the 12h boundary.
  - Global running-cap guarantee.
  - Existing stale/fresh ownership behavior still passes.

Acceptance criteria:
- Startup never creates more than 64 fresh `memory_stage1` running jobs.
- Threads updated <12h ago are skipped.
- Threads older than 30d are skipped.
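
The claim-time cap gate can be sketched as below. Names are illustrative; the real enforcement belongs in the DB claim logic, and the key subtlety is that stale-lease takeovers do not create new work:

```rust
// Hypothetical gate for the global stage-1 running cap.
const STAGE1_GLOBAL_RUNNING_CAP: usize = 64;

/// A brand-new stage-1 claim is allowed only while the number of fresh
/// (non-stale) running jobs is below the cap; taking over a stale lease
/// replaces an existing job, so it bypasses the gate.
fn may_claim_new_stage1_job(fresh_running: usize, is_stale_takeover: bool) -> bool {
    is_stale_takeover || fresh_running < STAGE1_GLOBAL_RUNNING_CAP
}

fn main() {
    assert!(may_claim_new_stage1_job(63, false));
    assert!(!may_claim_new_stage1_job(64, false)); // cap reached
    assert!(may_claim_new_stage1_job(64, true)); // stale takeover still allowed
}
```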

## PR 2: Stage-1 output contract + storage artifacts (forward-compatible)

- Update parser/types to accept the new structured output while keeping
  backward compatibility:
  - Add `rollout_slug` (optional for now).
  - Add `rollout_summary`.
  - Keep alias support for the legacy `summary` and `rawMemory` keys
    until the prompt swap completes.
- Update the stage-1 schema generator in
  `core/src/memories/stage_one.rs` to include the new keys.
- Update prompt templates:
  - `core/templates/memories/stage_one_system.md`.
  - `core/templates/memories/stage_one_input.md`.
- Replace the storage model in `core/src/memories/storage.rs`:
  - Introduce a `rollout_summaries/` directory writer (`<thread_id>.md`
    files).
  - Introduce a `raw_memories.md` aggregator writer fed from DB rows.
  - Keep deterministic rebuild behavior from DB outputs so files can
    always be regenerated.
- Update the consolidation prompt template to reference the
  `rollout_summaries/` + `raw_memories.md` inputs.

Acceptance criteria:
- Stage-1 accepts both old and new output keys during migration.
- Phase-1 artifacts are generated in new format from DB state.
- No dependence on per-thread files in `raw_memories/`.
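
The alias precedence during the compatibility window can be modeled like this. The real stage-1 parser goes through the project's JSON deserializer; this sketch only captures the canonical-key-wins fallback rule:

```rust
use std::collections::HashMap;

/// Resolve a stage-1 output field: prefer the new canonical key, fall
/// back to the legacy alias until the prompt swap completes.
fn resolve_field<'a>(
    obj: &'a HashMap<String, String>,
    canonical: &str,
    legacy_alias: &str,
) -> Option<&'a str> {
    obj.get(canonical)
        .or_else(|| obj.get(legacy_alias))
        .map(String::as_str)
}

fn main() {
    let mut legacy_output = HashMap::new();
    legacy_output.insert("summary".to_string(), "old-style summary".to_string());
    legacy_output.insert("rawMemory".to_string(), "old-style memory".to_string());
    // Legacy keys still parse during the migration window.
    assert_eq!(
        resolve_field(&legacy_output, "rollout_summary", "summary"),
        Some("old-style summary")
    );
    assert_eq!(
        resolve_field(&legacy_output, "raw_memory", "rawMemory"),
        Some("old-style memory")
    );
    // `rollout_slug` has no legacy counterpart and stays optional.
    assert_eq!(resolve_field(&legacy_output, "rollout_slug", "rollout_slug"), None);
}
```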

## PR 3: Remove per-cwd memories and move to one global memory root

- Simplify the layout in `core/src/memories/layout.rs`:
  - Single root: `codex_home/memories`.
  - Remove cwd-hash bucket helpers and the normalization logic used only
    for memory pathing.
- Remove scope branching from the startup phase-2 dispatch path:
  - No cwd/user mapping in `core/src/memories/startup/mod.rs`.
  - One target root for consolidation.
- In `state/src/runtime/memory.rs`, stop enqueueing/handling the cwd
  consolidation scope.
- Keep one logical consolidation scope/job key (global/user) to avoid a
  risky schema rewrite in the same PR.
- Add a one-time migration helper (core side) to preserve current shared
  memory output:
  - If `~/.codex/memories/user/memory` exists and the new root is empty,
    move/copy its contents into `~/.codex/memories`.
  - Leave old hashed cwd buckets untouched for now (safe, non-destructive
    migration).

Acceptance criteria:
- New runs only read/write `~/.codex/memories`.
- No new cwd-scoped consolidation jobs are enqueued.
- Existing user-shared memory content is preserved.
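
The one-time migration helper could look roughly like the following. Paths follow the plan; the emptiness check and error handling are simplified assumptions, not the actual implementation:

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hoist the legacy `user/memory` subtree into the shared memory root,
/// but only when the root holds no other content yet.
fn migrate_legacy_user_memory(root: &Path) -> io::Result<()> {
    let legacy = root.join("user").join("memory");
    if !legacy.is_dir() {
        return Ok(()); // nothing to migrate
    }
    // "Empty" here means: no entries besides the legacy `user/` subtree.
    let root_in_use = fs::read_dir(root)?
        .filter_map(Result::ok)
        .any(|entry| entry.file_name() != "user");
    if root_in_use {
        return Ok(()); // new layout already populated; leave legacy data alone
    }
    for entry in fs::read_dir(&legacy)? {
        let entry = entry?;
        fs::rename(entry.path(), root.join(entry.file_name()))?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // Demo against a throwaway directory under the system temp dir.
    let root = std::env::temp_dir().join(format!("memories-demo-{}", std::process::id()));
    fs::create_dir_all(root.join("user").join("memory"))?;
    fs::write(root.join("user").join("memory").join("raw_memories.md"), "# Raw Memories\n")?;
    migrate_legacy_user_memory(&root)?;
    assert!(root.join("raw_memories.md").is_file());
    fs::remove_dir_all(&root)
}
```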

## PR 4: Phase-2 global lock simplification and cleanup

- Replace multi-scope dispatch with a single global consolidation claim
  path:
  - Either reuse the jobs table with one fixed key, or add a tiny
    dedicated lock helper; keep the 1h lease.
  - Ensure at most one consolidation agent can run at once.
- Keep heartbeat + stale-lock recovery semantics in
  `core/src/memories/startup/watch.rs`.
- Remove dead scope code and legacy constants that are no longer used.
- Update tests:
  - One-agent-at-a-time behavior.
  - Lock expiry allows takeover after a stale lease.

Acceptance criteria:
- At most one phase-2 consolidation agent can be active at a time
  (per local DB).
- Stale lock recovers automatically.
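
The intended lock semantics can be sketched as an in-memory analogue. The real lock is a DB row and the method names are illustrative, but the claim/heartbeat/expiry behavior mirrors what the tests should assert:

```rust
/// In-memory analogue of the DB-backed global phase-2 lock: one logical
/// row holding an ownership token and a lease deadline. A claim succeeds
/// only if the row is unowned or the previous lease expired; heartbeats
/// extend the deadline for the current owner.
struct GlobalPhase2Lock {
    owner: Option<(String, i64)>, // (ownership token, lease expiry in seconds)
}

impl GlobalPhase2Lock {
    fn try_claim(&mut self, now: i64, lease_secs: i64, token: &str) -> bool {
        if matches!(&self.owner, Some((_, expires)) if *expires > now) {
            return false; // a fresh lease is held elsewhere (SkippedRunning)
        }
        self.owner = Some((token.to_string(), now + lease_secs));
        true
    }

    fn heartbeat(&mut self, now: i64, lease_secs: i64, token: &str) -> bool {
        match &mut self.owner {
            Some((owner, expires)) if owner == token => {
                *expires = now + lease_secs; // extend the 1h lease
                true
            }
            _ => false, // ownership was lost; caller must skip finalization
        }
    }
}

fn main() {
    let mut lock = GlobalPhase2Lock { owner: None };
    assert!(lock.try_claim(0, 3_600, "agent-a"));
    assert!(!lock.try_claim(10, 3_600, "agent-b")); // one agent at a time
    assert!(lock.heartbeat(1_800, 3_600, "agent-a")); // lease extended to 5_400
    assert!(lock.try_claim(10_000, 3_600, "agent-b")); // stale-lease takeover
    assert!(!lock.heartbeat(10_001, 3_600, "agent-a")); // old owner lost the lock
}
```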

## PR 5: Final cleanup and docs

- Remove legacy artifacts and references:
  - `raw_memories/` and `memory_summary.md` assumptions from
    prompts/comments/tests.
  - Scope constants for cwd memory pathing in core/state, if fully
    unused.
- Update docs under `docs/` for the memory workflow and directory layout.
- Add a brief operator note for rollout: the compatibility window for old
  stage-1 JSON keys and when to remove the aliases.

Acceptance criteria:
- Code and docs reflect only the simplified global workflow.
- No stale references to per-cwd memory buckets.

## Notes on sequencing

- PR 1 is safest first because it improves correctness without changing
external artifact layout.
- PR 2 keeps parser compatibility so prompt deployment can happen
independently.
- PR 3 and PR 4 split filesystem/scope simplification from locking
simplification to reduce blast radius.
- PR 5 is intentionally cleanup-only.
This commit is contained in:
jif-oai
2026-02-10 23:10:35 +00:00
committed by GitHub
parent d8f9bb65e2
commit 623d3f4071
10 changed files with 384 additions and 566 deletions

View File

@@ -1,13 +1,12 @@
use std::path::Path;
use std::path::PathBuf;
use super::scope::MEMORY_SCOPE_KEY_USER;
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
pub(super) const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
pub(super) const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md";
pub(super) const SKILLS_SUBDIR: &str = "skills";
const LEGACY_USER_SUBDIR: &str = "user";
const LEGACY_MEMORY_SUBDIR: &str = "memory";
/// Returns the shared on-disk memory root directory.
@@ -46,7 +45,7 @@ pub(super) async fn ensure_layout(root: &Path) -> std::io::Result<()> {
fn legacy_user_memory_root(codex_home: &Path) -> PathBuf {
codex_home
.join("memories")
.join(MEMORY_SCOPE_KEY_USER)
.join(LEGACY_USER_SUBDIR)
.join(LEGACY_MEMORY_SUBDIR)
}

View File

@@ -2,12 +2,11 @@
//!
//! The startup memory pipeline is split into two phases:
//! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation.
//! - Phase 2: claim scopes, materialize consolidation inputs, and dispatch consolidation agents.
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.
mod layout;
mod prompts;
mod rollout;
mod scope;
mod stage_one;
mod startup;
mod storage;
@@ -23,10 +22,8 @@ const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation";
const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
const PHASE_ONE_CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP;
/// Concurrency cap for phase-2 consolidation dispatch.
const PHASE_TWO_CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP;
/// Maximum number of recent raw memories retained per scope.
const MAX_RAW_MEMORIES_PER_SCOPE: usize = 64;
/// Maximum number of recent raw memories retained for global consolidation.
const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 64;
/// Maximum rollout age considered for phase-1 extraction.
const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
/// Minimum rollout idle time required before phase-1 extraction.

View File

@@ -1,2 +0,0 @@
pub(super) const MEMORY_SCOPE_KIND_USER: &str = "user";
pub(super) const MEMORY_SCOPE_KEY_USER: &str = "user";

View File

@@ -1,5 +1,6 @@
use crate::codex::Session;
use crate::config::Config;
use crate::memories::layout::memory_root;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
@@ -8,7 +9,7 @@ use tracing::debug;
use tracing::info;
use tracing::warn;
use super::super::MAX_RAW_MEMORIES_PER_SCOPE;
use super::super::MAX_RAW_MEMORIES_FOR_GLOBAL;
use super::super::MEMORY_CONSOLIDATION_SUBAGENT_LABEL;
use super::super::PHASE_TWO_JOB_LEASE_SECONDS;
use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS;
@@ -16,38 +17,25 @@ use super::super::prompts::build_consolidation_prompt;
use super::super::storage::rebuild_raw_memories_file_from_memories;
use super::super::storage::sync_rollout_summaries_from_memories;
use super::super::storage::wipe_consolidation_outputs;
use super::MemoryScopeTarget;
use super::watch::spawn_phase2_completion_task;
pub(super) async fn run_memory_consolidation_for_scope(
session: Arc<Session>,
pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
scope: MemoryScopeTarget,
) {
) -> bool {
let Some(state_db) = session.services.state_db.as_deref() else {
warn!(
"state db unavailable for scope {}:{}; skipping consolidation",
scope.scope_kind, scope.scope_key
);
return;
warn!("state db unavailable; skipping global memory consolidation");
return false;
};
let claim = match state_db
.try_claim_phase2_job(
scope.scope_kind,
&scope.scope_key,
session.conversation_id,
PHASE_TWO_JOB_LEASE_SECONDS,
)
.try_claim_global_phase2_job(session.conversation_id, PHASE_TWO_JOB_LEASE_SECONDS)
.await
{
Ok(claim) => claim,
Err(err) => {
warn!(
"state db try_claim_phase2_job failed for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
return;
warn!("state db try_claim_global_phase2_job failed during memories startup: {err}");
return false;
}
};
let (ownership_token, claimed_watermark) = match claim {
@@ -56,131 +44,90 @@ pub(super) async fn run_memory_consolidation_for_scope(
input_watermark,
} => (ownership_token, input_watermark),
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => {
debug!(
"memory phase-2 scope not pending (or already up to date); skipping consolidation: {}:{}",
scope.scope_kind, scope.scope_key
);
return;
debug!("memory phase-2 global lock is up-to-date; skipping consolidation");
return false;
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => {
debug!(
"memory phase-2 job already running for scope {}:{}; skipping",
scope.scope_kind, scope.scope_key
);
return;
debug!("memory phase-2 global consolidation already running; skipping");
return false;
}
};
let latest_memories = match state_db
.list_stage1_outputs_for_scope(
scope.scope_kind,
&scope.scope_key,
MAX_RAW_MEMORIES_PER_SCOPE,
)
.list_stage1_outputs_for_global(MAX_RAW_MEMORIES_FOR_GLOBAL)
.await
{
Ok(memories) => memories,
Err(err) => {
warn!(
"state db list_stage1_outputs_for_scope failed during consolidation for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}");
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed to read scope stage-1 outputs before consolidation",
"failed to read stage-1 outputs before global consolidation",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
return;
return false;
}
};
if latest_memories.is_empty() {
debug!(
"memory phase-2 scope has no stage-1 outputs; skipping consolidation: {}:{}",
scope.scope_kind, scope.scope_key
);
debug!("memory phase-2 has no stage-1 outputs; skipping global consolidation");
let _ = state_db
.mark_phase2_job_succeeded(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
claimed_watermark,
)
.mark_global_phase2_job_succeeded(&ownership_token, claimed_watermark)
.await;
return;
return false;
};
let root = memory_root(&config.codex_home);
let materialized_watermark = latest_memories
.iter()
.map(|memory| memory.source_updated_at.timestamp())
.max()
.unwrap_or(claimed_watermark);
if let Err(err) =
sync_rollout_summaries_from_memories(&scope.memory_root, &latest_memories).await
{
warn!(
"failed syncing phase-1 rollout summaries for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await {
warn!("failed syncing phase-1 rollout summaries for global consolidation: {err}");
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed syncing phase-1 rollout summaries",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
return;
return false;
}
if let Err(err) =
rebuild_raw_memories_file_from_memories(&scope.memory_root, &latest_memories).await
{
warn!(
"failed rebuilding raw memories aggregate for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await {
warn!("failed rebuilding raw memories aggregate for global consolidation: {err}");
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed rebuilding raw memories aggregate",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
return;
return false;
}
if let Err(err) = wipe_consolidation_outputs(&scope.memory_root).await {
warn!(
"failed to wipe previous consolidation outputs for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
if let Err(err) = wipe_consolidation_outputs(&root).await {
warn!("failed to wipe previous global consolidation outputs: {err}");
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed to wipe previous consolidation outputs",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
return;
return false;
}
let prompt = build_consolidation_prompt(&scope.memory_root);
let prompt = build_consolidation_prompt(&root);
let input = vec![UserInput::Text {
text: prompt,
text_elements: vec![],
}];
let mut consolidation_config = config.as_ref().clone();
consolidation_config.cwd = scope.memory_root.clone();
consolidation_config.cwd = root.clone();
let source = SessionSource::SubAgent(SubAgentSource::Other(
MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(),
));
@@ -193,31 +140,212 @@ pub(super) async fn run_memory_consolidation_for_scope(
{
Ok(consolidation_agent_id) => {
info!(
"memory phase-2 consolidation agent started: scope={} scope_key={} agent_id={}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
"memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}"
);
spawn_phase2_completion_task(
session.as_ref(),
scope,
ownership_token,
materialized_watermark,
consolidation_agent_id,
);
true
}
Err(err) => {
warn!(
"failed to spawn memory consolidation agent for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
warn!("failed to spawn global memory consolidation agent: {err}");
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed to spawn consolidation agent",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
false
}
}
}
#[cfg(test)]
mod tests {
use super::run_global_memory_consolidation;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::codex::Session;
use crate::codex::make_session_and_context;
use crate::config::Config;
use crate::config::test_config;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_state::Phase2JobClaimOutcome;
use codex_state::ThreadMetadataBuilder;
use pretty_assertions::assert_eq;
use std::sync::Arc;
use tempfile::TempDir;
struct DispatchHarness {
_codex_home: TempDir,
config: Arc<Config>,
session: Arc<Session>,
manager: ThreadManager,
state_db: Arc<codex_state::StateRuntime>,
}
impl DispatchHarness {
async fn new() -> Self {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let mut config = test_config();
config.codex_home = codex_home.path().to_path_buf();
config.cwd = config.codex_home.clone();
let config = Arc::new(config);
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
let manager = ThreadManager::with_models_provider_and_home(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let (mut session, _turn_context) = make_session_and_context().await;
session.services.state_db = Some(Arc::clone(&state_db));
session.services.agent_control = manager.agent_control();
Self {
_codex_home: codex_home,
config,
session: Arc::new(session),
manager,
state_db,
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
self.config
.codex_home
.join(format!("rollout-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
metadata_builder.cwd = self.config.cwd.clone();
metadata_builder.model_provider = Some(self.config.model_provider_id.clone());
let metadata = metadata_builder.build(&self.config.model_provider_id);
self.state_db
.upsert_thread(&metadata)
.await
.expect("upsert thread metadata");
let claim = self
.state_db
.try_claim_stage1_job(
thread_id,
self.session.conversation_id,
source_updated_at,
3_600,
64,
)
.await
.expect("claim stage-1 job");
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
self.state_db
.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
source_updated_at,
"raw memory",
"rollout summary",
)
.await
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
}
async fn shutdown_threads(&self) {
self.manager
.remove_and_close_all_threads()
.await
.expect("shutdown spawned threads");
}
}
#[tokio::test]
async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
let stale_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 0)
.await
.expect("claim stale global lock");
assert!(
matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. }),
"stale lock precondition should be claimed"
);
let scheduled =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(
scheduled,
"dispatch should reclaim stale lock and spawn one agent"
);
let running_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim while running");
assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);
let user_input_ops = harness
.manager
.captured_ops()
.into_iter()
.filter(|(_, op)| matches!(op, Op::UserInput { .. }))
.count();
assert_eq!(user_input_ops, 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_schedules_only_one_agent_while_lock_is_running() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(200).await;
let first_run =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
let second_run =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(first_run, "first dispatch should schedule consolidation");
assert!(
!second_run,
"second dispatch should skip while the global lock is running"
);
let user_input_ops = harness
.manager
.captured_ops()
.into_iter()
.filter(|(_, op)| matches!(op, Op::UserInput { .. }))
.count();
assert_eq!(user_input_ops, 1);
harness.shutdown_threads().await;
}
}

View File

@@ -7,10 +7,7 @@ use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::layout::memory_root;
use crate::memories::layout::migrate_legacy_user_memory_root_if_needed;
use crate::memories::scope::MEMORY_SCOPE_KEY_USER;
use crate::memories::scope::MEMORY_SCOPE_KIND_USER;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@@ -19,7 +16,6 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use futures::StreamExt;
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
@@ -47,52 +43,6 @@ impl StageOneRequestContext {
}
}
/// Canonical memory scope metadata used by both startup phases.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) struct MemoryScopeTarget {
/// Scope family used for DB ownership and dirty-state tracking.
pub(super) scope_kind: &'static str,
/// Scope identifier used for DB keys.
pub(super) scope_key: String,
/// On-disk root where phase-1 artifacts and phase-2 outputs live.
pub(super) memory_root: PathBuf,
}
/// Converts a pending scope consolidation row into a concrete filesystem target for phase 2.
///
/// Unsupported scope kinds or malformed keys are ignored.
pub(super) fn memory_scope_target_for_pending_scope(
config: &Config,
pending_scope: codex_state::PendingScopeConsolidation,
) -> Option<MemoryScopeTarget> {
let scope_kind = pending_scope.scope_kind;
let scope_key = pending_scope.scope_key;
match scope_kind.as_str() {
MEMORY_SCOPE_KIND_USER => {
if scope_key != MEMORY_SCOPE_KEY_USER {
warn!(
"skipping unsupported user memory scope key for phase-2: {}:{}",
scope_kind, scope_key
);
return None;
}
Some(MemoryScopeTarget {
scope_kind: MEMORY_SCOPE_KIND_USER,
scope_key,
memory_root: memory_root(&config.codex_home),
})
}
_ => {
warn!(
"skipping unsupported memory scope for phase-2 consolidation: {}:{}",
scope_kind, scope_key
);
None
}
}
}
/// Starts the asynchronous startup memory pipeline for an eligible root session.
///
/// The pipeline is skipped for ephemeral sessions, disabled feature flags, and
@@ -125,7 +75,7 @@ pub(crate) fn start_memories_startup_task(
/// Phase 1 selects rollout candidates, performs stage-1 extraction requests in
/// parallel, persists stage-1 outputs, and enqueues consolidation work.
///
/// Phase 2 claims pending scopes and spawns consolidation agents.
/// Phase 2 claims a global consolidation lock and spawns one consolidation agent.
pub(super) async fn run_memories_startup_pipeline(
session: &Arc<Session>,
config: Arc<Config>,
@@ -236,115 +186,15 @@ pub(super) async fn run_memories_startup_pipeline(
claimed_count, succeeded_count
);
let consolidation_scope_count = run_consolidation_dispatch(session, config).await;
let consolidation_job_count = run_consolidation_dispatch(session, config).await;
info!(
"memory consolidation dispatch complete: {} scope(s) scheduled",
consolidation_scope_count
"memory consolidation dispatch complete: {} job(s) scheduled",
consolidation_job_count
);
Ok(())
}
async fn run_consolidation_dispatch(session: &Arc<Session>, config: Arc<Config>) -> usize {
let scopes = list_consolidation_scopes(
session.as_ref(),
config.as_ref(),
super::MAX_ROLLOUTS_PER_STARTUP,
)
.await;
let consolidation_scope_count = scopes.len();
futures::stream::iter(scopes.into_iter())
.map(|scope| {
let session = Arc::clone(session);
let config = Arc::clone(&config);
async move {
dispatch::run_memory_consolidation_for_scope(session, config, scope).await;
}
})
.buffer_unordered(super::PHASE_TWO_CONCURRENCY_LIMIT)
.collect::<Vec<_>>()
.await;
consolidation_scope_count
}
async fn list_consolidation_scopes(
session: &Session,
config: &Config,
limit: usize,
) -> Vec<MemoryScopeTarget> {
if limit == 0 {
return Vec::new();
}
let Some(state_db) = session.services.state_db.as_deref() else {
return Vec::new();
};
let pending_scopes = match state_db.list_pending_scope_consolidations(limit).await {
Ok(scopes) => scopes,
Err(_) => return Vec::new(),
};
pending_scopes
.into_iter()
.filter_map(|scope| memory_scope_target_for_pending_scope(config, scope))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::test_config;
/// Verifies that phase-2 pending scope rows are translated only for supported scopes.
#[test]
fn pending_scope_mapping_accepts_supported_scopes_only() {
let mut config = test_config();
config.codex_home = "/tmp/memory-startup-test-home".into();
let user_target = memory_scope_target_for_pending_scope(
&config,
codex_state::PendingScopeConsolidation {
scope_kind: MEMORY_SCOPE_KIND_USER.to_string(),
scope_key: MEMORY_SCOPE_KEY_USER.to_string(),
},
)
.expect("valid user scope should map");
assert_eq!(user_target.scope_kind, MEMORY_SCOPE_KIND_USER);
assert!(
memory_scope_target_for_pending_scope(
&config,
codex_state::PendingScopeConsolidation {
scope_kind: MEMORY_SCOPE_KIND_USER.to_string(),
scope_key: "unexpected-user-key".to_string(),
},
)
.is_none()
);
assert!(
memory_scope_target_for_pending_scope(
&config,
codex_state::PendingScopeConsolidation {
scope_kind: "cwd".to_string(),
scope_key: "/tmp/project-a".to_string(),
},
)
.is_none()
);
assert!(
memory_scope_target_for_pending_scope(
&config,
codex_state::PendingScopeConsolidation {
scope_kind: "unknown".to_string(),
scope_key: "scope".to_string(),
},
)
.is_none()
);
}
usize::from(dispatch::run_global_memory_consolidation(session, config).await)
}

View File

@@ -10,11 +10,9 @@ use tracing::warn;
use super::super::PHASE_TWO_JOB_HEARTBEAT_SECONDS;
use super::super::PHASE_TWO_JOB_LEASE_SECONDS;
use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS;
use super::MemoryScopeTarget;
pub(super) fn spawn_phase2_completion_task(
session: &Session,
scope: MemoryScopeTarget,
ownership_token: String,
completion_watermark: i64,
consolidation_agent_id: ThreadId,
@@ -31,13 +29,10 @@ pub(super) fn spawn_phase2_completion_task(
Ok(status_rx) => status_rx,
Err(err) => {
warn!(
"failed to subscribe to memory consolidation agent {} for scope {}:{}: {err}",
consolidation_agent_id, scope.scope_kind, scope.scope_key
"failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}"
);
let _ = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
"failed to subscribe to consolidation agent status",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
@@ -61,33 +56,26 @@ pub(super) fn spawn_phase2_completion_task(
changed = status_rx.changed() => {
if changed.is_err() {
warn!(
"lost status updates for memory consolidation agent {} in scope {}:{}",
consolidation_agent_id, scope.scope_kind, scope.scope_key
"lost status updates for global memory consolidation agent {consolidation_agent_id}"
);
break status;
}
}
_ = heartbeat_interval.tick() => {
match state_db
.heartbeat_phase2_job(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
PHASE_TWO_JOB_LEASE_SECONDS,
)
.heartbeat_global_phase2_job(&ownership_token, PHASE_TWO_JOB_LEASE_SECONDS)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 heartbeat lost ownership for scope {}:{}; skipping finalization",
scope.scope_kind, scope.scope_key
"memory phase-2 heartbeat lost global ownership; skipping finalization"
);
return;
}
Err(err) => {
warn!(
"state db heartbeat_phase2_job failed during memories startup: {err}"
"state db heartbeat_global_phase2_job failed during memories startup: {err}"
);
return;
}
@@ -98,39 +86,30 @@ pub(super) fn spawn_phase2_completion_task(
if is_phase2_success(&final_status) {
match state_db
.mark_phase2_job_succeeded(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
completion_watermark,
)
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 success finalization skipped after ownership changed: scope={} scope_key={}",
scope.scope_kind, scope.scope_key
"memory phase-2 success finalization skipped after global ownership changed"
);
}
Err(err) => {
warn!(
"state db mark_phase2_job_succeeded failed during memories startup: {err}"
"state db mark_global_phase2_job_succeeded failed during memories startup: {err}"
);
}
}
info!(
"memory phase-2 consolidation agent finished: scope={} scope_key={} agent_id={} final_status={final_status:?}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
"memory phase-2 global consolidation agent finished: agent_id={consolidation_agent_id} final_status={final_status:?}"
);
return;
}
let failure_reason = phase2_failure_reason(&final_status);
match state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
.mark_global_phase2_job_failed(
&ownership_token,
&failure_reason,
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
@@ -140,17 +119,17 @@ pub(super) fn spawn_phase2_completion_task(
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 failure finalization skipped after ownership changed: scope={} scope_key={}",
scope.scope_kind, scope.scope_key
"memory phase-2 failure finalization skipped after global ownership changed"
);
}
Err(err) => {
warn!("state db mark_phase2_job_failed failed during memories startup: {err}");
warn!(
"state db mark_global_phase2_job_failed failed during memories startup: {err}"
);
}
}
warn!(
"memory phase-2 consolidation agent finished with non-success status: scope={} scope_key={} agent_id={} final_status={final_status:?}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
"memory phase-2 global consolidation agent finished with non-success status: agent_id={consolidation_agent_id} final_status={final_status:?}"
);
});
}

View File

@@ -4,7 +4,7 @@ use std::fmt::Write as _;
use std::path::Path;
use tracing::warn;
use super::MAX_RAW_MEMORIES_PER_SCOPE;
use super::MAX_RAW_MEMORIES_FOR_GLOBAL;
use super::text::compact_whitespace;
use crate::memories::layout::LEGACY_CONSOLIDATED_FILENAME;
use crate::memories::layout::MEMORY_REGISTRY_FILENAME;
@@ -31,7 +31,7 @@ pub(super) async fn sync_rollout_summaries_from_memories(
let retained = memories
.iter()
.take(MAX_RAW_MEMORIES_PER_SCOPE)
.take(MAX_RAW_MEMORIES_FOR_GLOBAL)
.collect::<Vec<_>>();
let keep = retained
.iter()
@@ -77,7 +77,7 @@ pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<(
async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> {
let retained = memories
.iter()
.take(MAX_RAW_MEMORIES_PER_SCOPE)
.take(MAX_RAW_MEMORIES_FOR_GLOBAL)
.collect::<Vec<_>>();
let mut body = String::from("# Raw Memories\n\n");

View File

@@ -31,7 +31,6 @@ pub use model::Stage1Output;
pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use runtime::PendingScopeConsolidation;
pub use runtime::Phase2JobClaimOutcome;
pub use runtime::STATE_DB_FILENAME;
pub use runtime::STATE_DB_VERSION;

View File

@@ -39,9 +39,6 @@ use uuid::Uuid;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 4;
const MEMORY_SCOPE_KIND_USER: &str = "user";
const MEMORY_SCOPE_KEY_USER: &str = "user";
const METRIC_DB_INIT: &str = "codex.db.init";
mod memory;
@@ -86,27 +83,18 @@ pub struct Stage1StartupClaimParams<'a> {
pub lease_seconds: i64,
}
/// Scope row used to queue phase-2 consolidation work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingScopeConsolidation {
/// Scope family (`user`).
pub scope_kind: String,
/// Scope identifier keyed by `scope_kind`.
pub scope_key: String,
}
/// Result of trying to claim a phase-2 consolidation job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Phase2JobClaimOutcome {
/// The caller owns the scope and should spawn consolidation.
/// The caller owns the global lock and should spawn consolidation.
Claimed {
ownership_token: String,
/// Snapshot of `input_watermark` at claim time.
input_watermark: i64,
},
/// The scope is not pending consolidation (or is already up to date).
/// The global job is not pending consolidation (or is already up to date).
SkippedNotDirty,
/// Another worker currently owns a fresh lease for this scope.
/// Another worker currently owns a fresh global consolidation lease.
SkippedRunning,
}
@@ -926,7 +914,6 @@ fn push_thread_order_and_limit(
#[cfg(test)]
mod tests {
use super::PendingScopeConsolidation;
use super::Phase2JobClaimOutcome;
use super::STATE_DB_FILENAME;
use super::STATE_DB_VERSION;
@@ -1463,7 +1450,7 @@ WHERE kind = 'memory_stage1'
}
#[tokio::test]
async fn phase2_consolidation_jobs_rerun_when_watermark_advances() {
async fn phase2_global_consolidation_reruns_when_watermark_advances() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
@@ -1472,24 +1459,12 @@ WHERE kind = 'memory_stage1'
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
runtime
.enqueue_scope_consolidation("user", "user", 100)
.enqueue_global_consolidation(100)
.await
.expect("enqueue scope");
let scopes = runtime
.list_pending_scope_consolidations(10)
.await
.expect("list pending");
assert_eq!(
scopes,
vec![PendingScopeConsolidation {
scope_kind: "user".to_string(),
scope_key: "user".to_string(),
}]
);
.expect("enqueue global consolidation");
let claim = runtime
.try_claim_phase2_job("user", "user", owner, 3600)
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("claim phase2");
let (ownership_token, input_watermark) = match claim {
@@ -1501,30 +1476,25 @@ WHERE kind = 'memory_stage1'
};
assert!(
runtime
.mark_phase2_job_succeeded(
"user",
"user",
ownership_token.as_str(),
input_watermark,
)
.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
.await
.expect("mark phase2 succeeded"),
"phase2 success should finalize for current token"
);
let claim_up_to_date = runtime
.try_claim_phase2_job("user", "user", owner, 3600)
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("claim phase2 up-to-date");
assert_eq!(claim_up_to_date, Phase2JobClaimOutcome::SkippedNotDirty);
runtime
.enqueue_scope_consolidation("user", "user", 101)
.enqueue_global_consolidation(101)
.await
.expect("enqueue scope again");
.expect("enqueue global consolidation again");
let claim_rerun = runtime
.try_claim_phase2_job("user", "user", owner, 3600)
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("claim phase2 rerun");
assert!(
@@ -1536,7 +1506,7 @@ WHERE kind = 'memory_stage1'
}
#[tokio::test]
async fn list_stage1_outputs_for_user_scope_returns_latest_outputs() {
async fn list_stage1_outputs_for_global_returns_latest_outputs() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
@@ -1607,9 +1577,9 @@ WHERE kind = 'memory_stage1'
);
let outputs = runtime
.list_stage1_outputs_for_scope("user", "user", 10)
.list_stage1_outputs_for_global(10)
.await
.expect("list stage1 outputs for user scope");
.expect("list stage1 outputs for global");
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].summary, "summary b");
@@ -1620,7 +1590,7 @@ WHERE kind = 'memory_stage1'
}
#[tokio::test]
async fn mark_stage1_job_succeeded_enqueues_single_user_scope() {
async fn mark_stage1_job_succeeded_enqueues_global_consolidation() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
@@ -1679,106 +1649,116 @@ WHERE kind = 'memory_stage1'
"stage1 success should persist output for thread b"
);
let pending_scopes = runtime
.list_pending_scope_consolidations(10)
let claim = runtime
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("list pending scopes");
assert_eq!(
pending_scopes,
vec![PendingScopeConsolidation {
scope_kind: "user".to_string(),
scope_key: "user".to_string(),
}]
);
.expect("claim global consolidation");
let input_watermark = match claim {
Phase2JobClaimOutcome::Claimed {
input_watermark, ..
} => input_watermark,
other => panic!("unexpected global consolidation claim outcome: {other:?}"),
};
assert_eq!(input_watermark, 101);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn list_pending_scope_consolidations_omits_unclaimable_jobs() {
async fn phase2_global_lock_allows_only_one_fresh_runner() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
runtime
.enqueue_scope_consolidation("user", "scope-running", 200)
.enqueue_global_consolidation(200)
.await
.expect("enqueue running scope");
runtime
.enqueue_scope_consolidation("user", "scope-backoff", 199)
.await
.expect("enqueue backoff scope");
runtime
.enqueue_scope_consolidation("user", "scope-exhausted", 198)
.await
.expect("enqueue exhausted scope");
runtime
.enqueue_scope_consolidation("user", "scope-claimable-a", 90)
.await
.expect("enqueue claimable scope a");
runtime
.enqueue_scope_consolidation("user", "scope-claimable-b", 89)
.await
.expect("enqueue claimable scope b");
.expect("enqueue global consolidation");
let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner a");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let running_claim = runtime
.try_claim_phase2_job("user", "scope-running", owner, 3600)
.try_claim_global_phase2_job(owner_a, 3600)
.await
.expect("claim running scope");
.expect("claim global lock");
assert!(
matches!(running_claim, Phase2JobClaimOutcome::Claimed { .. }),
"scope-running should be claimed"
"first owner should claim global lock"
);
let backoff_claim = runtime
.try_claim_phase2_job("user", "scope-backoff", owner, 3600)
let second_claim = runtime
.try_claim_global_phase2_job(owner_b, 3600)
.await
.expect("claim backoff scope");
let backoff_token = match backoff_claim {
.expect("claim global lock from second owner");
assert_eq!(second_claim, Phase2JobClaimOutcome::SkippedRunning);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn phase2_global_lock_stale_lease_allows_takeover() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
runtime
.enqueue_global_consolidation(300)
.await
.expect("enqueue global consolidation");
let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner a");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let initial_claim = runtime
.try_claim_global_phase2_job(owner_a, 3600)
.await
.expect("claim initial global lock");
let token_a = match initial_claim {
Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} => ownership_token,
other => panic!("unexpected backoff claim outcome: {other:?}"),
other => panic!("unexpected initial claim outcome: {other:?}"),
};
assert!(
runtime
.mark_phase2_job_failed(
"user",
"scope-backoff",
backoff_token.as_str(),
"temporary failure",
3600,
)
.await
.expect("mark backoff scope failed"),
"backoff scope should transition to retry backoff"
);
sqlx::query("UPDATE jobs SET retry_remaining = 0 WHERE kind = ? AND job_key = ?")
.bind("memory_consolidate_user")
.bind("scope-exhausted")
sqlx::query("UPDATE jobs SET lease_until = ? WHERE kind = ? AND job_key = ?")
.bind(Utc::now().timestamp() - 1)
.bind("memory_consolidate_global")
.bind("global")
.execute(runtime.pool.as_ref())
.await
.expect("set exhausted scope retries to zero");
.expect("expire global consolidation lease");
let pending = runtime
.list_pending_scope_consolidations(2)
let takeover_claim = runtime
.try_claim_global_phase2_job(owner_b, 3600)
.await
.expect("list pending scopes");
.expect("claim stale global lock");
let (token_b, input_watermark) = match takeover_claim {
Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
other => panic!("unexpected takeover claim outcome: {other:?}"),
};
assert_ne!(token_a, token_b);
assert_eq!(input_watermark, 300);
assert_eq!(
pending,
vec![
PendingScopeConsolidation {
scope_kind: "user".to_string(),
scope_key: "scope-claimable-a".to_string(),
},
PendingScopeConsolidation {
scope_kind: "user".to_string(),
scope_key: "scope-claimable-b".to_string(),
},
]
runtime
.mark_global_phase2_job_succeeded(token_a.as_str(), 300)
.await
.expect("mark stale owner success result"),
false,
"stale owner should lose finalization ownership after takeover"
);
assert!(
runtime
.mark_global_phase2_job_succeeded(token_b.as_str(), 300)
.await
.expect("mark takeover owner success"),
"takeover owner should finalize consolidation"
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
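The takeover test above exercises two invariants: a fresh lease blocks other claimants, and claiming over an expired lease rotates the ownership token so the stale owner can no longer finalize. An in-memory sketch of those lease semantics (illustrative names, not the real schema):

```rust
// In-memory model of the DB-backed global phase-2 lock, for illustration.
struct GlobalLock {
    ownership_token: Option<String>,
    lease_until: i64,
}

impl GlobalLock {
    /// A claim succeeds only when no fresh lease is held; claiming mints a
    /// new token, which invalidates the previous owner's finalization right.
    fn try_claim(&mut self, now: i64, lease_seconds: i64, token: &str) -> bool {
        if self.ownership_token.is_some() && self.lease_until > now {
            return false; // SkippedRunning
        }
        self.ownership_token = Some(token.to_string());
        self.lease_until = now.saturating_add(lease_seconds.max(0));
        true
    }

    /// Finalization is token-guarded: after takeover, the stale token no
    /// longer matches and the success mark is rejected.
    fn mark_succeeded(&mut self, token: &str) -> bool {
        self.ownership_token.as_deref() == Some(token)
    }
}
```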


@@ -8,26 +8,11 @@ use sqlx::QueryBuilder;
use sqlx::Sqlite;
const JOB_KIND_MEMORY_STAGE1: &str = "memory_stage1";
const JOB_KIND_MEMORY_CONSOLIDATE_USER: &str = "memory_consolidate_user";
const JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL: &str = "memory_consolidate_global";
const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global";
const DEFAULT_RETRY_REMAINING: i64 = 3;
fn job_kind_for_scope(scope_kind: &str) -> Option<&'static str> {
if scope_kind == MEMORY_SCOPE_KIND_USER {
Some(JOB_KIND_MEMORY_CONSOLIDATE_USER)
} else {
None
}
}
fn scope_kind_for_job_kind(job_kind: &str) -> Option<&'static str> {
if job_kind == JOB_KIND_MEMORY_CONSOLIDATE_USER {
Some(MEMORY_SCOPE_KIND_USER)
} else {
None
}
}
impl StateRuntime {
pub async fn claim_stage1_jobs_for_startup(
&self,
@@ -145,33 +130,26 @@ WHERE thread_id = ?
.transpose()
}
pub async fn list_stage1_outputs_for_scope(
pub async fn list_stage1_outputs_for_global(
&self,
scope_kind: &str,
_scope_key: &str,
n: usize,
) -> anyhow::Result<Vec<Stage1Output>> {
if n == 0 {
return Ok(Vec::new());
}
let rows = match scope_kind {
MEMORY_SCOPE_KIND_USER => {
sqlx::query(
r#"
let rows = sqlx::query(
r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.summary, so.generated_at
FROM stage1_outputs AS so
JOIN threads AS t ON t.id = so.thread_id
ORDER BY so.source_updated_at DESC, so.thread_id DESC
LIMIT ?
"#,
)
.bind(n as i64)
.fetch_all(self.pool.as_ref())
.await?
}
_ => return Ok(Vec::new()),
};
"#,
)
.bind(n as i64)
.fetch_all(self.pool.as_ref())
.await?;
rows.into_iter()
.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
@@ -408,13 +386,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
.execute(&mut *tx)
.await?;
enqueue_scope_consolidation_with_executor(
&mut *tx,
MEMORY_SCOPE_KIND_USER,
MEMORY_SCOPE_KEY_USER,
source_updated_at,
)
.await?;
enqueue_global_consolidation_with_executor(&mut *tx, source_updated_at).await?;
tx.commit().await?;
Ok(true)
@@ -458,77 +430,16 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
pub async fn enqueue_scope_consolidation(
&self,
scope_kind: &str,
scope_key: &str,
input_watermark: i64,
) -> anyhow::Result<()> {
enqueue_scope_consolidation_with_executor(
self.pool.as_ref(),
scope_kind,
scope_key,
input_watermark,
)
.await
pub async fn enqueue_global_consolidation(&self, input_watermark: i64) -> anyhow::Result<()> {
enqueue_global_consolidation_with_executor(self.pool.as_ref(), input_watermark).await
}
pub async fn list_pending_scope_consolidations(
/// Try to claim the global phase-2 consolidation job.
pub async fn try_claim_global_phase2_job(
&self,
limit: usize,
) -> anyhow::Result<Vec<PendingScopeConsolidation>> {
if limit == 0 {
return Ok(Vec::new());
}
let now = Utc::now().timestamp();
let rows = sqlx::query(
r#"
SELECT kind, job_key
FROM jobs
WHERE kind = ?
AND input_watermark IS NOT NULL
AND input_watermark > COALESCE(last_success_watermark, 0)
AND retry_remaining > 0
AND (retry_at IS NULL OR retry_at <= ?)
AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?)
ORDER BY input_watermark DESC, kind ASC, job_key ASC
LIMIT ?
"#,
)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_USER)
.bind(now)
.bind(now)
.bind(limit as i64)
.fetch_all(self.pool.as_ref())
.await?;
Ok(rows
.into_iter()
.filter_map(|row| {
let kind: String = row.try_get("kind").ok()?;
let scope_kind = scope_kind_for_job_kind(&kind)?;
let scope_key: String = row.try_get("job_key").ok()?;
Some(PendingScopeConsolidation {
scope_kind: scope_kind.to_string(),
scope_key,
})
})
.collect::<Vec<_>>())
}
/// Try to claim a phase-2 consolidation job for `(scope_kind, scope_key)`.
pub async fn try_claim_phase2_job(
&self,
scope_kind: &str,
scope_key: &str,
worker_id: ThreadId,
lease_seconds: i64,
) -> anyhow::Result<Phase2JobClaimOutcome> {
let Some(job_kind) = job_kind_for_scope(scope_kind) else {
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
};
let now = Utc::now().timestamp();
let lease_until = now.saturating_add(lease_seconds.max(0));
let ownership_token = Uuid::new_v4().to_string();
@@ -543,8 +454,8 @@ FROM jobs
WHERE kind = ? AND job_key = ?
"#,
)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.fetch_optional(&mut *tx)
.await?;
@@ -593,6 +504,7 @@ SET
retry_at = NULL,
last_error = NULL
WHERE kind = ? AND job_key = ?
AND input_watermark > COALESCE(last_success_watermark, 0)
AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?)
AND (retry_at IS NULL OR retry_at <= ?)
AND retry_remaining > 0
@@ -602,8 +514,8 @@ WHERE kind = ? AND job_key = ?
.bind(ownership_token.as_str())
.bind(now)
.bind(lease_until)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(now)
.bind(now)
.execute(&mut *tx)
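The `WHERE` clause in the claim `UPDATE` above is the entire gate: the watermark must be dirty, no fresh `running` lease may exist, the retry window must be open, and retries must remain. A standalone sketch of that predicate (illustrative field names mirroring the SQL columns):

```rust
// Illustrative in-memory mirror of the jobs-row columns the claim checks.
struct Job {
    input_watermark: i64,
    last_success_watermark: Option<i64>,
    status: &'static str,
    lease_until: Option<i64>,
    retry_at: Option<i64>,
    retry_remaining: i64,
}

/// Same conditions as the SQL guard: dirty watermark, no fresh running
/// lease, retry window open, retries remaining.
fn is_claimable(job: &Job, now: i64) -> bool {
    job.input_watermark > job.last_success_watermark.unwrap_or(0)
        && (job.status != "running" || job.lease_until.map_or(true, |lease| lease <= now))
        && job.retry_at.map_or(true, |retry| retry <= now)
        && job.retry_remaining > 0
}
```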
@@ -621,17 +533,11 @@ WHERE kind = ? AND job_key = ?
}
}
pub async fn heartbeat_phase2_job(
pub async fn heartbeat_global_phase2_job(
&self,
scope_kind: &str,
scope_key: &str,
ownership_token: &str,
lease_seconds: i64,
) -> anyhow::Result<bool> {
let Some(job_kind) = job_kind_for_scope(scope_kind) else {
return Ok(false);
};
let now = Utc::now().timestamp();
let lease_until = now.saturating_add(lease_seconds.max(0));
let rows_affected = sqlx::query(
@@ -643,8 +549,8 @@ WHERE kind = ? AND job_key = ?
"#,
)
.bind(lease_until)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(ownership_token)
.execute(self.pool.as_ref())
.await?
@@ -653,17 +559,11 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
pub async fn mark_phase2_job_succeeded(
pub async fn mark_global_phase2_job_succeeded(
&self,
scope_kind: &str,
scope_key: &str,
ownership_token: &str,
completed_watermark: i64,
) -> anyhow::Result<bool> {
let Some(job_kind) = job_kind_for_scope(scope_kind) else {
return Ok(false);
};
let now = Utc::now().timestamp();
let rows_affected = sqlx::query(
r#"
@@ -680,8 +580,8 @@ WHERE kind = ? AND job_key = ?
)
.bind(now)
.bind(completed_watermark)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(ownership_token)
.execute(self.pool.as_ref())
.await?
@@ -690,18 +590,12 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
pub async fn mark_phase2_job_failed(
pub async fn mark_global_phase2_job_failed(
&self,
scope_kind: &str,
scope_key: &str,
ownership_token: &str,
failure_reason: &str,
retry_delay_seconds: i64,
) -> anyhow::Result<bool> {
let Some(job_kind) = job_kind_for_scope(scope_kind) else {
return Ok(false);
};
let now = Utc::now().timestamp();
let retry_at = now.saturating_add(retry_delay_seconds.max(0));
let rows_affected = sqlx::query(
@@ -721,8 +615,8 @@ WHERE kind = ? AND job_key = ?
.bind(now)
.bind(retry_at)
.bind(failure_reason)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(ownership_token)
.execute(self.pool.as_ref())
.await?
@@ -732,19 +626,13 @@ WHERE kind = ? AND job_key = ?
}
}
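On failure the job receives a retry deadline rather than an immediate requeue. The timestamp arithmetic in the failure path above reduces to a one-liner; negative delays are clamped to zero so `retry_at` never precedes `now`:

```rust
/// Sketch of the retry-deadline computation used on the failure path:
/// clamp the delay at zero, then saturate on overflow.
fn next_retry_at(now: i64, retry_delay_seconds: i64) -> i64 {
    now.saturating_add(retry_delay_seconds.max(0))
}
```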
async fn enqueue_scope_consolidation_with_executor<'e, E>(
async fn enqueue_global_consolidation_with_executor<'e, E>(
executor: E,
scope_kind: &str,
scope_key: &str,
input_watermark: i64,
) -> anyhow::Result<()>
where
E: Executor<'e, Database = Sqlite>,
{
let Some(job_kind) = job_kind_for_scope(scope_kind) else {
return Ok(());
};
sqlx::query(
r#"
INSERT INTO jobs (
@@ -775,8 +663,8 @@ ON CONFLICT(kind, job_key) DO UPDATE SET
input_watermark = max(COALESCE(jobs.input_watermark, 0), excluded.input_watermark)
"#,
)
.bind(job_kind)
.bind(scope_key)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(DEFAULT_RETRY_REMAINING)
.bind(input_watermark)
.execute(executor)
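The `ON CONFLICT` clause in the enqueue upsert above merges watermarks monotonically, so re-enqueueing the global job can only move it forward. The SQL expression `max(COALESCE(jobs.input_watermark, 0), excluded.input_watermark)` reduces to:

```rust
/// Sketch of the watermark merge performed by the upsert: a missing stored
/// watermark counts as 0, and the merge never moves backwards.
fn merged_input_watermark(existing: Option<i64>, incoming: i64) -> i64 {
    existing.unwrap_or(0).max(incoming)
}
```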