Files
codex/codex-rs/state/src/runtime.rs
jif-oai b401666ca5 Add process-scoped SQLite telemetry (#22154)
## Summary
- add SQLite init, backfill-gate, and fallback telemetry without
introducing a cross-cutting state-db access wrapper
- install one process-scoped telemetry sink after OTEL startup and let
low-level state/rollout paths emit through it directly
- add process-start metrics for the process owners that initialize
SQLite

---------

Co-authored-by: Owen Lin <owen@openai.com>
2026-05-11 11:32:40 -07:00

473 lines
14 KiB
Rust

use crate::AgentJob;
use crate::AgentJobCreateParams;
use crate::AgentJobItem;
use crate::AgentJobItemCreateParams;
use crate::AgentJobItemStatus;
use crate::AgentJobProgress;
use crate::AgentJobStatus;
use crate::LOGS_DB_FILENAME;
use crate::LogEntry;
use crate::LogQuery;
use crate::LogRow;
use crate::STATE_DB_FILENAME;
use crate::SortKey;
use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::runtime_logs_migrator;
use crate::migrations::runtime_state_migrator;
use crate::model::AgentJobRow;
use crate::model::ThreadGoalRow;
use crate::model::ThreadRow;
use crate::model::anchor_from_item;
use crate::model::datetime_to_epoch_millis;
use crate::model::datetime_to_epoch_seconds;
use crate::model::epoch_millis_to_datetime;
use crate::paths::file_modified_time_utc;
use crate::telemetry::DbKind;
use crate::telemetry::DbTelemetry;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use log::LevelFilter;
use serde_json::Value;
use sqlx::ConnectOptions;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteAutoVacuum;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use std::time::Instant;
use tracing::warn;
mod agent_jobs;
mod backfill;
mod goals;
mod logs;
mod memories;
mod remote_control;
#[cfg(test)]
mod test_support;
mod threads;
pub use goals::ThreadGoalAccountingMode;
pub use goals::ThreadGoalAccountingOutcome;
pub use goals::ThreadGoalUpdate;
pub use remote_control::RemoteControlEnrollmentRecord;
pub use threads::ThreadFilterOptions;
// "Partition" is the retained-log-content bucket we cap at 10 MiB:
// - one bucket per non-null thread_id
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
// - one bucket for threadless rows with process_uuid IS NULL
// This budget tracks each row's persisted rendered log body plus non-body
// metadata, rather than the exact sum of all persisted SQLite column bytes.
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
#[derive(Clone)]
pub struct StateRuntime {
codex_home: PathBuf,
default_provider: String,
pool: Arc<sqlx::SqlitePool>,
logs_pool: Arc<sqlx::SqlitePool>,
thread_updated_at_millis: Arc<AtomicI64>,
}
impl StateRuntime {
/// Initialize the state runtime using the provided Codex home and default provider.
///
/// This opens (and migrates) the SQLite databases under `codex_home`,
/// keeping logs in a dedicated file to reduce lock contention with the
/// rest of the state store.
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
Self::init_inner(
codex_home,
default_provider,
/*telemetry_override*/ None,
)
.await
}
#[cfg(test)]
pub(crate) async fn init_with_telemetry_for_tests(
codex_home: PathBuf,
default_provider: String,
telemetry_override: &dyn DbTelemetry,
) -> anyhow::Result<Arc<Self>> {
Self::init_inner(codex_home, default_provider, Some(telemetry_override)).await
}
async fn init_inner(
codex_home: PathBuf,
default_provider: String,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<Arc<Self>> {
tokio::fs::create_dir_all(&codex_home).await?;
let state_migrator = runtime_state_migrator();
let logs_migrator = runtime_logs_migrator();
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, telemetry_override).await
{
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
return Err(err);
}
};
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
telemetry_override,
DbKind::State,
"ensure_backfill_state",
started.elapsed(),
&backfill_state_result,
);
backfill_state_result?;
let started = Instant::now();
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
DbKind::State,
"post_init_query",
started.elapsed(),
&thread_updated_at_millis_result,
);
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
logs_pool,
codex_home,
default_provider,
thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)),
});
if let Err(err) = runtime.run_logs_startup_maintenance().await {
warn!(
"failed to run startup maintenance for logs db at {}: {err}",
logs_path.display(),
);
}
Ok(runtime)
}
/// Return the configured Codex home directory for this runtime.
pub fn codex_home(&self) -> &Path {
self.codex_home.as_path()
}
}
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off)
}
async fn open_state_sqlite(
path: &Path,
migrator: &Migrator,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
open_sqlite(
path,
migrator,
DbKind::State,
"open_state",
"migrate_state",
telemetry_override,
)
.await
}
async fn open_logs_sqlite(
path: &Path,
migrator: &Migrator,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
open_sqlite(
path,
migrator,
DbKind::Logs,
"open_logs",
"migrate_logs",
telemetry_override,
)
.await
}
async fn open_sqlite(
path: &Path,
migrator: &Migrator,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
db,
open_phase,
started.elapsed(),
&pool_result,
);
let pool = pool_result?;
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
db,
migrate_phase,
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
}
pub(super) async fn ensure_backfill_state_row_in_pool(
pool: &sqlx::SqlitePool,
) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(pool)
.await?;
Ok(())
}
pub fn state_db_filename() -> String {
STATE_DB_FILENAME.to_string()
}
pub fn state_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(state_db_filename())
}
pub fn logs_db_filename() -> String {
LOGS_DB_FILENAME.to_string()
}
pub fn logs_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(logs_db_filename())
}
#[cfg(test)]
mod tests {
use super::StateRuntime;
use super::open_state_sqlite;
use super::runtime_state_migrator;
use super::state_db_path;
use super::test_support::unique_temp_dir;
use crate::DB_INIT_METRIC;
use crate::DbTelemetry;
use crate::migrations::STATE_MIGRATOR;
use pretty_assertions::assert_eq;
use sqlx::SqlitePool;
use sqlx::migrate::MigrateError;
use sqlx::sqlite::SqliteConnectOptions;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::path::Path;
use std::sync::Mutex;
#[derive(Default)]
struct TestTelemetry {
counters: Mutex<Vec<MetricEvent>>,
}
#[derive(Debug, Eq, PartialEq)]
struct MetricEvent {
name: String,
tags: BTreeMap<String, String>,
}
impl TestTelemetry {
fn counters(&self) -> Vec<MetricEvent> {
self.counters
.lock()
.expect("telemetry lock")
.iter()
.map(|event| MetricEvent {
name: event.name.clone(),
tags: event.tags.clone(),
})
.collect()
}
}
impl DbTelemetry for TestTelemetry {
fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) {
self.counters
.lock()
.expect("telemetry lock")
.push(MetricEvent {
name: name.to_string(),
tags: tags_to_map(tags),
});
}
fn record_duration(
&self,
_name: &str,
_duration: std::time::Duration,
_tags: &[(&str, &str)],
) {
}
}
fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap<String, String> {
tags.iter()
.map(|(key, value)| ((*key).to_string(), (*value).to_string()))
.collect()
}
async fn open_db_pool(path: &Path) -> SqlitePool {
SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false),
)
.await
.expect("open sqlite pool")
}
#[tokio::test]
async fn open_state_sqlite_tolerates_newer_applied_migrations() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
let state_path = state_db_path(codex_home.as_path());
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&state_path)
.create_if_missing(true),
)
.await
.expect("open state db");
STATE_MIGRATOR
.run(&pool)
.await
.expect("apply current state schema");
sqlx::query(
"INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)",
)
.bind(9_999_i64)
.bind("future migration")
.bind(true)
.bind(vec![1_u8, 2, 3, 4])
.bind(1_i64)
.execute(&pool)
.await
.expect("insert future migration record");
pool.close().await;
let strict_pool = open_db_pool(state_path.as_path()).await;
let strict_err = STATE_MIGRATOR
.run(&strict_pool)
.await
.expect_err("strict migrator should reject newer applied migrations");
assert!(matches!(strict_err, MigrateError::VersionMissing(9_999)));
strict_pool.close().await;
let tolerant_migrator = runtime_state_migrator();
let tolerant_pool = open_state_sqlite(
state_path.as_path(),
&tolerant_migrator,
/*telemetry_override*/ None,
)
.await
.expect("runtime migrator should tolerate newer applied migrations");
tolerant_pool.close().await;
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn init_records_successful_sqlite_init_phases_to_explicit_telemetry() {
let codex_home = unique_temp_dir();
let telemetry = TestTelemetry::default();
let runtime = StateRuntime::init_with_telemetry_for_tests(
codex_home.clone(),
"test-provider".to_string(),
&telemetry,
)
.await
.expect("state runtime should initialize");
let phases = telemetry
.counters()
.into_iter()
.filter(|event| event.name == DB_INIT_METRIC)
.filter(|event| event.tags.get("status").map(String::as_str) == Some("success"))
.filter_map(|event| event.tags.get("phase").cloned())
.collect::<BTreeSet<_>>();
let expected = [
"open_state",
"migrate_state",
"open_logs",
"migrate_logs",
"ensure_backfill_state",
"post_init_query",
]
.into_iter()
.map(str::to_string)
.collect::<BTreeSet<_>>();
assert_eq!(phases, expected);
runtime.pool.close().await;
runtime.logs_pool.close().await;
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}