feat: phase 2 consolidation (#11306)

Adds the consolidation phase (phase 2) of memories.

Cleans up the code and improves the handling of concurrency.
This commit is contained in:
jif-oai
2026-02-10 14:31:16 +00:00
committed by GitHub
parent d735df1f50
commit e57892b211
5 changed files with 907 additions and 252 deletions

View File

@@ -9,8 +9,10 @@ const MEMORY_STARTUP_STAGE: &str = "run_memories_startup_pipeline";
/// Phase-1 thread scan limit.
/// NOTE(review): the code that reads this constant is outside this view; confirm its exact meaning.
const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000;
/// Phase-1 counterpart of `PHASE_TWO_DB_LOCK_RETRY_LIMIT`.
/// NOTE(review): presumably used the same way by the phase-1 claim retry helper; confirm.
const PHASE_ONE_DB_LOCK_RETRY_LIMIT: usize = 3;
/// Phase-1 counterpart of `PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS`.
/// NOTE(review): presumably used the same way by the phase-1 claim retry helper; confirm.
const PHASE_ONE_DB_LOCK_RETRY_BACKOFF_MS: u64 = 25;
/// Number of retries (after the initial attempt) that `try_claim_phase2_job_with_retry`
/// performs when the state DB reports "database is locked".
const PHASE_TWO_DB_LOCK_RETRY_LIMIT: usize = 3;
/// Base backoff in milliseconds between phase-2 claim retries; the actual sleep is this
/// value multiplied by the 1-based retry number.
const PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS: u64 = 25;
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
struct MemoryScopeTarget {
scope_kind: &'static str,
scope_key: String,
@@ -108,17 +110,14 @@ pub(super) async fn run_memories_startup_pipeline(
page.items.len()
);
if claimed_candidates.is_empty() {
return Ok(());
}
let stage_one_context = StageOneRequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
);
let touched_scope_sets =
futures::stream::iter(claimed_candidates.into_iter())
let touched_scope_count = if claimed_candidates.is_empty() {
0
} else {
let stage_one_context = StageOneRequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
);
let touched_scope_counts = futures::stream::iter(claimed_candidates.into_iter())
.map(|claimed_candidate| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
@@ -127,23 +126,20 @@ pub(super) async fn run_memories_startup_pipeline(
}
})
.buffer_unordered(memories::PHASE_ONE_CONCURRENCY_LIMIT)
.collect::<Vec<HashSet<MemoryScopeTarget>>>()
.collect::<Vec<usize>>()
.await;
let touched_scopes = touched_scope_sets
.into_iter()
.flatten()
.collect::<HashSet<MemoryScopeTarget>>();
touched_scope_counts.into_iter().sum::<usize>()
};
info!(
"memory phase-1 extraction complete: {} scope(s) touched",
touched_scopes.len()
touched_scope_count
);
if touched_scopes.is_empty() {
return Ok(());
}
let consolidation_scope_count = touched_scopes.len();
futures::stream::iter(touched_scopes.into_iter())
let dirty_scopes =
list_phase2_dirty_scopes(session, config.as_ref(), memories::MAX_ROLLOUTS_PER_STARTUP)
.await;
let consolidation_scope_count = dirty_scopes.len();
futures::stream::iter(dirty_scopes.into_iter())
.map(|scope| {
let session = Arc::clone(session);
let config = Arc::clone(&config);
@@ -252,11 +248,111 @@ async fn try_claim_phase1_job_with_retry(
None
}
/// Lists the memory scopes the state DB reports as dirty and converts them into
/// phase-2 consolidation targets.
///
/// At most `limit` rows are requested from the state DB. An empty list is returned
/// when `limit` is zero, when the session has no state DB, or when the query fails
/// (the failure is logged as a warning). Rows that cannot be mapped to a supported
/// scope are dropped by `memory_scope_target_for_dirty_scope`.
async fn list_phase2_dirty_scopes(
    session: &Session,
    config: &Config,
    limit: usize,
) -> Vec<MemoryScopeTarget> {
    if limit == 0 {
        return Vec::new();
    }

    let state_db = match session.services.state_db.as_deref() {
        Some(db) => db,
        None => return Vec::new(),
    };

    match state_db.list_dirty_memory_scopes(limit).await {
        Ok(rows) => {
            // Keep only the rows that map to a supported scope, preserving DB order.
            let mut targets = Vec::new();
            for row in rows {
                if let Some(target) = memory_scope_target_for_dirty_scope(config, row) {
                    targets.push(target);
                }
            }
            targets
        }
        Err(err) => {
            warn!("state db list_dirty_memory_scopes failed during {MEMORY_STARTUP_STAGE}: {err}");
            Vec::new()
        }
    }
}
/// Maps a dirty-scope row from the state DB onto a phase-2 consolidation target.
///
/// Returns `None`, after logging a warning, when the scope kind is neither
/// `memories::MEMORY_SCOPE_KIND_CWD` nor `memories::MEMORY_SCOPE_KIND_USER`, or when a
/// user-kind row carries a key other than `memories::MEMORY_SCOPE_KEY_USER`.
fn memory_scope_target_for_dirty_scope(
    config: &Config,
    dirty_scope: codex_state::DirtyMemoryScope,
) -> Option<MemoryScopeTarget> {
    let scope_kind = dirty_scope.scope_kind;
    let scope_key = dirty_scope.scope_key;
    let kind = scope_kind.as_str();

    if kind == memories::MEMORY_SCOPE_KIND_CWD {
        // For cwd scopes the key is used as the directory path.
        let memory_root =
            memories::memory_root_for_cwd(&config.codex_home, &PathBuf::from(&scope_key));
        return Some(MemoryScopeTarget {
            scope_kind: memories::MEMORY_SCOPE_KIND_CWD,
            scope_key,
            memory_root,
        });
    }

    if kind != memories::MEMORY_SCOPE_KIND_USER {
        warn!(
            "skipping unsupported memory scope for phase-2 consolidation: {}:{}",
            scope_kind, scope_key
        );
        return None;
    }

    // Only one user scope key is supported.
    if scope_key != memories::MEMORY_SCOPE_KEY_USER {
        warn!(
            "skipping unsupported user memory scope key for phase-2: {}:{}",
            scope_kind, scope_key
        );
        return None;
    }

    let memory_root = memories::memory_root_for_user(&config.codex_home);
    Some(MemoryScopeTarget {
        scope_kind: memories::MEMORY_SCOPE_KIND_USER,
        scope_key,
        memory_root,
    })
}
/// Attempts to claim the phase-2 job for a scope, retrying while the state DB is locked.
///
/// The claim is tried once and then retried up to `PHASE_TWO_DB_LOCK_RETRY_LIMIT` more
/// times, but only when the error text contains "database is locked". The sleep before
/// retry number `k` (1-based) is `PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS * k` milliseconds.
///
/// Returns `Some(outcome)` as soon as the state DB answers, or `None` (after logging a
/// warning) on a non-lock error or once the retries are exhausted.
async fn try_claim_phase2_job_with_retry(
    state_db: &codex_state::StateRuntime,
    scope_kind: &str,
    scope_key: &str,
    owner_session_id: ThreadId,
) -> Option<codex_state::Phase2JobClaimOutcome> {
    // Number of retries already performed; 0 means this is the initial attempt.
    let mut attempt: usize = 0;
    loop {
        let claim_result = state_db
            .try_claim_phase2_job(
                scope_kind,
                scope_key,
                owner_session_id,
                memories::PHASE_TWO_JOB_LEASE_SECONDS,
            )
            .await;
        let err = match claim_result {
            Ok(claim) => return Some(claim),
            Err(err) => err,
        };

        let is_locked = err.to_string().contains("database is locked");
        if !is_locked || attempt >= PHASE_TWO_DB_LOCK_RETRY_LIMIT {
            warn!("state db try_claim_phase2_job failed during {MEMORY_STARTUP_STAGE}: {err}");
            return None;
        }

        // Linear backoff: the k-th retry waits k times the base delay.
        attempt += 1;
        let backoff_ms = PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS * (attempt as u64);
        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
    }
}
async fn process_memory_candidate(
session: Arc<Session>,
claimed_candidate: ClaimedPhaseOneCandidate,
stage_one_context: StageOneRequestContext,
) -> HashSet<MemoryScopeTarget> {
) -> usize {
let candidate = claimed_candidate.candidate;
let claimed_scopes = claimed_candidate.claimed_scopes;
@@ -283,7 +379,7 @@ async fn process_memory_candidate(
ready_scopes.push((scope, ownership_token));
}
if ready_scopes.is_empty() {
return HashSet::new();
return 0;
}
let (rollout_items, _thread_id, parse_errors) =
@@ -301,7 +397,7 @@ async fn process_memory_candidate(
"failed to load rollout",
)
.await;
return HashSet::new();
return 0;
}
};
if parse_errors > 0 {
@@ -328,7 +424,7 @@ async fn process_memory_candidate(
"failed to serialize filtered rollout",
)
.await;
return HashSet::new();
return 0;
}
};
@@ -379,7 +475,7 @@ async fn process_memory_candidate(
"stage-1 memory request failed",
)
.await;
return HashSet::new();
return 0;
}
};
@@ -397,7 +493,7 @@ async fn process_memory_candidate(
"stage-1 memory response stream failed",
)
.await;
return HashSet::new();
return 0;
}
};
@@ -415,11 +511,11 @@ async fn process_memory_candidate(
"invalid stage-1 memory payload",
)
.await;
return HashSet::new();
return 0;
}
};
let mut touched_scopes = HashSet::new();
let mut touched_scope_count = 0;
for (scope, ownership_token) in &ready_scopes {
if persist_phase_one_memory_for_scope(
&session,
@@ -431,11 +527,11 @@ async fn process_memory_candidate(
)
.await
{
touched_scopes.insert(scope.clone());
touched_scope_count += 1;
}
}
touched_scopes
touched_scope_count
}
fn parse_source_updated_at_epoch(candidate: &memories::RolloutCandidate) -> i64 {
@@ -686,44 +782,59 @@ async fn run_memory_consolidation_for_scope(
config: Arc<Config>,
scope: MemoryScopeTarget,
) {
let lock_owner = session.conversation_id;
let Some(lock_acquired) = state_db::try_acquire_memory_consolidation_lock(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
memories::CONSOLIDATION_LOCK_LEASE_SECONDS,
MEMORY_STARTUP_STAGE,
)
.await
else {
warn!(
"failed to acquire memory consolidation lock for scope {}:{}; skipping consolidation",
scope.scope_kind, scope.scope_key
);
return;
};
if !lock_acquired {
debug!(
"memory consolidation lock already held for scope {}:{}; skipping",
scope.scope_kind, scope.scope_key
);
return;
}
let Some(state_db) = session.services.state_db.as_deref() else {
warn!(
"state db unavailable for scope {}:{}; skipping consolidation",
scope.scope_kind, scope.scope_key
);
let _ = state_db::release_memory_consolidation_lock(
return;
};
let Some(claim) = try_claim_phase2_job_with_retry(
state_db,
scope.scope_kind,
&scope.scope_key,
session.conversation_id,
)
.await
else {
return;
};
let ownership_token = match claim {
codex_state::Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token,
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => {
debug!(
"memory phase-2 scope no longer dirty; skipping consolidation: {}:{}",
scope.scope_kind, scope.scope_key
);
return;
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => {
debug!(
"memory phase-2 job already running for scope {}:{}; skipping",
scope.scope_kind, scope.scope_key
);
return;
}
};
if let Err(err) = memories::ensure_layout(&scope.memory_root).await {
warn!(
"failed to create memory layout for phase-2 scope {}:{} root={}: {err}",
scope.scope_kind,
scope.scope_key,
scope.memory_root.display()
);
mark_phase2_job_failed_best_effort(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
MEMORY_STARTUP_STAGE,
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to create memory layout",
)
.await;
return;
};
}
let latest_memories = match state_db
.get_last_n_thread_memories_for_scope(
@@ -738,11 +849,12 @@ async fn run_memory_consolidation_for_scope(
warn!(
"state db get_last_n_thread_memories_for_scope failed during {MEMORY_STARTUP_STAGE}: {err}"
);
let _ = state_db::release_memory_consolidation_lock(
mark_phase2_job_failed_best_effort(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
MEMORY_STARTUP_STAGE,
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to read scope memories before consolidation",
)
.await;
return;
@@ -757,11 +869,12 @@ async fn run_memory_consolidation_for_scope(
"failed to refresh phase-1 memory outputs for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
let _ = state_db::release_memory_consolidation_lock(
mark_phase2_job_failed_best_effort(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
MEMORY_STARTUP_STAGE,
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to refresh phase-1 memory outputs",
)
.await;
return;
@@ -772,11 +885,12 @@ async fn run_memory_consolidation_for_scope(
"failed to wipe previous consolidation outputs for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
let _ = state_db::release_memory_consolidation_lock(
mark_phase2_job_failed_best_effort(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
MEMORY_STARTUP_STAGE,
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to wipe previous consolidation outputs",
)
.await;
return;
@@ -799,14 +913,37 @@ async fn run_memory_consolidation_for_scope(
.await
{
Ok(consolidation_agent_id) => {
match state_db
.set_phase2_job_agent_thread_id(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
consolidation_agent_id,
)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 job lost ownership before agent registration: {}:{}",
scope.scope_kind, scope.scope_key
);
return;
}
Err(err) => {
warn!(
"state db set_phase2_job_agent_thread_id failed during {MEMORY_STARTUP_STAGE}: {err}"
);
}
}
info!(
"memory phase-2 consolidation agent started: scope={} scope_key={} agent_id={}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
);
spawn_memory_lock_release_task(
spawn_phase2_completion_task(
session.as_ref(),
scope.memory_root,
lock_owner,
scope,
ownership_token,
consolidation_agent_id,
);
}
@@ -815,72 +952,148 @@ async fn run_memory_consolidation_for_scope(
"failed to spawn memory consolidation agent for scope {}:{}: {err}",
scope.scope_kind, scope.scope_key
);
let _ = state_db::release_memory_consolidation_lock(
mark_phase2_job_failed_best_effort(
session.services.state_db.as_deref(),
&scope.memory_root,
lock_owner,
MEMORY_STARTUP_STAGE,
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to spawn consolidation agent",
)
.await;
}
}
}
fn spawn_memory_lock_release_task(
fn spawn_phase2_completion_task(
session: &Session,
cwd: PathBuf,
lock_owner: ThreadId,
scope: MemoryScopeTarget,
ownership_token: String,
consolidation_agent_id: ThreadId,
) {
let state_db = session.services.state_db.clone();
let agent_control = session.services.agent_control.clone();
tokio::spawn(async move {
let Some(state_db) = state_db.as_deref() else {
return;
};
let mut status_rx = match agent_control.subscribe_status(consolidation_agent_id).await {
Ok(status_rx) => status_rx,
Err(err) => {
warn!(
"failed to subscribe to memory consolidation agent {} for cwd {}: {err}",
consolidation_agent_id,
cwd.display()
"failed to subscribe to memory consolidation agent {} for scope {}:{}: {err}",
consolidation_agent_id, scope.scope_kind, scope.scope_key
);
let _ = state_db::release_memory_consolidation_lock(
state_db.as_deref(),
&cwd,
lock_owner,
MEMORY_STARTUP_STAGE,
)
.await;
if let Err(mark_err) = state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
"failed to subscribe to consolidation agent status",
)
.await
{
warn!(
"state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {mark_err}"
);
}
return;
}
};
let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(
memories::PHASE_TWO_JOB_HEARTBEAT_SECONDS,
));
heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let final_status = loop {
let status = status_rx.borrow().clone();
if is_final_agent_status(&status) {
break Some(status);
break status;
}
if status_rx.changed().await.is_err() {
warn!(
"lost status updates for memory consolidation agent {} in cwd {}; releasing lock",
consolidation_agent_id,
cwd.display()
);
break Some(status);
tokio::select! {
changed = status_rx.changed() => {
if changed.is_err() {
warn!(
"lost status updates for memory consolidation agent {} in scope {}:{}",
consolidation_agent_id, scope.scope_kind, scope.scope_key
);
break status;
}
}
_ = heartbeat_interval.tick() => {
match state_db
.heartbeat_phase2_job(scope.scope_kind, &scope.scope_key, &ownership_token)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 heartbeat lost ownership for scope {}:{}; skipping finalization",
scope.scope_kind, scope.scope_key
);
return;
}
Err(err) => {
warn!("state db heartbeat_phase2_job failed during {MEMORY_STARTUP_STAGE}: {err}");
return;
}
}
}
}
};
let _ = state_db::release_memory_consolidation_lock(
state_db.as_deref(),
&cwd,
lock_owner,
MEMORY_STARTUP_STAGE,
)
.await;
info!(
"memory phase-2 consolidation agent finished: cwd={} agent_id={} final_status={:?}",
cwd.display(),
consolidation_agent_id,
final_status
let phase2_succeeded = matches!(&final_status, AgentStatus::Completed(_));
if phase2_succeeded {
match state_db
.mark_phase2_job_succeeded(scope.scope_kind, &scope.scope_key, &ownership_token)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 success finalization skipped after ownership changed: scope={} scope_key={}",
scope.scope_kind, scope.scope_key
);
}
Err(err) => {
warn!(
"state db mark_phase2_job_succeeded failed during {MEMORY_STARTUP_STAGE}: {err}"
);
}
}
info!(
"memory phase-2 consolidation agent finished: scope={} scope_key={} agent_id={} final_status={final_status:?}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
);
return;
}
let failure_reason = format!("consolidation agent finished with status {final_status:?}");
match state_db
.mark_phase2_job_failed(
scope.scope_kind,
&scope.scope_key,
&ownership_token,
&failure_reason,
)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 failure finalization skipped after ownership changed: scope={} scope_key={}",
scope.scope_kind, scope.scope_key
)
}
Err(err) => {
warn!(
"state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {err}"
);
}
}
warn!(
"memory phase-2 consolidation agent finished with non-success status: scope={} scope_key={} agent_id={} final_status={final_status:?}",
scope.scope_kind, scope.scope_key, consolidation_agent_id
);
});
}
@@ -910,6 +1123,24 @@ async fn mark_phase1_job_failed_best_effort(
}
}
/// Records a phase-2 job failure in the state DB without propagating errors.
///
/// Does nothing when `state_db` is `None`. A state DB error is logged as a warning;
/// a successful call's return value is ignored.
async fn mark_phase2_job_failed_best_effort(
    state_db: Option<&codex_state::StateRuntime>,
    scope_kind: &str,
    scope_key: &str,
    ownership_token: &str,
    failure_reason: &str,
) {
    if let Some(db) = state_db {
        let outcome = db
            .mark_phase2_job_failed(scope_kind, scope_key, ownership_token, failure_reason)
            .await;
        if let Err(err) = outcome {
            warn!("state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {err}");
        }
    }
}
async fn collect_response_text_until_completed(stream: &mut ResponseStream) -> CodexResult<String> {
let mut output_text = String::new();

View File

@@ -26,8 +26,10 @@ pub(crate) const MAX_RAW_MEMORIES_PER_SCOPE: usize = 64;
pub(crate) const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
/// Lease duration (seconds) for phase-1 job ownership.
pub(crate) const PHASE_ONE_JOB_LEASE_SECONDS: i64 = 3_600;
/// Lease duration (seconds) for per-cwd consolidation locks.
pub(crate) const CONSOLIDATION_LOCK_LEASE_SECONDS: i64 = 600;
/// Lease duration (seconds) for phase-2 consolidation job ownership.
pub(crate) const PHASE_TWO_JOB_LEASE_SECONDS: i64 = 3_600;
/// Heartbeat interval (seconds) for phase-2 running jobs.
pub(crate) const PHASE_TWO_JOB_HEARTBEAT_SECONDS: u64 = 30;
/// Scope kind for memories keyed by a working directory; phase-2 consolidation treats
/// the scope key of such a scope as the directory path.
pub(crate) const MEMORY_SCOPE_KIND_CWD: &str = "cwd";
/// Scope kind for user memories.
pub(crate) const MEMORY_SCOPE_KIND_USER: &str = "user";
/// The only scope key phase-2 consolidation accepts for a `MEMORY_SCOPE_KIND_USER` scope.
pub(crate) const MEMORY_SCOPE_KEY_USER: &str = "user";

View File

@@ -373,49 +373,6 @@ pub async fn get_last_n_thread_memories_for_cwd(
}
}
/// Try to acquire or renew the memory consolidation lock for `cwd`.
///
/// The path is normalized with `normalize_cwd_for_state_db` before it is handed to the
/// state DB. Returns `None` when `context` is `None` or when the state DB call fails
/// (the failure is logged as a warning mentioning `stage`); otherwise returns
/// `Some(flag)` with the boolean reported by the state DB.
pub async fn try_acquire_memory_consolidation_lock(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    working_thread_id: ThreadId,
    lease_seconds: i64,
    stage: &str,
) -> Option<bool> {
    let runtime = context?;
    let lock_key = normalize_cwd_for_state_db(cwd);
    let outcome = runtime
        .try_acquire_memory_consolidation_lock(&lock_key, working_thread_id, lease_seconds)
        .await;
    if let Err(err) = &outcome {
        warn!("state db try_acquire_memory_consolidation_lock failed during {stage}: {err}");
    }
    outcome.ok()
}
/// Release the memory consolidation lock for `cwd` if it is held by `working_thread_id`.
///
/// The path is normalized with `normalize_cwd_for_state_db` before it is handed to the
/// state DB. Returns `None` when `context` is `None` or when the state DB call fails
/// (the failure is logged as a warning mentioning `stage`); otherwise returns
/// `Some(flag)` with the boolean reported by the state DB.
pub async fn release_memory_consolidation_lock(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    working_thread_id: ThreadId,
    stage: &str,
) -> Option<bool> {
    let runtime = context?;
    let lock_key = normalize_cwd_for_state_db(cwd);
    let outcome = runtime
        .release_memory_consolidation_lock(&lock_key, working_thread_id)
        .await;
    match outcome {
        Ok(released) => Some(released),
        Err(err) => {
            warn!("state db release_memory_consolidation_lock failed during {stage}: {err}");
            None
        }
    }
}
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
pub async fn reconcile_rollout(
context: Option<&codex_state::StateRuntime>,