use crate::AgentJob; use crate::AgentJobCreateParams; use crate::AgentJobItem; use crate::AgentJobItemCreateParams; use crate::AgentJobItemStatus; use crate::AgentJobProgress; use crate::AgentJobStatus; use crate::LOGS_DB_FILENAME; use crate::LogEntry; use crate::LogQuery; use crate::LogRow; use crate::STATE_DB_FILENAME; use crate::SortKey; use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; use crate::ThreadsPage; use crate::apply_rollout_item; use crate::migrations::runtime_logs_migrator; use crate::migrations::runtime_state_migrator; use crate::model::AgentJobRow; use crate::model::ThreadGoalRow; use crate::model::ThreadRow; use crate::model::anchor_from_item; use crate::model::datetime_to_epoch_millis; use crate::model::datetime_to_epoch_seconds; use crate::model::epoch_millis_to_datetime; use crate::paths::file_modified_time_utc; use crate::telemetry::DbKind; use crate::telemetry::DbTelemetry; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use log::LevelFilter; use serde_json::Value; use sqlx::ConnectOptions; use sqlx::QueryBuilder; use sqlx::Row; use sqlx::Sqlite; use sqlx::SqliteConnection; use sqlx::SqlitePool; use sqlx::migrate::Migrator; use sqlx::sqlite::SqliteAutoVacuum; use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteJournalMode; use sqlx::sqlite::SqlitePoolOptions; use sqlx::sqlite::SqliteSynchronous; use std::collections::BTreeSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::time::Duration; use std::time::Instant; use tracing::warn; mod agent_jobs; mod backfill; mod goals; mod logs; mod memories; mod remote_control; #[cfg(test)] mod test_support; mod threads; pub use goals::ThreadGoalAccountingMode; pub use goals::ThreadGoalAccountingOutcome; pub use goals::ThreadGoalUpdate; pub use remote_control::RemoteControlEnrollmentRecord; pub use threads::ThreadFilterOptions; // "Partition" is the retained-log-content bucket we cap at 10 MiB: // - one bucket per non-null thread_id // - one bucket per threadless (thread_id IS NULL) non-null process_uuid // - one bucket for threadless rows with process_uuid IS NULL // This budget tracks each row's persisted rendered log body plus non-body // metadata, rather than the exact sum of all persisted SQLite column bytes. const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024; const LOG_PARTITION_ROW_LIMIT: i64 = 1_000; #[derive(Clone)] pub struct StateRuntime { codex_home: PathBuf, default_provider: String, pool: Arc, logs_pool: Arc, thread_updated_at_millis: Arc, } impl StateRuntime { /// Initialize the state runtime using the provided Codex home and default provider. /// /// This opens (and migrates) the SQLite databases under `codex_home`, /// keeping logs in a dedicated file to reduce lock contention with the /// rest of the state store. pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result> { Self::init_inner( codex_home, default_provider, /*telemetry_override*/ None, ) .await } #[cfg(test)] pub(crate) async fn init_with_telemetry_for_tests( codex_home: PathBuf, default_provider: String, telemetry_override: &dyn DbTelemetry, ) -> anyhow::Result> { Self::init_inner(codex_home, default_provider, Some(telemetry_override)).await } async fn init_inner( codex_home: PathBuf, default_provider: String, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result> { tokio::fs::create_dir_all(&codex_home).await?; let state_migrator = runtime_state_migrator(); let logs_migrator = runtime_logs_migrator(); let state_path = state_db_path(codex_home.as_path()); let logs_path = logs_db_path(codex_home.as_path()); let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); return Err(err); } }; let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open logs db at {}: {err}", logs_path.display()); return Err(err); } }; let started = Instant::now(); let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await; crate::telemetry::record_init_result( telemetry_override, DbKind::State, "ensure_backfill_state", started.elapsed(), &backfill_state_result, ); backfill_state_result?; let started = Instant::now(); let thread_updated_at_millis_result: anyhow::Result> = sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads") .fetch_one(pool.as_ref()) .await .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, DbKind::State, "post_init_query", started.elapsed(), &thread_updated_at_millis_result, ); let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { pool, logs_pool, codex_home, default_provider, thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)), }); if let Err(err) = runtime.run_logs_startup_maintenance().await { warn!( "failed to run startup maintenance for logs db at {}: {err}", logs_path.display(), ); } Ok(runtime) } /// Return the configured Codex home directory for this runtime. pub fn codex_home(&self) -> &Path { self.codex_home.as_path() } } fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { SqliteConnectOptions::new() .filename(path) .create_if_missing(true) .journal_mode(SqliteJournalMode::Wal) .synchronous(SqliteSynchronous::Normal) .busy_timeout(Duration::from_secs(5)) .log_statements(LevelFilter::Off) } async fn open_state_sqlite( path: &Path, migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { // New state DBs should use incremental auto-vacuum, but retrofitting an // existing DB requires a full VACUUM. Do not attempt that during process // startup: it is maintenance work that can contend with foreground writers. open_sqlite( path, migrator, DbKind::State, "open_state", "migrate_state", telemetry_override, ) .await } async fn open_logs_sqlite( path: &Path, migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { open_sqlite( path, migrator, DbKind::Logs, "open_logs", "migrate_logs", telemetry_override, ) .await } async fn open_sqlite( path: &Path, migrator: &Migrator, db: DbKind, open_phase: &'static str, migrate_phase: &'static str, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); let started = Instant::now(); let pool_result = SqlitePoolOptions::new() .max_connections(5) .connect_with(options) .await .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, db, open_phase, started.elapsed(), &pool_result, ); let pool = pool_result?; let started = Instant::now(); let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, db, migrate_phase, started.elapsed(), &migrate_result, ); migrate_result?; Ok(pool) } pub(super) async fn ensure_backfill_state_row_in_pool( pool: &sqlx::SqlitePool, ) -> anyhow::Result<()> { sqlx::query( r#" INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) VALUES (?, ?, NULL, NULL, ?) ON CONFLICT(id) DO NOTHING "#, ) .bind(1_i64) .bind(crate::BackfillStatus::Pending.as_str()) .bind(Utc::now().timestamp()) .execute(pool) .await?; Ok(()) } pub fn state_db_filename() -> String { STATE_DB_FILENAME.to_string() } pub fn state_db_path(codex_home: &Path) -> PathBuf { codex_home.join(state_db_filename()) } pub fn logs_db_filename() -> String { LOGS_DB_FILENAME.to_string() } pub fn logs_db_path(codex_home: &Path) -> PathBuf { codex_home.join(logs_db_filename()) } #[cfg(test)] mod tests { use super::StateRuntime; use super::open_state_sqlite; use super::runtime_state_migrator; use super::state_db_path; use super::test_support::unique_temp_dir; use crate::DB_INIT_METRIC; use crate::DbTelemetry; use crate::migrations::STATE_MIGRATOR; use pretty_assertions::assert_eq; use sqlx::SqlitePool; use sqlx::migrate::MigrateError; use sqlx::sqlite::SqliteConnectOptions; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::path::Path; use std::sync::Mutex; #[derive(Default)] struct TestTelemetry { counters: Mutex>, } #[derive(Debug, Eq, PartialEq)] struct MetricEvent { name: String, tags: BTreeMap, } impl TestTelemetry { fn counters(&self) -> Vec { self.counters .lock() .expect("telemetry lock") .iter() .map(|event| MetricEvent { name: event.name.clone(), tags: event.tags.clone(), }) .collect() } } impl DbTelemetry for TestTelemetry { fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) { self.counters .lock() .expect("telemetry lock") .push(MetricEvent { name: name.to_string(), tags: tags_to_map(tags), }); } fn record_duration( &self, _name: &str, _duration: std::time::Duration, _tags: &[(&str, &str)], ) { } } fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap { tags.iter() .map(|(key, value)| ((*key).to_string(), (*value).to_string())) .collect() } async fn open_db_pool(path: &Path) -> SqlitePool { SqlitePool::connect_with( SqliteConnectOptions::new() .filename(path) .create_if_missing(false), ) .await .expect("open sqlite pool") } #[tokio::test] async fn open_state_sqlite_tolerates_newer_applied_migrations() { let codex_home = unique_temp_dir(); tokio::fs::create_dir_all(&codex_home) .await .expect("create codex home"); let state_path = state_db_path(codex_home.as_path()); let pool = SqlitePool::connect_with( SqliteConnectOptions::new() .filename(&state_path) .create_if_missing(true), ) .await .expect("open state db"); STATE_MIGRATOR .run(&pool) .await .expect("apply current state schema"); sqlx::query( "INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)", ) .bind(9_999_i64) .bind("future migration") .bind(true) .bind(vec![1_u8, 2, 3, 4]) .bind(1_i64) .execute(&pool) .await .expect("insert future migration record"); pool.close().await; let strict_pool = open_db_pool(state_path.as_path()).await; let strict_err = STATE_MIGRATOR .run(&strict_pool) .await .expect_err("strict migrator should reject newer applied migrations"); assert!(matches!(strict_err, MigrateError::VersionMissing(9_999))); strict_pool.close().await; let tolerant_migrator = runtime_state_migrator(); let tolerant_pool = open_state_sqlite( state_path.as_path(), &tolerant_migrator, /*telemetry_override*/ None, ) .await .expect("runtime migrator should tolerate newer applied migrations"); tolerant_pool.close().await; let _ = tokio::fs::remove_dir_all(codex_home).await; } #[tokio::test] async fn init_records_successful_sqlite_init_phases_to_explicit_telemetry() { let codex_home = unique_temp_dir(); let telemetry = TestTelemetry::default(); let runtime = StateRuntime::init_with_telemetry_for_tests( codex_home.clone(), "test-provider".to_string(), &telemetry, ) .await .expect("state runtime should initialize"); let phases = telemetry .counters() .into_iter() .filter(|event| event.name == DB_INIT_METRIC) .filter(|event| event.tags.get("status").map(String::as_str) == Some("success")) .filter_map(|event| event.tags.get("phase").cloned()) .collect::>(); let expected = [ "open_state", "migrate_state", "open_logs", "migrate_logs", "ensure_backfill_state", "post_init_query", ] .into_iter() .map(str::to_string) .collect::>(); assert_eq!(phases, expected); runtime.pool.close().await; runtime.logs_pool.close().await; let _ = tokio::fs::remove_dir_all(codex_home).await; } }