Files
codex/codex-rs/core/src/state_db.rs
jif-oai 74ecd6e3b2 state: add memory consolidation lock primitives (#11199)
## Summary
- add a migration for memory_consolidation_locks
- add acquire/release lock primitives to codex-state runtime
- add core/state_db wrappers and cwd normalization for memory queries
and lock keys

## Testing
- cargo test -p codex-state memory_consolidation_lock_
- cargo test -p codex-core --lib state_db::
2026-02-09 21:04:20 +00:00

617 lines
19 KiB
Rust

use crate::config::Config;
use crate::features::Feature;
use crate::path_utils::normalize_for_path_comparison;
use crate::rollout::list::Cursor;
use crate::rollout::list::ThreadSortKey;
use crate::rollout::metadata;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_COMPARE_ERROR;
pub use codex_state::LogEntry;
use codex_state::STATE_DB_VERSION;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::warn;
use uuid::Uuid;
/// Core-facing handle to the optional SQLite-backed state runtime.
///
/// The runtime is wrapped in an `Arc`, so cloning a handle only bumps a reference count and every
/// clone refers to the same runtime instance.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
/// Initialize the state runtime when the `sqlite` feature flag is enabled. To only be used
/// inside `core`. The initialization should not be done anywhere else.
pub(crate) async fn init_if_enabled(
config: &Config,
otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
if !config.features.enabled(Feature::Sqlite) {
return None;
}
let runtime = match codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
{
Ok(runtime) => runtime,
Err(err) => {
warn!(
"failed to initialize state runtime at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter("codex.db.init", 1, &[("status", "init_error")]);
}
return None;
}
};
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
warn!(
"failed to read backfill state at {}: {err}",
config.codex_home.display()
);
return None;
}
};
if backfill_state.status != codex_state::BackfillStatus::Complete {
let runtime_for_backfill = runtime.clone();
let config = config.clone();
let otel = otel.cloned();
tokio::spawn(async move {
metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config, otel.as_ref())
.await;
});
}
Some(runtime)
}
/// Get the DB if the feature is enabled and the DB exists.
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(config.codex_home.as_path());
if !config.features.enabled(Feature::Sqlite)
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
{
return None;
}
let runtime = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
.ok()?;
require_backfill_complete(runtime, config.codex_home.as_path()).await
}
/// Open the state runtime when the SQLite file exists, without feature gating.
///
/// This is used for parity checks during the SQLite migration phase.
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
let db_path = codex_state::state_db_path(codex_home);
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
return None;
}
let runtime = codex_state::StateRuntime::init(
codex_home.to_path_buf(),
default_provider.to_string(),
None,
)
.await
.ok()?;
require_backfill_complete(runtime, codex_home).await
}
/// Hand back `runtime` only if its recorded backfill status is `Complete`.
///
/// An incomplete backfill or a failure to read the backfill state is logged with `codex_home`
/// for context and yields `None`.
async fn require_backfill_complete(
    runtime: StateDbHandle,
    codex_home: &Path,
) -> Option<StateDbHandle> {
    let backfill = match runtime.get_backfill_state().await {
        Err(err) => {
            warn!(
                "failed to read backfill state at {}: {err}",
                codex_home.display()
            );
            return None;
        }
        Ok(backfill) => backfill,
    };
    if backfill.status != codex_state::BackfillStatus::Complete {
        warn!(
            "state db backfill not complete at {} (status: {})",
            codex_home.display(),
            backfill.status.as_str()
        );
        return None;
    }
    Some(runtime)
}
/// Convert a rollout-listing cursor into a state DB pagination anchor.
///
/// The cursor is expected to serialize to a JSON string of the form `"<timestamp>|<uuid>"`.
/// The timestamp is parsed in one of two formats, tried in this order:
/// - the rollout file-name format `%Y-%m-%dT%H-%M-%S` (interpreted as UTC);
/// - RFC 3339.
///
/// Sub-second precision is dropped. Returns `None` in these cases:
/// - the cursor is absent;
/// - it does not serialize to a string;
/// - it has any number of `|` separators other than exactly one;
/// - either part fails to parse.
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<codex_state::Anchor> {
    let serialized = serde_json::to_value(cursor?).ok()?;
    let (ts_part, id_part) = serialized.as_str()?.split_once('|')?;
    // A second separator means the token is malformed; reject it outright.
    if id_part.contains('|') {
        return None;
    }
    let id = Uuid::parse_str(id_part).ok()?;
    let parsed_ts = NaiveDateTime::parse_from_str(ts_part, "%Y-%m-%dT%H-%M-%S")
        .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
        .or_else(|_| DateTime::parse_from_rfc3339(ts_part).map(|dt| dt.with_timezone(&Utc)))
        .ok()?;
    let ts = parsed_ts.with_nanosecond(0)?;
    Some(codex_state::Anchor { ts, id })
}
/// Normalize a working directory for storage in, and lookup against, the state DB.
///
/// Falls back to the path as given if `normalize_for_path_comparison` fails.
fn normalize_cwd_for_state_db(cwd: &Path) -> PathBuf {
    match normalize_for_path_comparison(cwd) {
        Ok(normalized) => normalized,
        Err(_) => cwd.to_path_buf(),
    }
}
/// Log a warning when `codex_home` differs from the directory the state runtime was opened for.
///
/// This is diagnostic only: callers still run their query against `ctx` afterwards.
fn warn_on_codex_home_mismatch(ctx: &codex_state::StateRuntime, codex_home: &Path) {
    if ctx.codex_home() != codex_home {
        warn!(
            "state db codex_home mismatch: expected {}, got {}",
            ctx.codex_home().display(),
            codex_home.display()
        );
    }
}

/// Convert session sources into the string values used to filter threads in SQLite.
///
/// Each source is converted as follows:
/// - a source that serializes to a JSON string is used as-is;
/// - any other JSON value is rendered as JSON text;
/// - a source that fails to serialize becomes an empty string.
fn session_sources_to_db_filter(allowed_sources: &[SessionSource]) -> Vec<String> {
    allowed_sources
        .iter()
        .map(|value| match serde_json::to_value(value) {
            Ok(Value::String(s)) => s,
            Ok(other) => other.to_string(),
            Err(_) => String::new(),
        })
        .collect()
}

/// Translate a rollout-listing sort key into the equivalent state DB sort key.
fn thread_sort_key_to_db(sort_key: ThreadSortKey) -> codex_state::SortKey {
    match sort_key {
        ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
        ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
    }
}

/// List thread ids from SQLite for parity checks without rollout scanning.
///
/// Returns `None` when no state DB is available or when the query fails. Failures are logged
/// with `stage` for context.
// The parameter list mirrors the filters accepted by the underlying state DB query.
#[allow(clippy::too_many_arguments)]
pub async fn list_thread_ids_db(
    context: Option<&codex_state::StateRuntime>,
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    archived_only: bool,
    stage: &str,
) -> Option<Vec<ThreadId>> {
    let ctx = context?;
    warn_on_codex_home_mismatch(ctx, codex_home);
    let anchor = cursor_to_anchor(cursor);
    let allowed_sources = session_sources_to_db_filter(allowed_sources);
    match ctx
        .list_thread_ids(
            page_size,
            anchor.as_ref(),
            thread_sort_key_to_db(sort_key),
            allowed_sources.as_slice(),
            // Already a borrowed slice; no need to copy it into a Vec first.
            model_providers,
            archived_only,
        )
        .await
    {
        Ok(ids) => Some(ids),
        Err(err) => {
            warn!("state db list_thread_ids failed during {stage}: {err}");
            None
        }
    }
}

/// List thread metadata from SQLite without rollout directory traversal.
///
/// Returns `None` when no state DB is available or when the query fails (logged as a warning).
// The parameter list mirrors the filters accepted by the underlying state DB query.
#[allow(clippy::too_many_arguments)]
pub async fn list_threads_db(
    context: Option<&codex_state::StateRuntime>,
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    archived: bool,
) -> Option<codex_state::ThreadsPage> {
    let ctx = context?;
    warn_on_codex_home_mismatch(ctx, codex_home);
    let anchor = cursor_to_anchor(cursor);
    let allowed_sources = session_sources_to_db_filter(allowed_sources);
    match ctx
        .list_threads(
            page_size,
            anchor.as_ref(),
            thread_sort_key_to_db(sort_key),
            allowed_sources.as_slice(),
            model_providers,
            archived,
        )
        .await
    {
        Ok(page) => Some(page),
        Err(err) => {
            warn!("state db list_threads failed: {err}");
            None
        }
    }
}
/// Look up the rollout path recorded in SQLite for `thread_id`.
///
/// `archived_only` is forwarded unchanged to the runtime lookup. Returns `None` when no state DB
/// is available, when no path is recorded, or when the lookup fails. Failures are logged with
/// `stage` for context.
pub async fn find_rollout_path_by_id(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    archived_only: Option<bool>,
    stage: &str,
) -> Option<PathBuf> {
    let ctx = context?;
    match ctx.find_rollout_path_by_id(thread_id, archived_only).await {
        Ok(found) => found,
        Err(err) => {
            warn!("state db find_rollout_path_by_id failed during {stage}: {err}");
            None
        }
    }
}
/// Fetch the dynamic tool specs stored in SQLite for `thread_id`.
///
/// Returns `None` when no state DB is available, when nothing is stored, or when the query fails.
/// Failures are logged with `stage` for context.
pub async fn get_dynamic_tools(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    stage: &str,
) -> Option<Vec<DynamicToolSpec>> {
    context?
        .get_dynamic_tools(thread_id)
        .await
        .unwrap_or_else(|err| {
            warn!("state db get_dynamic_tools failed during {stage}: {err}");
            None
        })
}
/// Persist dynamic tools for a thread id using SQLite, if none exist yet.
pub async fn persist_dynamic_tools(
context: Option<&codex_state::StateRuntime>,
thread_id: ThreadId,
tools: Option<&[DynamicToolSpec]>,
stage: &str,
) {
let Some(ctx) = context else {
return;
};
if let Err(err) = ctx.persist_dynamic_tools(thread_id, tools).await {
warn!("state db persist_dynamic_tools failed during {stage}: {err}");
}
}
/// Fetch the memory summaries stored in SQLite for `thread_id`.
///
/// Returns `None` when no state DB is available, when no memory is stored, or when the query
/// fails. Failures are logged with `stage` for context.
pub async fn get_thread_memory(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    stage: &str,
) -> Option<codex_state::ThreadMemory> {
    let ctx = context?;
    ctx.get_thread_memory(thread_id)
        .await
        .unwrap_or_else(|err| {
            warn!("state db get_thread_memory failed during {stage}: {err}");
            None
        })
}
/// Insert or update the trace and memory summaries for `thread_id` in SQLite.
///
/// Returns the memory row produced by the runtime. Returns `None` when no state DB is available
/// or when the write fails; failures are logged with `stage` for context.
pub async fn upsert_thread_memory(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    trace_summary: &str,
    memory_summary: &str,
    stage: &str,
) -> Option<codex_state::ThreadMemory> {
    let ctx = context?;
    let written = ctx
        .upsert_thread_memory(thread_id, trace_summary, memory_summary)
        .await;
    written
        .inspect_err(|err| {
            warn!("state db upsert_thread_memory failed during {stage}: {err}");
        })
        .ok()
}
/// Get the last `n` thread memories recorded for `cwd`, using an exact path match.
///
/// `cwd` is first passed through `normalize_cwd_for_state_db`, the same normalization applied
/// when thread metadata is written from this module. Returns `None` when no state DB is
/// available or when the query fails; failures are logged with `stage` for context.
pub async fn get_last_n_thread_memories_for_cwd(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    n: usize,
    stage: &str,
) -> Option<Vec<codex_state::ThreadMemory>> {
    let ctx = context?;
    let lookup_cwd = normalize_cwd_for_state_db(cwd);
    ctx.get_last_n_thread_memories_for_cwd(&lookup_cwd, n)
        .await
        .inspect_err(|err| {
            warn!("state db get_last_n_thread_memories_for_cwd failed during {stage}: {err}");
        })
        .ok()
}
/// Try to acquire or renew the per-cwd memory consolidation lock for `working_thread_id`.
///
/// `cwd` is normalized with `normalize_cwd_for_state_db` before it is used as the lock key, and
/// `lease_seconds` is forwarded unchanged.
///
/// Returns `Some(bool)` with the runtime's answer (presumably `true` when the lock is now held
/// by `working_thread_id`; confirm against `codex_state`). Returns `None` when no state DB is
/// available or when the call fails; failures are logged with `stage` for context.
pub async fn try_acquire_memory_consolidation_lock(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    working_thread_id: ThreadId,
    lease_seconds: i64,
    stage: &str,
) -> Option<bool> {
    let ctx = context?;
    let lock_key = normalize_cwd_for_state_db(cwd);
    let outcome = ctx
        .try_acquire_memory_consolidation_lock(&lock_key, working_thread_id, lease_seconds)
        .await;
    match outcome {
        Err(err) => {
            warn!("state db try_acquire_memory_consolidation_lock failed during {stage}: {err}");
            None
        }
        Ok(acquired) => Some(acquired),
    }
}
/// Release a per-cwd memory consolidation lock if held by `working_thread_id`.
///
/// `cwd` is normalized with `normalize_cwd_for_state_db` before it is used as the lock key.
///
/// Returns `Some(bool)` with the runtime's answer (presumably `true` when a lock was actually
/// released; confirm against `codex_state`). Returns `None` when no state DB is available or when
/// the call fails; failures are logged with `stage` for context.
pub async fn release_memory_consolidation_lock(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    working_thread_id: ThreadId,
    stage: &str,
) -> Option<bool> {
    let ctx = context?;
    let lock_key = normalize_cwd_for_state_db(cwd);
    let outcome = ctx
        .release_memory_consolidation_lock(&lock_key, working_thread_id)
        .await;
    match outcome {
        Err(err) => {
            warn!("state db release_memory_consolidation_lock failed during {stage}: {err}");
            None
        }
        Ok(released) => Some(released),
    }
}
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
///
/// If a `builder` is supplied or `items` is non-empty, the work is delegated to
/// `apply_rollout_items`. Note that `archived_only` is not applied on that path.
///
/// Otherwise the function runs a full re-read of the rollout file:
/// 1. Re-extract metadata from the file.
/// 2. Normalize `cwd`.
/// 3. Apply the `archived_only` override:
///    - `Some(true)` sets `archived_at` to `updated_at` when it is unset;
///    - `Some(false)` clears `archived_at`;
///    - `None` leaves it unchanged.
/// 4. Upsert the thread.
/// 5. Persist dynamic tools from the file's session meta line.
///
/// Every failure is logged and stops the reconciliation; nothing is returned.
pub async fn reconcile_rollout(
    context: Option<&codex_state::StateRuntime>,
    rollout_path: &Path,
    default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    archived_only: Option<bool>,
) {
    let Some(ctx) = context else {
        return;
    };
    let has_incremental_input = builder.is_some() || !items.is_empty();
    if has_incremental_input {
        apply_rollout_items(
            Some(ctx),
            rollout_path,
            default_provider,
            builder,
            items,
            "reconcile_rollout",
        )
        .await;
        return;
    }
    let extracted =
        metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await;
    let mut metadata = match extracted {
        Ok(outcome) => outcome.metadata,
        Err(err) => {
            warn!(
                "state db reconcile_rollout extraction failed {}: {err}",
                rollout_path.display()
            );
            return;
        }
    };
    metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
    match archived_only {
        Some(false) => metadata.archived_at = None,
        Some(true) if metadata.archived_at.is_none() => {
            metadata.archived_at = Some(metadata.updated_at);
        }
        Some(true) | None => {}
    }
    if let Err(err) = ctx.upsert_thread(&metadata).await {
        warn!(
            "state db reconcile_rollout upsert failed {}: {err}",
            rollout_path.display()
        );
        return;
    }
    match crate::rollout::list::read_session_meta_line(rollout_path).await {
        Ok(meta_line) => {
            persist_dynamic_tools(
                Some(ctx),
                meta_line.meta.id,
                meta_line.meta.dynamic_tools.as_deref(),
                "reconcile_rollout",
            )
            .await;
        }
        Err(_) => {
            warn!(
                "state db reconcile_rollout missing session meta {}",
                rollout_path.display()
            );
        }
    }
}
/// Repair a thread's rollout path after filesystem fallback succeeds.
///
/// Fast path: if `thread_id` is known and its row can be loaded, the function:
/// - rewrites `rollout_path`;
/// - normalizes `cwd`;
/// - applies the `archived_only` override, with the same semantics as `reconcile_rollout`;
/// - upserts the row, returning on success.
///
/// In every other case, including a failed upsert (which is logged), the function falls back to a
/// full `reconcile_rollout` of the file. That fallback uses the model provider from the file's
/// session meta line as the default provider, or the type's default when the meta line or
/// provider is unavailable.
pub async fn read_repair_rollout_path(
    context: Option<&codex_state::StateRuntime>,
    thread_id: Option<ThreadId>,
    archived_only: Option<bool>,
    rollout_path: &Path,
) {
    let Some(ctx) = context else {
        return;
    };
    // Lookup errors are treated like a missing row: fall through to a full reconcile.
    let existing = match thread_id {
        Some(id) => ctx.get_thread(id).await.ok().flatten(),
        None => None,
    };
    if let Some(mut metadata) = existing {
        metadata.rollout_path = rollout_path.to_path_buf();
        metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
        match archived_only {
            Some(false) => metadata.archived_at = None,
            Some(true) if metadata.archived_at.is_none() => {
                metadata.archived_at = Some(metadata.updated_at);
            }
            Some(true) | None => {}
        }
        match ctx.upsert_thread(&metadata).await {
            Ok(_) => return,
            Err(err) => {
                warn!(
                    "state db read-repair upsert failed for {}: {err}",
                    rollout_path.display()
                );
            }
        }
    }
    let default_provider = match crate::rollout::list::read_session_meta_line(rollout_path).await
    {
        Ok(meta_line) => meta_line.meta.model_provider.unwrap_or_default(),
        Err(_) => Default::default(),
    };
    reconcile_rollout(
        Some(ctx),
        rollout_path,
        default_provider.as_str(),
        None,
        &[],
        archived_only,
    )
    .await;
}
/// Apply rollout items incrementally to SQLite.
///
/// Uses a clone of `builder` when one is given; otherwise derives a builder from `items`. If no
/// builder can be derived, the function logs a warning, records a `missing_builder` discrepancy
/// for `stage`, and returns.
///
/// The builder's `rollout_path` is overwritten with `rollout_path`, and its `cwd` is normalized
/// before the items are applied. `_default_provider` is currently unused. Apply failures are only
/// logged.
pub async fn apply_rollout_items(
    context: Option<&codex_state::StateRuntime>,
    rollout_path: &Path,
    _default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    stage: &str,
) {
    let Some(ctx) = context else {
        return;
    };
    let mut builder = if let Some(provided) = builder {
        provided.clone()
    } else if let Some(derived) = metadata::builder_from_items(items, rollout_path) {
        derived
    } else {
        warn!(
            "state db apply_rollout_items missing builder during {stage}: {}",
            rollout_path.display()
        );
        record_discrepancy(stage, "missing_builder");
        return;
    };
    builder.rollout_path = rollout_path.to_path_buf();
    builder.cwd = normalize_cwd_for_state_db(&builder.cwd);
    let applied = ctx.apply_rollout_items(&builder, items, None).await;
    if let Err(err) = applied {
        warn!(
            "state db apply_rollout_items failed during {stage} for {}: {err}",
            rollout_path.display()
        );
    }
}
/// Record a state discrepancy metric with a stage and reason tag.
pub fn record_discrepancy(stage: &str, reason: &str) {
// We access the global metric because the call sites might not have access to the broader
// OtelManager.
tracing::warn!("state db record_discrepancy: {stage}, {reason}");
if let Some(metric) = codex_otel::metrics::global() {
let _ = metric.counter(
DB_METRIC_COMPARE_ERROR,
1,
&[
("stage", stage),
("reason", reason),
("version", &STATE_DB_VERSION.to_string()),
],
);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rollout::list::parse_cursor;
    use pretty_assertions::assert_eq;

    /// A cursor whose timestamp uses the rollout file-name format (`%Y-%m-%dT%H-%M-%S`) converts
    /// into an anchor with the same UUID and the equivalent UTC timestamp.
    #[test]
    fn cursor_to_anchor_normalizes_timestamp_format() {
        let id = Uuid::new_v4();
        let timestamp = "2026-01-27T12-34-56";
        let token = format!("{timestamp}|{id}");
        let cursor = parse_cursor(token.as_str()).expect("cursor should parse");

        let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse");

        let expected_ts = NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%dT%H-%M-%S")
            .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
            .expect("ts should parse")
            .with_nanosecond(0)
            .expect("nanosecond");
        assert_eq!(anchor.id, id);
        assert_eq!(anchor.ts, expected_ts);
    }
}