use crate::AgentJob; use crate::AgentJobCreateParams; use crate::AgentJobItem; use crate::AgentJobItemCreateParams; use crate::AgentJobItemStatus; use crate::AgentJobProgress; use crate::AgentJobStatus; use crate::GOALS_DB_FILENAME; use crate::LOGS_DB_FILENAME; use crate::LogEntry; use crate::LogQuery; use crate::LogRow; use crate::STATE_DB_FILENAME; use crate::SortKey; use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; use crate::ThreadsPage; use crate::apply_rollout_item; use crate::migrations::runtime_goals_migrator; use crate::migrations::runtime_logs_migrator; use crate::migrations::runtime_state_migrator; use crate::model::AgentJobRow; use crate::model::ThreadRow; use crate::model::anchor_from_item; use crate::model::datetime_to_epoch_millis; use crate::model::datetime_to_epoch_seconds; use crate::model::epoch_millis_to_datetime; use crate::paths::file_modified_time_utc; use crate::telemetry::DbKind; use crate::telemetry::DbTelemetry; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use log::LevelFilter; use serde_json::Value; use sqlx::ConnectOptions; use sqlx::QueryBuilder; use sqlx::Row; use sqlx::Sqlite; use sqlx::SqliteConnection; use sqlx::SqlitePool; use sqlx::migrate::Migrator; use sqlx::sqlite::SqliteAutoVacuum; use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteJournalMode; use sqlx::sqlite::SqlitePoolOptions; use sqlx::sqlite::SqliteSynchronous; use std::collections::BTreeSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::time::Duration; use std::time::Instant; use tracing::warn; mod agent_jobs; mod backfill; mod goals; mod logs; mod memories; mod remote_control; #[cfg(test)] mod test_support; mod threads; pub use goals::GoalStore; pub use goals::ThreadGoalAccountingMode; pub use goals::ThreadGoalAccountingOutcome; pub use goals::ThreadGoalUpdate; pub use remote_control::RemoteControlEnrollmentRecord; pub use threads::ThreadFilterOptions; // "Partition" is the retained-log-content bucket we cap at 10 MiB: // - one bucket per non-null thread_id // - one bucket per threadless (thread_id IS NULL) non-null process_uuid // - one bucket for threadless rows with process_uuid IS NULL // This budget tracks each row's persisted rendered log body plus non-body // metadata, rather than the exact sum of all persisted SQLite column bytes. const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024; const LOG_PARTITION_ROW_LIMIT: i64 = 1_000; #[derive(Clone, Copy)] struct RuntimeDbSpec { label: &'static str, filename: &'static str, kind: DbKind, open_phase: &'static str, migrate_phase: &'static str, } impl RuntimeDbSpec { fn path(self, codex_home: &Path) -> PathBuf { codex_home.join(self.filename) } } const STATE_DB: RuntimeDbSpec = RuntimeDbSpec { label: "state DB", filename: STATE_DB_FILENAME, kind: DbKind::State, open_phase: "open_state", migrate_phase: "migrate_state", }; const LOGS_DB: RuntimeDbSpec = RuntimeDbSpec { label: "log DB", filename: LOGS_DB_FILENAME, kind: DbKind::Logs, open_phase: "open_logs", migrate_phase: "migrate_logs", }; const GOALS_DB: RuntimeDbSpec = RuntimeDbSpec { label: "goals DB", filename: GOALS_DB_FILENAME, kind: DbKind::Goals, open_phase: "open_goals", migrate_phase: "migrate_goals", }; const RUNTIME_DBS: [RuntimeDbSpec; 3] = [STATE_DB, LOGS_DB, GOALS_DB]; #[derive(Clone, Debug, Eq, PartialEq)] pub struct RuntimeDbPath { pub label: &'static str, pub path: PathBuf, } #[derive(Clone)] pub struct StateRuntime { codex_home: PathBuf, default_provider: String, pool: Arc, logs_pool: Arc, thread_goals: GoalStore, thread_updated_at_millis: Arc, } impl StateRuntime { /// Initialize the state runtime using the provided Codex home and default provider. /// /// This opens (and migrates) the SQLite databases under `codex_home`, /// keeping logs in a dedicated file to reduce lock contention with the /// rest of the state store. pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result> { Self::init_inner( codex_home, default_provider, /*telemetry_override*/ None, ) .await } #[cfg(test)] pub(crate) async fn init_with_telemetry_for_tests( codex_home: PathBuf, default_provider: String, telemetry_override: &dyn DbTelemetry, ) -> anyhow::Result> { Self::init_inner(codex_home, default_provider, Some(telemetry_override)).await } async fn init_inner( codex_home: PathBuf, default_provider: String, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result> { tokio::fs::create_dir_all(&codex_home).await?; let state_migrator = runtime_state_migrator(); let logs_migrator = runtime_logs_migrator(); let goals_migrator = runtime_goals_migrator(); let state_path = STATE_DB.path(codex_home.as_path()); let logs_path = LOGS_DB.path(codex_home.as_path()); let goals_path = GOALS_DB.path(codex_home.as_path()); let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); return Err(err); } }; let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open logs db at {}: {err}", logs_path.display()); return Err(err); } }; let goals_pool = match open_goals_sqlite(&goals_path, &goals_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open goals db at {}: {err}", goals_path.display()); return Err(err); } }; let started = Instant::now(); let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await; crate::telemetry::record_init_result( telemetry_override, DbKind::State, "ensure_backfill_state", started.elapsed(), &backfill_state_result, ); backfill_state_result?; let started = Instant::now(); let thread_updated_at_millis_result: anyhow::Result> = sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads") .fetch_one(pool.as_ref()) .await .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, DbKind::State, "post_init_query", started.elapsed(), &thread_updated_at_millis_result, ); let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { thread_goals: GoalStore::new(Arc::clone(&goals_pool)), pool, logs_pool, codex_home, default_provider, thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)), }); if let Err(err) = runtime.run_logs_startup_maintenance().await { warn!( "failed to run startup maintenance for logs db at {}: {err}", logs_path.display(), ); } Ok(runtime) } /// Return the configured Codex home directory for this runtime. pub fn codex_home(&self) -> &Path { self.codex_home.as_path() } pub fn thread_goals(&self) -> &GoalStore { &self.thread_goals } } fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { SqliteConnectOptions::new() .filename(path) .create_if_missing(true) .journal_mode(SqliteJournalMode::Wal) .synchronous(SqliteSynchronous::Normal) .busy_timeout(Duration::from_secs(5)) .log_statements(LevelFilter::Off) } async fn open_state_sqlite( path: &Path, migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { // New state DBs should use incremental auto-vacuum, but retrofitting an // existing DB requires a full VACUUM. Do not attempt that during process // startup: it is maintenance work that can contend with foreground writers. open_sqlite(path, migrator, STATE_DB, telemetry_override).await } async fn open_logs_sqlite( path: &Path, migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { open_sqlite(path, migrator, LOGS_DB, telemetry_override).await } async fn open_goals_sqlite( path: &Path, migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { open_sqlite(path, migrator, GOALS_DB, telemetry_override).await } async fn open_sqlite( path: &Path, migrator: &Migrator, spec: RuntimeDbSpec, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); let started = Instant::now(); let pool_result = SqlitePoolOptions::new() .max_connections(5) .connect_with(options) .await .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, spec.kind, spec.open_phase, started.elapsed(), &pool_result, ); let pool = pool_result?; let started = Instant::now(); let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, spec.kind, spec.migrate_phase, started.elapsed(), &migrate_result, ); migrate_result?; Ok(pool) } pub(super) async fn ensure_backfill_state_row_in_pool( pool: &sqlx::SqlitePool, ) -> anyhow::Result<()> { sqlx::query( r#" INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) VALUES (?, ?, NULL, NULL, ?) ON CONFLICT(id) DO NOTHING "#, ) .bind(1_i64) .bind(crate::BackfillStatus::Pending.as_str()) .bind(Utc::now().timestamp()) .execute(pool) .await?; Ok(()) } pub fn state_db_filename() -> String { STATE_DB.filename.to_string() } pub fn state_db_path(codex_home: &Path) -> PathBuf { STATE_DB.path(codex_home) } pub fn logs_db_filename() -> String { LOGS_DB.filename.to_string() } pub fn logs_db_path(codex_home: &Path) -> PathBuf { LOGS_DB.path(codex_home) } pub fn goals_db_filename() -> String { GOALS_DB.filename.to_string() } pub fn goals_db_path(codex_home: &Path) -> PathBuf { GOALS_DB.path(codex_home) } pub fn runtime_db_paths(codex_home: &Path) -> Vec { RUNTIME_DBS .iter() .map(|spec| RuntimeDbPath { label: spec.label, path: spec.path(codex_home), }) .collect() } /// Run SQLite's built-in integrity check against an existing database file. pub async fn sqlite_integrity_check(path: &Path) -> anyhow::Result> { let options = SqliteConnectOptions::new() .filename(path) .create_if_missing(false) .read_only(true) .log_statements(LevelFilter::Off); let pool = SqlitePoolOptions::new() .max_connections(1) .connect_with(options) .await?; let rows = sqlx::query_scalar::<_, String>("PRAGMA integrity_check") .fetch_all(&pool) .await?; pool.close().await; Ok(rows) } #[cfg(test)] mod tests { use super::StateRuntime; use super::open_state_sqlite; use super::runtime_state_migrator; use super::sqlite_integrity_check; use super::state_db_path; use super::test_support::unique_temp_dir; use crate::DB_INIT_METRIC; use crate::DbTelemetry; use crate::migrations::STATE_MIGRATOR; use pretty_assertions::assert_eq; use sqlx::SqlitePool; use sqlx::migrate::MigrateError; use sqlx::sqlite::SqliteConnectOptions; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::path::Path; use std::sync::Mutex; #[derive(Default)] struct TestTelemetry { counters: Mutex>, } #[derive(Debug, Eq, PartialEq)] struct MetricEvent { name: String, tags: BTreeMap, } impl TestTelemetry { fn counters(&self) -> Vec { self.counters .lock() .expect("telemetry lock") .iter() .map(|event| MetricEvent { name: event.name.clone(), tags: event.tags.clone(), }) .collect() } } impl DbTelemetry for TestTelemetry { fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) { self.counters .lock() .expect("telemetry lock") .push(MetricEvent { name: name.to_string(), tags: tags_to_map(tags), }); } fn record_duration( &self, _name: &str, _duration: std::time::Duration, _tags: &[(&str, &str)], ) { } } fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap { tags.iter() .map(|(key, value)| ((*key).to_string(), (*value).to_string())) .collect() } async fn open_db_pool(path: &Path) -> SqlitePool { SqlitePool::connect_with( SqliteConnectOptions::new() .filename(path) .create_if_missing(false), ) .await .expect("open sqlite pool") } #[tokio::test] async fn sqlite_integrity_check_reports_ok_for_valid_db() { let codex_home = unique_temp_dir(); tokio::fs::create_dir_all(&codex_home) .await .expect("create codex home"); let path = state_db_path(codex_home.as_path()); let pool = SqlitePool::connect_with( SqliteConnectOptions::new() .filename(&path) .create_if_missing(true), ) .await .expect("open sqlite db"); sqlx::query("CREATE TABLE sample (id INTEGER PRIMARY KEY)") .execute(&pool) .await .expect("create sample table"); pool.close().await; let result = sqlite_integrity_check(&path) .await .expect("integrity check should run"); assert_eq!(result, vec!["ok".to_string()]); let _ = tokio::fs::remove_dir_all(codex_home).await; } #[tokio::test] async fn open_state_sqlite_tolerates_newer_applied_migrations() { let codex_home = unique_temp_dir(); tokio::fs::create_dir_all(&codex_home) .await .expect("create codex home"); let state_path = state_db_path(codex_home.as_path()); let pool = SqlitePool::connect_with( SqliteConnectOptions::new() .filename(&state_path) .create_if_missing(true), ) .await .expect("open state db"); STATE_MIGRATOR .run(&pool) .await .expect("apply current state schema"); sqlx::query( "INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)", ) .bind(9_999_i64) .bind("future migration") .bind(true) .bind(vec![1_u8, 2, 3, 4]) .bind(1_i64) .execute(&pool) .await .expect("insert future migration record"); pool.close().await; let strict_pool = open_db_pool(state_path.as_path()).await; let strict_err = STATE_MIGRATOR .run(&strict_pool) .await .expect_err("strict migrator should reject newer applied migrations"); assert!(matches!(strict_err, MigrateError::VersionMissing(9_999))); strict_pool.close().await; let tolerant_migrator = runtime_state_migrator(); let tolerant_pool = open_state_sqlite( state_path.as_path(), &tolerant_migrator, /*telemetry_override*/ None, ) .await .expect("runtime migrator should tolerate newer applied migrations"); tolerant_pool.close().await; let _ = tokio::fs::remove_dir_all(codex_home).await; } #[tokio::test] async fn init_records_successful_sqlite_init_phases_to_explicit_telemetry() { let codex_home = unique_temp_dir(); let telemetry = TestTelemetry::default(); let runtime = StateRuntime::init_with_telemetry_for_tests( codex_home.clone(), "test-provider".to_string(), &telemetry, ) .await .expect("state runtime should initialize"); let phases = telemetry .counters() .into_iter() .filter(|event| event.name == DB_INIT_METRIC) .filter(|event| event.tags.get("status").map(String::as_str) == Some("success")) .filter_map(|event| event.tags.get("phase").cloned()) .collect::>(); let expected = [ "open_state", "migrate_state", "open_logs", "migrate_logs", "open_goals", "migrate_goals", "ensure_backfill_state", "post_init_query", ] .into_iter() .map(str::to_string) .collect::>(); assert_eq!(phases, expected); runtime.pool.close().await; runtime.logs_pool.close().await; let _ = tokio::fs::remove_dir_all(codex_home).await; } }