use crate::config::Config;
use crate::features::Feature;
use crate::rollout::list::Cursor;
use crate::rollout::list::ThreadSortKey;
use crate::rollout::metadata;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_COMPARE_ERROR;
pub use codex_state::LogEntry;
use codex_state::STATE_DB_VERSION;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::warn;
use uuid::Uuid;

/// Core-facing handle to the optional SQLite-backed state runtime.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;

/// Initialize the state runtime when the `sqlite` feature flag is enabled. To only be used
/// inside `core`. The initialization should not be done anywhere else.
///
/// If the stored backfill state is not `Complete`, a session backfill is run before the
/// handle is returned. Returns `None` when the feature is disabled, the runtime cannot be
/// initialized (recorded as a `codex.db.init` / `init_error` counter when `otel` is given),
/// the backfill state cannot be read, or the backfill is still incomplete afterwards.
pub(crate) async fn init_if_enabled(
    config: &Config,
    otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
    if !config.features.enabled(Feature::Sqlite) {
        return None;
    }
    let codex_home = config.codex_home.as_path();

    let init_result = codex_state::StateRuntime::init(
        config.codex_home.clone(),
        config.model_provider_id.clone(),
        otel.cloned(),
    )
    .await;
    let runtime = match init_result {
        Ok(runtime) => runtime,
        Err(err) => {
            warn!(
                "failed to initialize state runtime at {}: {err}",
                codex_home.display()
            );
            if let Some(otel) = otel {
                otel.counter("codex.db.init", 1, &[("status", "init_error")]);
            }
            return None;
        }
    };

    let needs_backfill = match runtime.get_backfill_state().await {
        Ok(state) => state.status != codex_state::BackfillStatus::Complete,
        Err(err) => {
            warn!(
                "failed to read backfill state at {}: {err}",
                codex_home.display()
            );
            return None;
        }
    };
    if needs_backfill {
        metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
    }

    // Re-check: the backfill above may not have reached `Complete`.
    require_backfill_complete(runtime, codex_home).await
}

/// Get the DB if the feature is enabled and the DB exists.
///
/// Unlike `init_if_enabled`, no backfill is attempted. Returns `None` when the feature is
/// disabled, the DB file is missing, the runtime fails to initialize, or the backfill is not
/// complete.
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option { let state_path = codex_state::state_db_path(config.codex_home.as_path()); if !config.features.enabled(Feature::Sqlite) || !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { return None; } let runtime = codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), otel.cloned(), ) .await .ok()?; require_backfill_complete(runtime, config.codex_home.as_path()).await } /// Open the state runtime when the SQLite file exists, without feature gating. /// /// This is used for parity checks during the SQLite migration phase. pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option { let db_path = codex_state::state_db_path(codex_home); if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) { return None; } let runtime = codex_state::StateRuntime::init( codex_home.to_path_buf(), default_provider.to_string(), None, ) .await .ok()?; require_backfill_complete(runtime, codex_home).await } async fn require_backfill_complete( runtime: StateDbHandle, codex_home: &Path, ) -> Option { match runtime.get_backfill_state().await { Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime), Ok(state) => { warn!( "state db backfill not complete at {} (status: {})", codex_home.display(), state.status.as_str() ); None } Err(err) => { warn!( "failed to read backfill state at {}: {err}", codex_home.display() ); None } } } fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option { let cursor = cursor?; let value = serde_json::to_value(cursor).ok()?; let cursor_str = value.as_str()?; let (ts_str, id_str) = cursor_str.split_once('|')?; if id_str.contains('|') { return None; } let id = Uuid::parse_str(id_str).ok()?; let ts = if let Ok(naive) = NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H-%M-%S") { DateTime::::from_naive_utc_and_offset(naive, Utc) } else if let Ok(dt) = DateTime::parse_from_rfc3339(ts_str) { 
dt.with_timezone(&Utc) } else { return None; } .with_nanosecond(0)?; Some(codex_state::Anchor { ts, id }) } /// List thread ids from SQLite for parity checks without rollout scanning. #[allow(clippy::too_many_arguments)] pub async fn list_thread_ids_db( context: Option<&codex_state::StateRuntime>, codex_home: &Path, page_size: usize, cursor: Option<&Cursor>, sort_key: ThreadSortKey, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, archived_only: bool, stage: &str, ) -> Option> { let ctx = context?; if ctx.codex_home() != codex_home { warn!( "state db codex_home mismatch: expected {}, got {}", ctx.codex_home().display(), codex_home.display() ); } let anchor = cursor_to_anchor(cursor); let allowed_sources: Vec = allowed_sources .iter() .map(|value| match serde_json::to_value(value) { Ok(Value::String(s)) => s, Ok(other) => other.to_string(), Err(_) => String::new(), }) .collect(); let model_providers = model_providers.map(<[String]>::to_vec); match ctx .list_thread_ids( page_size, anchor.as_ref(), match sort_key { ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt, }, allowed_sources.as_slice(), model_providers.as_deref(), archived_only, ) .await { Ok(ids) => Some(ids), Err(err) => { warn!("state db list_thread_ids failed during {stage}: {err}"); None } } } /// List thread metadata from SQLite without rollout directory traversal. 
// Filter and pagination parameters are forwarded individually to `list_threads`.
#[allow(clippy::too_many_arguments)]
pub async fn list_threads_db(
    context: Option<&codex_state::StateRuntime>,
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    archived: bool,
) -> Option<codex_state::ThreadsPage> {
    let ctx = context?;
    if ctx.codex_home() != codex_home {
        warn!(
            "state db codex_home mismatch: expected {}, got {}",
            ctx.codex_home().display(),
            codex_home.display()
        );
    }

    let anchor = cursor_to_anchor(cursor);
    let db_sort_key = match sort_key {
        ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
        ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
    };
    // JSON string sources are stored bare; any other JSON shape is stored as its JSON text.
    let mut source_keys: Vec<String> = Vec::with_capacity(allowed_sources.len());
    for source in allowed_sources {
        let key = match serde_json::to_value(source) {
            Ok(Value::String(s)) => s,
            Ok(other) => other.to_string(),
            Err(_) => String::new(),
        };
        source_keys.push(key);
    }
    let owned_providers = model_providers.map(<[String]>::to_vec);

    let listed = ctx
        .list_threads(
            page_size,
            anchor.as_ref(),
            db_sort_key,
            source_keys.as_slice(),
            owned_providers.as_deref(),
            archived,
        )
        .await;
    match listed {
        Err(err) => {
            warn!("state db list_threads failed: {err}");
            None
        }
        Ok(page) => Some(page),
    }
}

/// Look up the rollout path for a thread id using SQLite.
///
/// `archived_only` is forwarded to the runtime lookup unchanged. Query errors are logged with
/// `stage` and reported as `None`.
pub async fn find_rollout_path_by_id(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    archived_only: Option<bool>,
    stage: &str,
) -> Option<PathBuf> {
    let ctx = context?;
    match ctx.find_rollout_path_by_id(thread_id, archived_only).await {
        Ok(path) => path,
        Err(err) => {
            warn!("state db find_rollout_path_by_id failed during {stage}: {err}");
            None
        }
    }
}

/// Get dynamic tools for a thread id using SQLite.
///
/// Query errors are logged with `stage` and reported as `None`.
pub async fn get_dynamic_tools(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    stage: &str,
) -> Option<Vec<DynamicToolSpec>> {
    let ctx = context?;
    ctx.get_dynamic_tools(thread_id)
        .await
        .unwrap_or_else(|err| {
            warn!("state db get_dynamic_tools failed during {stage}: {err}");
            None
        })
}

/// Persist dynamic tools for a thread id using SQLite, if none exist yet.
pub async fn persist_dynamic_tools(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    tools: Option<&[DynamicToolSpec]>,
    stage: &str,
) {
    // No-op without a runtime; failures are logged and otherwise ignored.
    if let Some(ctx) = context
        && let Err(err) = ctx.persist_dynamic_tools(thread_id, tools).await
    {
        warn!("state db persist_dynamic_tools failed during {stage}: {err}");
    }
}

/// Get memory summaries for a thread id using SQLite.
///
/// Query errors are logged with `stage` and reported as `None`.
pub async fn get_thread_memory(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    stage: &str,
) -> Option<codex_state::ThreadMemory> {
    let ctx = context?;
    ctx.get_thread_memory(thread_id)
        .await
        .unwrap_or_else(|err| {
            warn!("state db get_thread_memory failed during {stage}: {err}");
            None
        })
}

/// Upsert memory summaries for a thread id using SQLite.
///
/// Returns the stored memory on success; failures are logged with `stage` and yield `None`.
pub async fn upsert_thread_memory(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    trace_summary: &str,
    memory_summary: &str,
    stage: &str,
) -> Option<codex_state::ThreadMemory> {
    let ctx = context?;
    ctx.upsert_thread_memory(thread_id, trace_summary, memory_summary)
        .await
        .inspect_err(|err| warn!("state db upsert_thread_memory failed during {stage}: {err}"))
        .ok()
}

/// Get the last N memories corresponding to a cwd using an exact path match.
///
/// Query errors are logged with `stage` and reported as `None`.
pub async fn get_last_n_thread_memories_for_cwd(
    context: Option<&codex_state::StateRuntime>,
    cwd: &Path,
    n: usize,
    stage: &str,
) -> Option<Vec<codex_state::ThreadMemory>> {
    let ctx = context?;
    ctx.get_last_n_thread_memories_for_cwd(cwd, n)
        .await
        .inspect_err(|err| {
            warn!("state db get_last_n_thread_memories_for_cwd failed during {stage}: {err}")
        })
        .ok()
}

/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
///
/// When `builder` or `items` is supplied they are applied incrementally via
/// `apply_rollout_items`. Otherwise, metadata is re-extracted from the rollout file.
/// `archived_only` (if set) then overrides the archive state, and the thread is upserted
/// together with the dynamic tools from its session meta line.
pub async fn reconcile_rollout(
    context: Option<&codex_state::StateRuntime>,
    rollout_path: &Path,
    default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    archived_only: Option<bool>,
) {
    let Some(ctx) = context else {
        return;
    };

    let has_incremental_input = builder.is_some() || !items.is_empty();
    if has_incremental_input {
        apply_rollout_items(
            Some(ctx),
            rollout_path,
            default_provider,
            builder,
            items,
            "reconcile_rollout",
        )
        .await;
        return;
    }

    // Nothing to apply incrementally: rebuild the thread row from the rollout file itself.
    let extracted =
        metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await;
    let mut metadata = match extracted {
        Ok(outcome) => outcome.metadata,
        Err(err) => {
            warn!(
                "state db reconcile_rollout extraction failed {}: {err}",
                rollout_path.display()
            );
            return;
        }
    };

    // Force the archive state requested by the caller; an existing archive time is kept.
    match archived_only {
        Some(false) => metadata.archived_at = None,
        Some(true) if metadata.archived_at.is_none() => {
            metadata.archived_at = Some(metadata.updated_at);
        }
        Some(true) | None => {}
    }

    if let Err(err) = ctx.upsert_thread(&metadata).await {
        warn!(
            "state db reconcile_rollout upsert failed {}: {err}",
            rollout_path.display()
        );
        return;
    }

    match crate::rollout::list::read_session_meta_line(rollout_path).await {
        Ok(meta_line) => {
            persist_dynamic_tools(
                Some(ctx),
                meta_line.meta.id,
                meta_line.meta.dynamic_tools.as_deref(),
                "reconcile_rollout",
            )
            .await;
        }
        Err(_) => {
            warn!(
                "state db reconcile_rollout missing session meta {}",
                rollout_path.display()
            );
        }
    }
}

/// Repair a thread's rollout path after filesystem fallback succeeds.
///
/// If the thread is already stored in SQLite, its `rollout_path` (and archive state, per
/// `archived_only`) is updated in place. Otherwise, or if that update fails, the thread is
/// rebuilt from the rollout file via `reconcile_rollout`.
pub async fn read_repair_rollout_path(
    context: Option<&codex_state::StateRuntime>,
    thread_id: Option<ThreadId>,
    archived_only: Option<bool>,
    rollout_path: &Path,
) {
    let Some(ctx) = context else {
        return;
    };

    // Fast path: the thread row already exists, so only its path/archive state is patched.
    if let Some(thread_id) = thread_id {
        match ctx.get_thread(thread_id).await {
            Ok(Some(mut metadata)) => {
                metadata.rollout_path = rollout_path.to_path_buf();
                match archived_only {
                    Some(true) if metadata.archived_at.is_none() => {
                        metadata.archived_at = Some(metadata.updated_at);
                    }
                    Some(false) => {
                        metadata.archived_at = None;
                    }
                    Some(true) | None => {}
                }
                if let Err(err) = ctx.upsert_thread(&metadata).await {
                    warn!(
                        "state db read-repair upsert failed for {}: {err}",
                        rollout_path.display()
                    );
                } else {
                    return;
                }
            }
            // Unknown thread: fall through to a full reconcile from the rollout file.
            Ok(None) => {}
            // Previously swallowed silently; log it, then still fall back to reconciling.
            Err(err) => {
                warn!(
                    "state db read-repair get_thread failed for {}: {err}",
                    rollout_path.display()
                );
            }
        }
    }

    // Slow path: rebuild from the rollout file. The provider comes from the session meta
    // line; an unreadable meta line or missing provider yields an empty string.
    let default_provider = crate::rollout::list::read_session_meta_line(rollout_path)
        .await
        .ok()
        .and_then(|meta| meta.meta.model_provider)
        .unwrap_or_default();
    reconcile_rollout(
        Some(ctx),
        rollout_path,
        default_provider.as_str(),
        None,
        &[],
        archived_only,
    )
    .await;
}

/// Apply rollout items incrementally to SQLite.
///
/// Uses `builder` when given; otherwise a builder is derived from `items`. If neither yields
/// a builder, a `missing_builder` discrepancy is recorded and nothing is written. The
/// builder's `rollout_path` is always overwritten with `rollout_path`. `_default_provider`
/// is currently unused.
pub async fn apply_rollout_items(
    context: Option<&codex_state::StateRuntime>,
    rollout_path: &Path,
    _default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    stage: &str,
) {
    let Some(ctx) = context else {
        return;
    };
    let resolved = builder
        .cloned()
        .or_else(|| metadata::builder_from_items(items, rollout_path));
    let Some(mut builder) = resolved else {
        warn!(
            "state db apply_rollout_items missing builder during {stage}: {}",
            rollout_path.display()
        );
        record_discrepancy(stage, "missing_builder");
        return;
    };
    builder.rollout_path = rollout_path.to_path_buf();
    if let Err(err) = ctx.apply_rollout_items(&builder, items, None).await {
        warn!(
            "state db apply_rollout_items failed during {stage} for {}: {err}",
            rollout_path.display()
        );
    }
}

/// Record a state discrepancy metric with a stage and reason tag.
///
/// Always emits a `warn` log; the counter is only recorded when a global metrics handle is
/// installed.
pub fn record_discrepancy(stage: &str, reason: &str) { // We access the global metric because the call sites might not have access to the broader // OtelManager. tracing::warn!("state db record_discrepancy: {stage}, {reason}"); if let Some(metric) = codex_otel::metrics::global() { let _ = metric.counter( DB_METRIC_COMPARE_ERROR, 1, &[ ("stage", stage), ("reason", reason), ("version", &STATE_DB_VERSION.to_string()), ], ); } } #[cfg(test)] mod tests { use super::*; use crate::rollout::list::parse_cursor; use pretty_assertions::assert_eq; #[test] fn cursor_to_anchor_normalizes_timestamp_format() { let uuid = Uuid::new_v4(); let ts_str = "2026-01-27T12-34-56"; let token = format!("{ts_str}|{uuid}"); let cursor = parse_cursor(token.as_str()).expect("cursor should parse"); let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse"); let naive = NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H-%M-%S").expect("ts should parse"); let expected_ts = DateTime::::from_naive_utc_and_offset(naive, Utc) .with_nanosecond(0) .expect("nanosecond"); assert_eq!(anchor.id, uuid); assert_eq!(anchor.ts, expected_ts); } }