Files
codex/codex-rs/state/src/runtime.rs
2026-05-11 14:44:39 +01:00

318 lines
10 KiB
Rust

use crate::AgentJob;
use crate::AgentJobCreateParams;
use crate::AgentJobItem;
use crate::AgentJobItemCreateParams;
use crate::AgentJobItemStatus;
use crate::AgentJobProgress;
use crate::AgentJobStatus;
use crate::LOGS_DB_FILENAME;
use crate::LogEntry;
use crate::LogQuery;
use crate::LogRow;
use crate::STATE_DB_FILENAME;
use crate::SortKey;
use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::runtime_logs_migrator;
use crate::migrations::runtime_state_migrator;
use crate::model::AgentJobRow;
use crate::model::ThreadGoalRow;
use crate::model::ThreadRow;
use crate::model::anchor_from_item;
use crate::model::datetime_to_epoch_millis;
use crate::model::datetime_to_epoch_seconds;
use crate::model::epoch_millis_to_datetime;
use crate::paths::file_modified_time_utc;
use crate::telemetry::DbKind;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use log::LevelFilter;
use serde_json::Value;
use sqlx::ConnectOptions;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteAutoVacuum;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use std::time::Instant;
use tracing::warn;
mod agent_jobs;
mod backfill;
mod goals;
mod logs;
mod memories;
mod remote_control;
#[cfg(test)]
mod test_support;
mod threads;
pub use goals::ThreadGoalAccountingMode;
pub use goals::ThreadGoalAccountingOutcome;
pub use goals::ThreadGoalUpdate;
pub use remote_control::RemoteControlEnrollmentRecord;
pub use threads::ThreadFilterOptions;
// "Partition" is the retained-log-content bucket we cap at 10 MiB:
// - one bucket per non-null thread_id
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
// - one bucket for threadless rows with process_uuid IS NULL
// This budget tracks each row's persisted rendered log body plus non-body
// metadata, rather than the exact sum of all persisted SQLite column bytes.
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
#[derive(Clone)]
pub struct StateRuntime {
codex_home: PathBuf,
default_provider: String,
pool: Arc<sqlx::SqlitePool>,
logs_pool: Arc<sqlx::SqlitePool>,
thread_updated_at_millis: Arc<AtomicI64>,
}
impl StateRuntime {
/// Initialize the state runtime using the provided Codex home and default provider.
///
/// This opens (and migrates) the SQLite databases under `codex_home`,
/// keeping logs in a dedicated file to reduce lock contention with the
/// rest of the state store.
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
tokio::fs::create_dir_all(&codex_home).await?;
let state_migrator = runtime_state_migrator();
let logs_migrator = runtime_logs_migrator();
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
return Err(err);
}
};
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
None,
DbKind::State,
"ensure_backfill_state",
started.elapsed(),
&backfill_state_result,
);
backfill_state_result?;
let started = Instant::now();
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
DbKind::State,
"post_init_query",
started.elapsed(),
&thread_updated_at_millis_result,
);
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
logs_pool,
codex_home,
default_provider,
thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)),
});
if let Err(err) = runtime.run_logs_startup_maintenance().await {
warn!(
"failed to run startup maintenance for logs db at {}: {err}",
logs_path.display(),
);
}
Ok(runtime)
}
/// Return the configured Codex home directory for this runtime.
pub fn codex_home(&self) -> &Path {
self.codex_home.as_path()
}
}
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off)
}
async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
open_sqlite(path, migrator, DbKind::State, "open_state", "migrate_state").await
}
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
open_sqlite(path, migrator, DbKind::Logs, "open_logs", "migrate_logs").await
}
async fn open_sqlite(
path: &Path,
migrator: &Migrator,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(None, db, open_phase, started.elapsed(), &pool_result);
let pool = pool_result?;
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
db,
migrate_phase,
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
}
pub(super) async fn ensure_backfill_state_row_in_pool(
pool: &sqlx::SqlitePool,
) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(pool)
.await?;
Ok(())
}
pub fn state_db_filename() -> String {
STATE_DB_FILENAME.to_string()
}
pub fn state_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(state_db_filename())
}
pub fn logs_db_filename() -> String {
LOGS_DB_FILENAME.to_string()
}
pub fn logs_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(logs_db_filename())
}
#[cfg(test)]
mod tests {
use super::open_state_sqlite;
use super::runtime_state_migrator;
use super::state_db_path;
use super::test_support::unique_temp_dir;
use crate::migrations::STATE_MIGRATOR;
use sqlx::SqlitePool;
use sqlx::migrate::MigrateError;
use sqlx::sqlite::SqliteConnectOptions;
use std::path::Path;
async fn open_db_pool(path: &Path) -> SqlitePool {
SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false),
)
.await
.expect("open sqlite pool")
}
#[tokio::test]
async fn open_state_sqlite_tolerates_newer_applied_migrations() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
let state_path = state_db_path(codex_home.as_path());
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&state_path)
.create_if_missing(true),
)
.await
.expect("open state db");
STATE_MIGRATOR
.run(&pool)
.await
.expect("apply current state schema");
sqlx::query(
"INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)",
)
.bind(9_999_i64)
.bind("future migration")
.bind(true)
.bind(vec![1_u8, 2, 3, 4])
.bind(1_i64)
.execute(&pool)
.await
.expect("insert future migration record");
pool.close().await;
let strict_pool = open_db_pool(state_path.as_path()).await;
let strict_err = STATE_MIGRATOR
.run(&strict_pool)
.await
.expect_err("strict migrator should reject newer applied migrations");
assert!(matches!(strict_err, MigrateError::VersionMissing(9_999)));
strict_pool.close().await;
let tolerant_migrator = runtime_state_migrator();
let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
.await
.expect("runtime migrator should tolerate newer applied migrations");
tolerant_pool.close().await;
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}