diff --git a/codex-rs/core/src/codex/memory_startup.rs b/codex-rs/core/src/codex/memory_startup.rs index dda7918574..f2a0bae271 100644 --- a/codex-rs/core/src/codex/memory_startup.rs +++ b/codex-rs/core/src/codex/memory_startup.rs @@ -9,8 +9,10 @@ const MEMORY_STARTUP_STAGE: &str = "run_memories_startup_pipeline"; const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000; const PHASE_ONE_DB_LOCK_RETRY_LIMIT: usize = 3; const PHASE_ONE_DB_LOCK_RETRY_BACKOFF_MS: u64 = 25; +const PHASE_TWO_DB_LOCK_RETRY_LIMIT: usize = 3; +const PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS: u64 = 25; -#[derive(Clone, Debug, Hash, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] struct MemoryScopeTarget { scope_kind: &'static str, scope_key: String, @@ -108,17 +110,14 @@ pub(super) async fn run_memories_startup_pipeline( page.items.len() ); - if claimed_candidates.is_empty() { - return Ok(()); - } - - let stage_one_context = StageOneRequestContext::from_turn_context( - turn_context.as_ref(), - turn_context.resolve_turn_metadata_header().await, - ); - - let touched_scope_sets = - futures::stream::iter(claimed_candidates.into_iter()) + let touched_scope_count = if claimed_candidates.is_empty() { + 0 + } else { + let stage_one_context = StageOneRequestContext::from_turn_context( + turn_context.as_ref(), + turn_context.resolve_turn_metadata_header().await, + ); + let touched_scope_counts = futures::stream::iter(claimed_candidates.into_iter()) .map(|claimed_candidate| { let session = Arc::clone(session); let stage_one_context = stage_one_context.clone(); @@ -127,23 +126,20 @@ pub(super) async fn run_memories_startup_pipeline( } }) .buffer_unordered(memories::PHASE_ONE_CONCURRENCY_LIMIT) - .collect::>>() + .collect::>() .await; - let touched_scopes = touched_scope_sets - .into_iter() - .flatten() - .collect::>(); + touched_scope_counts.into_iter().sum::() + }; info!( "memory phase-1 extraction complete: {} scope(s) touched", - touched_scopes.len() + touched_scope_count ); - if touched_scopes.is_empty() { - return Ok(()); - } - - let consolidation_scope_count = touched_scopes.len(); - futures::stream::iter(touched_scopes.into_iter()) + let dirty_scopes = + list_phase2_dirty_scopes(session, config.as_ref(), memories::MAX_ROLLOUTS_PER_STARTUP) + .await; + let consolidation_scope_count = dirty_scopes.len(); + futures::stream::iter(dirty_scopes.into_iter()) .map(|scope| { let session = Arc::clone(session); let config = Arc::clone(&config); @@ -252,11 +248,111 @@ async fn try_claim_phase1_job_with_retry( None } +async fn list_phase2_dirty_scopes( + session: &Session, + config: &Config, + limit: usize, +) -> Vec { + if limit == 0 { + return Vec::new(); + } + + let Some(state_db) = session.services.state_db.as_deref() else { + return Vec::new(); + }; + + let dirty_scopes = match state_db.list_dirty_memory_scopes(limit).await { + Ok(scopes) => scopes, + Err(err) => { + warn!("state db list_dirty_memory_scopes failed during {MEMORY_STARTUP_STAGE}: {err}"); + return Vec::new(); + } + }; + + dirty_scopes + .into_iter() + .filter_map(|dirty_scope| memory_scope_target_for_dirty_scope(config, dirty_scope)) + .collect() +} + +fn memory_scope_target_for_dirty_scope( + config: &Config, + dirty_scope: codex_state::DirtyMemoryScope, +) -> Option { + let scope_kind = dirty_scope.scope_kind; + let scope_key = dirty_scope.scope_key; + match scope_kind.as_str() { + memories::MEMORY_SCOPE_KIND_CWD => { + let cwd = PathBuf::from(&scope_key); + Some(MemoryScopeTarget { + scope_kind: memories::MEMORY_SCOPE_KIND_CWD, + scope_key, + memory_root: memories::memory_root_for_cwd(&config.codex_home, &cwd), + }) + } + memories::MEMORY_SCOPE_KIND_USER => { + if scope_key != memories::MEMORY_SCOPE_KEY_USER { + warn!( + "skipping unsupported user memory scope key for phase-2: {}:{}", + scope_kind, scope_key + ); + return None; + } + Some(MemoryScopeTarget { + scope_kind: memories::MEMORY_SCOPE_KIND_USER, + scope_key, + memory_root: memories::memory_root_for_user(&config.codex_home), + }) + } + _ => { + warn!( + "skipping unsupported memory scope for phase-2 consolidation: {}:{}", + scope_kind, scope_key + ); + None + } + } +} + +async fn try_claim_phase2_job_with_retry( + state_db: &codex_state::StateRuntime, + scope_kind: &str, + scope_key: &str, + owner_session_id: ThreadId, +) -> Option { + for attempt in 0..=PHASE_TWO_DB_LOCK_RETRY_LIMIT { + match state_db + .try_claim_phase2_job( + scope_kind, + scope_key, + owner_session_id, + memories::PHASE_TWO_JOB_LEASE_SECONDS, + ) + .await + { + Ok(claim) => return Some(claim), + Err(err) => { + let is_locked = err.to_string().contains("database is locked"); + if is_locked && attempt < PHASE_TWO_DB_LOCK_RETRY_LIMIT { + tokio::time::sleep(Duration::from_millis( + PHASE_TWO_DB_LOCK_RETRY_BACKOFF_MS * (attempt as u64 + 1), + )) + .await; + continue; + } + warn!("state db try_claim_phase2_job failed during {MEMORY_STARTUP_STAGE}: {err}"); + return None; + } + } + } + None +} + async fn process_memory_candidate( session: Arc, claimed_candidate: ClaimedPhaseOneCandidate, stage_one_context: StageOneRequestContext, -) -> HashSet { +) -> usize { let candidate = claimed_candidate.candidate; let claimed_scopes = claimed_candidate.claimed_scopes; @@ -283,7 +379,7 @@ async fn process_memory_candidate( ready_scopes.push((scope, ownership_token)); } if ready_scopes.is_empty() { - return HashSet::new(); + return 0; } let (rollout_items, _thread_id, parse_errors) = @@ -301,7 +397,7 @@ async fn process_memory_candidate( "failed to load rollout", ) .await; - return HashSet::new(); + return 0; } }; if parse_errors > 0 { @@ -328,7 +424,7 @@ async fn process_memory_candidate( "failed to serialize filtered rollout", ) .await; - return HashSet::new(); + return 0; } }; @@ -379,7 +475,7 @@ async fn process_memory_candidate( "stage-1 memory request failed", ) .await; - return HashSet::new(); + return 0; } }; @@ -397,7 +493,7 @@ async fn process_memory_candidate( "stage-1 memory response stream failed", ) .await; - return HashSet::new(); + return 0; } }; @@ -415,11 +511,11 @@ async fn process_memory_candidate( "invalid stage-1 memory payload", ) .await; - return HashSet::new(); + return 0; } }; - let mut touched_scopes = HashSet::new(); + let mut touched_scope_count = 0; for (scope, ownership_token) in &ready_scopes { if persist_phase_one_memory_for_scope( &session, @@ -431,11 +527,11 @@ async fn process_memory_candidate( ) .await { - touched_scopes.insert(scope.clone()); + touched_scope_count += 1; } } - touched_scopes + touched_scope_count } fn parse_source_updated_at_epoch(candidate: &memories::RolloutCandidate) -> i64 { @@ -686,44 +782,59 @@ async fn run_memory_consolidation_for_scope( config: Arc, scope: MemoryScopeTarget, ) { - let lock_owner = session.conversation_id; - let Some(lock_acquired) = state_db::try_acquire_memory_consolidation_lock( - session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - memories::CONSOLIDATION_LOCK_LEASE_SECONDS, - MEMORY_STARTUP_STAGE, - ) - .await - else { - warn!( - "failed to acquire memory consolidation lock for scope {}:{}; skipping consolidation", - scope.scope_kind, scope.scope_key - ); - return; - }; - if !lock_acquired { - debug!( - "memory consolidation lock already held for scope {}:{}; skipping", - scope.scope_kind, scope.scope_key - ); - return; - } - let Some(state_db) = session.services.state_db.as_deref() else { warn!( "state db unavailable for scope {}:{}; skipping consolidation", scope.scope_kind, scope.scope_key ); - let _ = state_db::release_memory_consolidation_lock( + return; + }; + + let Some(claim) = try_claim_phase2_job_with_retry( + state_db, + scope.scope_kind, + &scope.scope_key, + session.conversation_id, + ) + .await + else { + return; + }; + let ownership_token = match claim { + codex_state::Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token, + codex_state::Phase2JobClaimOutcome::SkippedNotDirty => { + debug!( + "memory phase-2 scope no longer dirty; skipping consolidation: {}:{}", + scope.scope_kind, scope.scope_key + ); + return; + } + codex_state::Phase2JobClaimOutcome::SkippedRunning => { + debug!( + "memory phase-2 job already running for scope {}:{}; skipping", + scope.scope_kind, scope.scope_key + ); + return; + } + }; + + if let Err(err) = memories::ensure_layout(&scope.memory_root).await { + warn!( + "failed to create memory layout for phase-2 scope {}:{} root={}: {err}", + scope.scope_kind, + scope.scope_key, + scope.memory_root.display() + ); + mark_phase2_job_failed_best_effort( session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - MEMORY_STARTUP_STAGE, + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to create memory layout", ) .await; return; - }; + } let latest_memories = match state_db .get_last_n_thread_memories_for_scope( @@ -738,11 +849,12 @@ async fn run_memory_consolidation_for_scope( warn!( "state db get_last_n_thread_memories_for_scope failed during {MEMORY_STARTUP_STAGE}: {err}" ); - let _ = state_db::release_memory_consolidation_lock( + mark_phase2_job_failed_best_effort( session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - MEMORY_STARTUP_STAGE, + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to read scope memories before consolidation", ) .await; return; @@ -757,11 +869,12 @@ async fn run_memory_consolidation_for_scope( "failed to refresh phase-1 memory outputs for scope {}:{}: {err}", scope.scope_kind, scope.scope_key ); - let _ = state_db::release_memory_consolidation_lock( + mark_phase2_job_failed_best_effort( session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - MEMORY_STARTUP_STAGE, + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to refresh phase-1 memory outputs", ) .await; return; @@ -772,11 +885,12 @@ async fn run_memory_consolidation_for_scope( "failed to wipe previous consolidation outputs for scope {}:{}: {err}", scope.scope_kind, scope.scope_key ); - let _ = state_db::release_memory_consolidation_lock( + mark_phase2_job_failed_best_effort( session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - MEMORY_STARTUP_STAGE, + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to wipe previous consolidation outputs", ) .await; return; @@ -799,14 +913,37 @@ async fn run_memory_consolidation_for_scope( .await { Ok(consolidation_agent_id) => { + match state_db + .set_phase2_job_agent_thread_id( + scope.scope_kind, + &scope.scope_key, + &ownership_token, + consolidation_agent_id, + ) + .await + { + Ok(true) => {} + Ok(false) => { + debug!( + "memory phase-2 job lost ownership before agent registration: {}:{}", + scope.scope_kind, scope.scope_key + ); + return; + } + Err(err) => { + warn!( + "state db set_phase2_job_agent_thread_id failed during {MEMORY_STARTUP_STAGE}: {err}" + ); + } + } info!( "memory phase-2 consolidation agent started: scope={} scope_key={} agent_id={}", scope.scope_kind, scope.scope_key, consolidation_agent_id ); - spawn_memory_lock_release_task( + spawn_phase2_completion_task( session.as_ref(), - scope.memory_root, - lock_owner, + scope, + ownership_token, consolidation_agent_id, ); } @@ -815,72 +952,148 @@ async fn run_memory_consolidation_for_scope( "failed to spawn memory consolidation agent for scope {}:{}: {err}", scope.scope_kind, scope.scope_key ); - let _ = state_db::release_memory_consolidation_lock( + mark_phase2_job_failed_best_effort( session.services.state_db.as_deref(), - &scope.memory_root, - lock_owner, - MEMORY_STARTUP_STAGE, + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to spawn consolidation agent", ) .await; } } } -fn spawn_memory_lock_release_task( +fn spawn_phase2_completion_task( session: &Session, - cwd: PathBuf, - lock_owner: ThreadId, + scope: MemoryScopeTarget, + ownership_token: String, consolidation_agent_id: ThreadId, ) { let state_db = session.services.state_db.clone(); let agent_control = session.services.agent_control.clone(); tokio::spawn(async move { + let Some(state_db) = state_db.as_deref() else { + return; + }; let mut status_rx = match agent_control.subscribe_status(consolidation_agent_id).await { Ok(status_rx) => status_rx, Err(err) => { warn!( - "failed to subscribe to memory consolidation agent {} for cwd {}: {err}", - consolidation_agent_id, - cwd.display() + "failed to subscribe to memory consolidation agent {} for scope {}:{}: {err}", + consolidation_agent_id, scope.scope_kind, scope.scope_key ); - let _ = state_db::release_memory_consolidation_lock( - state_db.as_deref(), - &cwd, - lock_owner, - MEMORY_STARTUP_STAGE, - ) - .await; + if let Err(mark_err) = state_db + .mark_phase2_job_failed( + scope.scope_kind, + &scope.scope_key, + &ownership_token, + "failed to subscribe to consolidation agent status", + ) + .await + { + warn!( + "state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {mark_err}" + ); + } return; } }; + let mut heartbeat_interval = tokio::time::interval(Duration::from_secs( + memories::PHASE_TWO_JOB_HEARTBEAT_SECONDS, + )); + heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let final_status = loop { let status = status_rx.borrow().clone(); if is_final_agent_status(&status) { - break Some(status); + break status; } - if status_rx.changed().await.is_err() { - warn!( - "lost status updates for memory consolidation agent {} in cwd {}; releasing lock", - consolidation_agent_id, - cwd.display() - ); - break Some(status); + + tokio::select! { + changed = status_rx.changed() => { + if changed.is_err() { + warn!( + "lost status updates for memory consolidation agent {} in scope {}:{}", + consolidation_agent_id, scope.scope_kind, scope.scope_key + ); + break status; + } + } + _ = heartbeat_interval.tick() => { + match state_db + .heartbeat_phase2_job(scope.scope_kind, &scope.scope_key, &ownership_token) + .await + { + Ok(true) => {} + Ok(false) => { + debug!( + "memory phase-2 heartbeat lost ownership for scope {}:{}; skipping finalization", + scope.scope_kind, scope.scope_key + ); + return; + } + Err(err) => { + warn!("state db heartbeat_phase2_job failed during {MEMORY_STARTUP_STAGE}: {err}"); + return; + } + } + } } }; - let _ = state_db::release_memory_consolidation_lock( - state_db.as_deref(), - &cwd, - lock_owner, - MEMORY_STARTUP_STAGE, - ) - .await; - info!( - "memory phase-2 consolidation agent finished: cwd={} agent_id={} final_status={:?}", - cwd.display(), - consolidation_agent_id, - final_status + let phase2_succeeded = matches!(&final_status, AgentStatus::Completed(_)); + if phase2_succeeded { + match state_db + .mark_phase2_job_succeeded(scope.scope_kind, &scope.scope_key, &ownership_token) + .await + { + Ok(true) => {} + Ok(false) => { + debug!( + "memory phase-2 success finalization skipped after ownership changed: scope={} scope_key={}", + scope.scope_kind, scope.scope_key + ); + } + Err(err) => { + warn!( + "state db mark_phase2_job_succeeded failed during {MEMORY_STARTUP_STAGE}: {err}" + ); + } + } + info!( + "memory phase-2 consolidation agent finished: scope={} scope_key={} agent_id={} final_status={final_status:?}", + scope.scope_kind, scope.scope_key, consolidation_agent_id + ); + return; + } + + let failure_reason = format!("consolidation agent finished with status {final_status:?}"); + match state_db + .mark_phase2_job_failed( + scope.scope_kind, + &scope.scope_key, + &ownership_token, + &failure_reason, + ) + .await + { + Ok(true) => {} + Ok(false) => { + debug!( + "memory phase-2 failure finalization skipped after ownership changed: scope={} scope_key={}", + scope.scope_kind, scope.scope_key + ) + } + Err(err) => { + warn!( + "state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {err}" + ); + } + } + warn!( + "memory phase-2 consolidation agent finished with non-success status: scope={} scope_key={} agent_id={} final_status={final_status:?}", + scope.scope_kind, scope.scope_key, consolidation_agent_id ); }); } @@ -910,6 +1123,24 @@ async fn mark_phase1_job_failed_best_effort( } } +async fn mark_phase2_job_failed_best_effort( + state_db: Option<&codex_state::StateRuntime>, + scope_kind: &str, + scope_key: &str, + ownership_token: &str, + failure_reason: &str, +) { + let Some(state_db) = state_db else { + return; + }; + if let Err(err) = state_db + .mark_phase2_job_failed(scope_kind, scope_key, ownership_token, failure_reason) + .await + { + warn!("state db mark_phase2_job_failed failed during {MEMORY_STARTUP_STAGE}: {err}"); + } +} + async fn collect_response_text_until_completed(stream: &mut ResponseStream) -> CodexResult { let mut output_text = String::new(); diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 9a0ae00d21..2767a85d42 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -26,8 +26,10 @@ pub(crate) const MAX_RAW_MEMORIES_PER_SCOPE: usize = 64; pub(crate) const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30; /// Lease duration (seconds) for phase-1 job ownership. pub(crate) const PHASE_ONE_JOB_LEASE_SECONDS: i64 = 3_600; -/// Lease duration (seconds) for per-cwd consolidation locks. -pub(crate) const CONSOLIDATION_LOCK_LEASE_SECONDS: i64 = 600; +/// Lease duration (seconds) for phase-2 consolidation job ownership. +pub(crate) const PHASE_TWO_JOB_LEASE_SECONDS: i64 = 3_600; +/// Heartbeat interval (seconds) for phase-2 running jobs. +pub(crate) const PHASE_TWO_JOB_HEARTBEAT_SECONDS: u64 = 30; pub(crate) const MEMORY_SCOPE_KIND_CWD: &str = "cwd"; pub(crate) const MEMORY_SCOPE_KIND_USER: &str = "user"; pub(crate) const MEMORY_SCOPE_KEY_USER: &str = "user"; diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index 11ba68d4f3..f4d760cc26 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -373,49 +373,6 @@ pub async fn get_last_n_thread_memories_for_cwd( } } -/// Try to acquire or renew a per-cwd memory consolidation lock. -pub async fn try_acquire_memory_consolidation_lock( - context: Option<&codex_state::StateRuntime>, - cwd: &Path, - working_thread_id: ThreadId, - lease_seconds: i64, - stage: &str, -) -> Option { - let ctx = context?; - let normalized_cwd = normalize_cwd_for_state_db(cwd); - match ctx - .try_acquire_memory_consolidation_lock(&normalized_cwd, working_thread_id, lease_seconds) - .await - { - Ok(acquired) => Some(acquired), - Err(err) => { - warn!("state db try_acquire_memory_consolidation_lock failed during {stage}: {err}"); - None - } - } -} - -/// Release a per-cwd memory consolidation lock if held by `working_thread_id`. -pub async fn release_memory_consolidation_lock( - context: Option<&codex_state::StateRuntime>, - cwd: &Path, - working_thread_id: ThreadId, - stage: &str, -) -> Option { - let ctx = context?; - let normalized_cwd = normalize_cwd_for_state_db(cwd); - match ctx - .release_memory_consolidation_lock(&normalized_cwd, working_thread_id) - .await - { - Ok(released) => Some(released), - Err(err) => { - warn!("state db release_memory_consolidation_lock failed during {stage}: {err}"); - None - } - } -} - /// Reconcile rollout items into SQLite, falling back to scanning the rollout file. pub async fn reconcile_rollout( context: Option<&codex_state::StateRuntime>, diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 6db552794b..0219dbe49d 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -31,7 +31,9 @@ pub use model::ThreadMemory; pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; +pub use runtime::DirtyMemoryScope; pub use runtime::Phase1JobClaimOutcome; +pub use runtime::Phase2JobClaimOutcome; pub use runtime::STATE_DB_FILENAME; pub use runtime::STATE_DB_VERSION; pub use runtime::state_db_filename; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index d1049a891c..b3c763c7cc 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -60,6 +60,19 @@ pub enum Phase1JobClaimOutcome { SkippedRunning, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DirtyMemoryScope { + pub scope_kind: String, + pub scope_key: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Phase2JobClaimOutcome { + Claimed { ownership_token: String }, + SkippedNotDirty, + SkippedRunning, +} + impl StateRuntime { /// Initialize the state runtime using the provided Codex home and default provider. /// @@ -1040,62 +1053,342 @@ ON CONFLICT(scope_kind, scope_key) DO UPDATE SET Ok(()) } - /// Try to acquire or renew the per-cwd memory consolidation lock. - /// - /// Returns `true` when the lock is acquired/renewed for `working_thread_id`. - /// Returns `false` when another owner holds a non-expired lease. - pub async fn try_acquire_memory_consolidation_lock( + /// List scopes that currently require phase-2 consolidation. + pub async fn list_dirty_memory_scopes( &self, - cwd: &Path, - working_thread_id: ThreadId, - lease_seconds: i64, - ) -> anyhow::Result { - let now = Utc::now().timestamp(); - let stale_cutoff = now.saturating_sub(lease_seconds.max(0)); - let result = sqlx::query( + limit: usize, + ) -> anyhow::Result> { + if limit == 0 { + return Ok(Vec::new()); + } + + let rows = sqlx::query( r#" -INSERT INTO memory_consolidation_locks ( - cwd, - working_thread_id, - updated_at -) VALUES (?, ?, ?) -ON CONFLICT(cwd) DO UPDATE SET - working_thread_id = excluded.working_thread_id, - updated_at = excluded.updated_at -WHERE memory_consolidation_locks.working_thread_id = excluded.working_thread_id - OR memory_consolidation_locks.updated_at <= ? +SELECT scope_kind, scope_key +FROM memory_scope_dirty +WHERE dirty = 1 +ORDER BY updated_at DESC, scope_kind ASC, scope_key ASC +LIMIT ? "#, ) - .bind(cwd.display().to_string()) - .bind(working_thread_id.to_string()) - .bind(now) - .bind(stale_cutoff) - .execute(self.pool.as_ref()) + .bind(limit as i64) + .fetch_all(self.pool.as_ref()) .await?; - Ok(result.rows_affected() > 0) + rows.into_iter() + .map(|row| { + Ok(DirtyMemoryScope { + scope_kind: row.try_get("scope_kind")?, + scope_key: row.try_get("scope_key")?, + }) + }) + .collect() } - /// Release the per-cwd memory consolidation lock if held by `working_thread_id`. - /// - /// Returns `true` when a lock row was removed. - pub async fn release_memory_consolidation_lock( + /// Try to claim a phase-2 consolidation job for `(scope_kind, scope_key)`. + pub async fn try_claim_phase2_job( &self, - cwd: &Path, - working_thread_id: ThreadId, + scope_kind: &str, + scope_key: &str, + owner_session_id: ThreadId, + lease_seconds: i64, + ) -> anyhow::Result { + const CAS_RETRY_LIMIT: usize = 3; + + for _ in 0..CAS_RETRY_LIMIT { + let now = Utc::now().timestamp(); + let stale_cutoff = now.saturating_sub(lease_seconds.max(0)); + let ownership_token = Uuid::new_v4().to_string(); + let owner_session_id = owner_session_id.to_string(); + + let mut tx = self.pool.begin().await?; + + let dirty_row = sqlx::query( + r#" +SELECT dirty +FROM memory_scope_dirty +WHERE scope_kind = ? AND scope_key = ? + "#, + ) + .bind(scope_kind) + .bind(scope_key) + .fetch_optional(&mut *tx) + .await?; + let Some(dirty_row) = dirty_row else { + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::SkippedNotDirty); + }; + let dirty: bool = dirty_row.try_get("dirty")?; + if !dirty { + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::SkippedNotDirty); + } + + let existing = sqlx::query( + r#" +SELECT status, last_heartbeat_at, attempt +FROM memory_phase2_jobs +WHERE scope_kind = ? AND scope_key = ? + "#, + ) + .bind(scope_kind) + .bind(scope_key) + .fetch_optional(&mut *tx) + .await?; + + let Some(existing) = existing else { + sqlx::query( + r#" +INSERT INTO memory_phase2_jobs ( + scope_kind, + scope_key, + status, + owner_session_id, + agent_thread_id, + started_at, + last_heartbeat_at, + finished_at, + attempt, + failure_reason, + ownership_token +) VALUES (?, ?, 'running', ?, NULL, ?, ?, NULL, 1, NULL, ?) + "#, + ) + .bind(scope_kind) + .bind(scope_key) + .bind(owner_session_id.as_str()) + .bind(now) + .bind(now) + .bind(ownership_token.as_str()) + .execute(&mut *tx) + .await?; + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::Claimed { ownership_token }); + }; + + let status: String = existing.try_get("status")?; + let existing_last_heartbeat_at: Option = existing.try_get("last_heartbeat_at")?; + let existing_attempt: i64 = existing.try_get("attempt")?; + if status == "running" + && existing_last_heartbeat_at + .is_some_and(|last_heartbeat_at| last_heartbeat_at > stale_cutoff) + { + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::SkippedRunning); + } + + let new_attempt = existing_attempt.saturating_add(1); + let rows_affected = if let Some(existing_last_heartbeat_at) = existing_last_heartbeat_at + { + sqlx::query( + r#" +UPDATE memory_phase2_jobs +SET + status = 'running', + owner_session_id = ?, + agent_thread_id = NULL, + started_at = ?, + last_heartbeat_at = ?, + finished_at = NULL, + attempt = ?, + failure_reason = NULL, + ownership_token = ? +WHERE scope_kind = ? AND scope_key = ? + AND status = ? AND attempt = ? AND last_heartbeat_at = ? + "#, + ) + .bind(owner_session_id.as_str()) + .bind(now) + .bind(now) + .bind(new_attempt) + .bind(ownership_token.as_str()) + .bind(scope_kind) + .bind(scope_key) + .bind(status.as_str()) + .bind(existing_attempt) + .bind(existing_last_heartbeat_at) + .execute(&mut *tx) + .await? + .rows_affected() + } else { + sqlx::query( + r#" +UPDATE memory_phase2_jobs +SET + status = 'running', + owner_session_id = ?, + agent_thread_id = NULL, + started_at = ?, + last_heartbeat_at = ?, + finished_at = NULL, + attempt = ?, + failure_reason = NULL, + ownership_token = ? +WHERE scope_kind = ? AND scope_key = ? + AND status = ? AND attempt = ? AND last_heartbeat_at IS NULL + "#, + ) + .bind(owner_session_id.as_str()) + .bind(now) + .bind(now) + .bind(new_attempt) + .bind(ownership_token.as_str()) + .bind(scope_kind) + .bind(scope_key) + .bind(status.as_str()) + .bind(existing_attempt) + .execute(&mut *tx) + .await? + .rows_affected() + }; + + if rows_affected == 0 { + tx.rollback().await?; + continue; + } + + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::Claimed { ownership_token }); + } + + Ok(Phase2JobClaimOutcome::SkippedRunning) + } + + /// Persist the spawned phase-2 agent id for an owned running job. + pub async fn set_phase2_job_agent_thread_id( + &self, + scope_kind: &str, + scope_key: &str, + ownership_token: &str, + agent_thread_id: ThreadId, ) -> anyhow::Result { - let result = sqlx::query( + let now = Utc::now().timestamp(); + let rows_affected = sqlx::query( r#" -DELETE FROM memory_consolidation_locks -WHERE cwd = ? AND working_thread_id = ? +UPDATE memory_phase2_jobs +SET + agent_thread_id = ?, + last_heartbeat_at = ? +WHERE scope_kind = ? AND scope_key = ? + AND status = 'running' AND ownership_token = ? "#, ) - .bind(cwd.display().to_string()) - .bind(working_thread_id.to_string()) + .bind(agent_thread_id.to_string()) + .bind(now) + .bind(scope_kind) + .bind(scope_key) + .bind(ownership_token) .execute(self.pool.as_ref()) + .await? + .rows_affected(); + Ok(rows_affected > 0) + } + + /// Refresh heartbeat timestamp for an owned running phase-2 job. + pub async fn heartbeat_phase2_job( + &self, + scope_kind: &str, + scope_key: &str, + ownership_token: &str, + ) -> anyhow::Result { + let now = Utc::now().timestamp(); + let rows_affected = sqlx::query( + r#" +UPDATE memory_phase2_jobs +SET last_heartbeat_at = ? +WHERE scope_kind = ? AND scope_key = ? + AND status = 'running' AND ownership_token = ? + "#, + ) + .bind(now) + .bind(scope_kind) + .bind(scope_key) + .bind(ownership_token) + .execute(self.pool.as_ref()) + .await? + .rows_affected(); + Ok(rows_affected > 0) + } + + /// Finalize a claimed phase-2 job as succeeded and clear dirty state. + pub async fn mark_phase2_job_succeeded( + &self, + scope_kind: &str, + scope_key: &str, + ownership_token: &str, + ) -> anyhow::Result { + let now = Utc::now().timestamp(); + let mut tx = self.pool.begin().await?; + let rows_affected = sqlx::query( + r#" +UPDATE memory_phase2_jobs +SET + status = 'succeeded', + finished_at = ?, + failure_reason = NULL +WHERE scope_kind = ? AND scope_key = ? + AND status = 'running' AND ownership_token = ? + "#, + ) + .bind(now) + .bind(scope_kind) + .bind(scope_key) + .bind(ownership_token) + .execute(&mut *tx) + .await? + .rows_affected(); + + if rows_affected == 0 { + tx.commit().await?; + return Ok(false); + } + + sqlx::query( + r#" +UPDATE memory_scope_dirty +SET dirty = 0, updated_at = ? +WHERE scope_kind = ? AND scope_key = ? + "#, + ) + .bind(now) + .bind(scope_kind) + .bind(scope_key) + .execute(&mut *tx) .await?; - Ok(result.rows_affected() > 0) + tx.commit().await?; + Ok(true) + } + + /// Finalize a claimed phase-2 job as failed, leaving dirty scope set. + pub async fn mark_phase2_job_failed( + &self, + scope_kind: &str, + scope_key: &str, + ownership_token: &str, + failure_reason: &str, + ) -> anyhow::Result { + let now = Utc::now().timestamp(); + let rows_affected = sqlx::query( + r#" +UPDATE memory_phase2_jobs +SET + status = 'failed', + finished_at = ?, + failure_reason = ? +WHERE scope_kind = ? AND scope_key = ? + AND status = 'running' AND ownership_token = ? + "#, + ) + .bind(now) + .bind(failure_reason) + .bind(scope_kind) + .bind(scope_key) + .bind(ownership_token) + .execute(self.pool.as_ref()) + .await? + .rows_affected(); + Ok(rows_affected > 0) } /// Persist dynamic tools for a thread if none have been stored yet. @@ -1478,6 +1771,7 @@ fn push_thread_order_and_limit( #[cfg(test)] mod tests { use super::Phase1JobClaimOutcome; + use super::Phase2JobClaimOutcome; use super::STATE_DB_FILENAME; use super::STATE_DB_VERSION; use super::StateRuntime; @@ -1845,90 +2139,259 @@ mod tests { } #[tokio::test] - async fn memory_consolidation_lock_enforces_owner_and_release() { + async fn phase2_job_claim_requires_dirty_scope() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) .await .expect("initialize runtime"); + let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); - let cwd = codex_home.join("workspace"); - let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); - let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let claim_without_dirty = runtime + .try_claim_phase2_job("cwd", "scope", owner, 3600) + .await + .expect("claim without dirty"); + assert_eq!(claim_without_dirty, Phase2JobClaimOutcome::SkippedNotDirty); - assert!( - runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) - .await - .expect("acquire for owner_a"), - "owner_a should acquire lock" + runtime + .mark_memory_scope_dirty("cwd", "scope", false) + .await + .expect("mark dirty false"); + let claim_with_false_dirty = runtime + .try_claim_phase2_job("cwd", "scope", owner, 3600) + .await + .expect("claim with false dirty"); + assert_eq!( + claim_with_false_dirty, + Phase2JobClaimOutcome::SkippedNotDirty ); + + runtime + .mark_memory_scope_dirty("cwd", "scope", true) + .await + .expect("mark dirty true"); + let claim_with_dirty = runtime + .try_claim_phase2_job("cwd", "scope", owner, 3600) + .await + .expect("claim with dirty"); assert!( - !runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 600) - .await - .expect("acquire for owner_b should fail"), - "owner_b should not steal active lock" - ); - assert!( - runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) - .await - .expect("owner_a should renew lock"), - "owner_a should renew lock" - ); - assert!( - !runtime - .release_memory_consolidation_lock(cwd.as_path(), owner_b) - .await - .expect("owner_b release should be no-op"), - "non-owner release should not remove lock" - ); - assert!( - runtime - .release_memory_consolidation_lock(cwd.as_path(), owner_a) - .await - .expect("owner_a release"), - "owner_a should release lock" - ); - assert!( - runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 600) - .await - .expect("owner_b acquire after release"), - "owner_b should acquire released lock" + matches!(claim_with_dirty, Phase2JobClaimOutcome::Claimed { .. }), + "dirty scope should be claimable" ); let _ = tokio::fs::remove_dir_all(codex_home).await; } #[tokio::test] - async fn memory_consolidation_lock_can_be_stolen_when_lease_expired() { + async fn phase2_running_job_skips_fresh_claims_and_allows_stale_steal() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) .await .expect("initialize runtime"); + let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); - let cwd = codex_home.join("workspace"); - let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); - let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + runtime + .mark_memory_scope_dirty("cwd", "scope", true) + .await + .expect("mark dirty true"); + + let claim_a = runtime + .try_claim_phase2_job("cwd", "scope", owner_a, 3600) + .await + .expect("claim owner_a"); + let owner_a_token = match claim_a { + Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected claim outcome: {other:?}"), + }; + + let fresh_claim_b = runtime + .try_claim_phase2_job("cwd", "scope", owner_b, 3600) + .await + .expect("fresh claim owner_b"); + assert_eq!(fresh_claim_b, Phase2JobClaimOutcome::SkippedRunning); assert!( runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) + .heartbeat_phase2_job("cwd", "scope", owner_a_token.as_str()) .await - .expect("owner_a acquire") + .expect("owner_a heartbeat"), + "current owner should heartbeat" + ); + assert!( + !runtime + .heartbeat_phase2_job("cwd", "scope", "wrong-token") + .await + .expect("wrong token heartbeat"), + "wrong token should not heartbeat" + ); + + let stale_claim_b = runtime + .try_claim_phase2_job("cwd", "scope", owner_b, 0) + .await + .expect("stale claim owner_b"); + let owner_b_token = match stale_claim_b { + Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stale claim outcome: {other:?}"), + }; + + assert!( + !runtime + .heartbeat_phase2_job("cwd", "scope", owner_a_token.as_str()) + .await + .expect("stale owner heartbeat"), + "stale owner should lose heartbeat ownership" ); assert!( runtime - .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 0) + .heartbeat_phase2_job("cwd", "scope", owner_b_token.as_str()) .await - .expect("owner_b steal with expired lease"), - "owner_b should steal lock when lease cutoff marks previous lock stale" + .expect("new owner heartbeat"), + "new owner should heartbeat" ); let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn phase2_success_requires_owner_and_clears_dirty_scope() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + + runtime + .mark_memory_scope_dirty("cwd", "scope", true) + .await + .expect("mark dirty true"); + let claim = runtime + .try_claim_phase2_job("cwd", "scope", owner, 3600) + .await + .expect("claim"); + let ownership_token = match claim { + Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected claim outcome: {other:?}"), + }; + + assert!( + !runtime + .mark_phase2_job_succeeded("cwd", "scope", "wrong-token") + .await + .expect("wrong token success should fail"), + "wrong token should not finalize phase2 job" + ); + let dirty_after_wrong_token = sqlx::query( + "SELECT dirty FROM memory_scope_dirty WHERE scope_kind = ? AND scope_key = ?", + ) + .bind("cwd") + .bind("scope") + .fetch_one(runtime.pool.as_ref()) + .await + .expect("fetch dirty after wrong token") + .try_get::("dirty") + .expect("dirty value"); + assert!(dirty_after_wrong_token, "dirty scope should remain dirty"); + + assert!( + runtime + .mark_phase2_job_succeeded("cwd", "scope", ownership_token.as_str()) + .await + .expect("owner success should pass"), + "owner token should finalize phase2 job" + ); + let dirty_after_success = sqlx::query( + "SELECT dirty FROM memory_scope_dirty WHERE scope_kind = ? AND scope_key = ?", + ) + .bind("cwd") + .bind("scope") + .fetch_one(runtime.pool.as_ref()) + .await + .expect("fetch dirty after success") + .try_get::("dirty") + .expect("dirty value"); + assert!( + !dirty_after_success, + "successful phase2 finalization should clear dirty scope" + ); + let dirty_scopes = runtime + .list_dirty_memory_scopes(10) + .await + .expect("list dirty scopes"); + assert_eq!(dirty_scopes, Vec::new()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn phase2_failure_keeps_scope_dirty_and_allows_retry() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + + runtime + .mark_memory_scope_dirty("cwd", "scope", true) + .await + .expect("mark dirty true"); + let claim_a = runtime + .try_claim_phase2_job("cwd", "scope", owner_a, 3600) + .await + .expect("claim owner_a"); + let owner_a_token = match claim_a { + Phase2JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected claim outcome: {other:?}"), + }; + + assert!( + runtime + .mark_phase2_job_failed( + "cwd", + "scope", + owner_a_token.as_str(), + "consolidation failed", + ) + .await + .expect("mark phase2 failed"), + "owner token should fail phase2 job" + ); + let dirty_scopes = runtime + .list_dirty_memory_scopes(10) + .await + .expect("list dirty scopes"); + assert_eq!( + dirty_scopes, + vec![super::DirtyMemoryScope { + scope_kind: "cwd".to_string(), + scope_key: "scope".to_string(), + }] + ); + + let claim_b = runtime + .try_claim_phase2_job("cwd", "scope", owner_b, 3600) + .await + .expect("claim owner_b"); + assert!( + matches!(claim_b, Phase2JobClaimOutcome::Claimed { .. }), + "failed jobs should be retryable while dirty" + ); + + let attempt = sqlx::query( + "SELECT attempt FROM memory_phase2_jobs WHERE scope_kind = ? AND scope_key = ?", + ) + .bind("cwd") + .bind("scope") + .fetch_one(runtime.pool.as_ref()) + .await + .expect("fetch attempt") + .try_get::("attempt") + .expect("attempt value"); + assert_eq!(attempt, 2); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn phase1_job_claim_and_success_require_current_owner_token() { let codex_home = unique_temp_dir();