feat: better log db maintenance (#16330)

Run the DB clean-up more frequently and perform an incremental `VACUUM` as part of it.
This commit is contained in:
jif-oai
2026-03-31 19:15:44 +02:00
committed by GitHub
parent f396454097
commit 868ac158d7
4 changed files with 125 additions and 20 deletions

View File

@@ -55,7 +55,7 @@ pub use runtime::state_db_path;
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
pub const LOGS_DB_FILENAME: &str = "logs";
// Bumped 1 -> 2: presumably so the v1 logs file is discarded and recreated
// with the new connection settings (see the recreate-on-version-change test)
// — a duplicate `LOGS_DB_VERSION = 1` line (diff residue) was removed here,
// since two consts with the same name do not compile.
pub const LOGS_DB_VERSION: u32 = 2;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 5;

View File

@@ -18,8 +18,6 @@
//! # }
//! ```
use chrono::Duration as ChronoDuration;
use chrono::Utc;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::SystemTime;
@@ -47,8 +45,6 @@ use crate::StateRuntime;
const LOG_QUEUE_CAPACITY: usize = 512;
const LOG_BATCH_SIZE: usize = 128;
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
const LOG_RETENTION_DAYS: i64 = 10;
pub struct LogDbLayer {
sender: mpsc::Sender<LogDbCommand>,
process_uuid: String,
@@ -58,7 +54,6 @@ pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
let process_uuid = current_process_log_uuid().to_string();
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver));
tokio::spawn(run_retention_cleanup(state_db));
LogDbLayer {
sender,
@@ -337,14 +332,6 @@ async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntr
let _ = state_db.insert_logs(entries.as_slice()).await;
}
/// Best-effort prune of log rows older than the retention window.
///
/// If the cutoff computation overflows (it cannot for a 10-day window, but
/// `checked_sub_signed` is fallible) the pass is skipped entirely; a failed
/// delete is deliberately ignored so cleanup never disturbs the process.
async fn run_retention_cleanup(state_db: std::sync::Arc<StateRuntime>) {
    if let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS)) {
        let _ = state_db.delete_logs_before(cutoff.timestamp()).await;
    }
}
#[derive(Default)]
struct MessageVisitor {
message: Option<String>,

View File

@@ -38,6 +38,7 @@ use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteAutoVacuum;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
@@ -100,14 +101,14 @@ impl StateRuntime {
.await;
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await {
let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_sqlite(&logs_path, &LOGS_MIGRATOR).await {
let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
@@ -120,6 +121,12 @@ impl StateRuntime {
codex_home,
default_provider,
});
if let Err(err) = runtime.run_logs_startup_maintenance().await {
warn!(
"failed to run startup maintenance for logs db at {}: {err}",
logs_path.display(),
);
}
Ok(runtime)
}
@@ -129,14 +136,28 @@ impl StateRuntime {
}
}
async fn open_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off);
.log_statements(LevelFilter::Off)
}
/// Opens (creating if missing) the state database at `path`, runs all
/// pending migrations from `migrator`, and returns the connection pool.
///
/// # Errors
/// Fails if the connection cannot be established or a migration fails.
async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
    let pool = SqlitePoolOptions::new()
        .max_connections(5)
        .connect_with(base_sqlite_options(path))
        .await?;
    migrator.run(&pool).await?;
    Ok(pool)
}
async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)

View File

@@ -1,5 +1,7 @@
use super::*;
const LOG_RETENTION_DAYS: i64 = 10;
impl StateRuntime {
pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> {
self.insert_logs(std::slice::from_ref(entry)).await
@@ -291,6 +293,22 @@ WHERE id IN (
Ok(result.rows_affected())
}
/// One-shot maintenance pass for the logs DB, run right after it is opened:
/// prune rows past the retention window, then shrink the on-disk files.
pub(crate) async fn run_logs_startup_maintenance(&self) -> anyhow::Result<()> {
// `checked_sub_signed` only fails on datetime overflow; treat that as
// "nothing to prune" and return success rather than erroring out.
let Some(cutoff) =
Utc::now().checked_sub_signed(chrono::Duration::days(LOG_RETENTION_DAYS))
else {
return Ok(());
};
self.delete_logs_before(cutoff.timestamp()).await?;
// Flush the WAL into the main DB file and truncate the WAL, so the
// deletes above are reflected in the main file before vacuuming.
sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.execute(self.logs_pool.as_ref())
.await?;
// Reclaim the free pages left by the deletes. This only has an effect
// because the logs pool is opened with auto_vacuum = Incremental.
sqlx::query("PRAGMA incremental_vacuum")
.execute(self.logs_pool.as_ref())
.await?;
Ok(())
}
/// Query logs with optional filters.
pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
let mut builder = QueryBuilder::<Sqlite>::new(
@@ -520,6 +538,7 @@ mod tests {
use crate::logs_db_path;
use crate::migrations::LOGS_MIGRATOR;
use crate::state_db_path;
use chrono::Utc;
use pretty_assertions::assert_eq;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
@@ -607,7 +626,7 @@ mod tests {
sqlx::query(
"INSERT INTO logs (ts, ts_nanos, level, target, message, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(1_i64)
.bind(Utc::now().timestamp())
.bind(0_i64)
.bind("INFO")
.bind("cli")
@@ -676,6 +695,84 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// A logs DB file from an older schema version must be deleted on init and
/// a fresh, empty DB created at the current path — not migrated in place.
#[tokio::test]
async fn init_recreates_legacy_logs_db_when_log_version_changes() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
// Hard-coded "logs_1" filename simulates a DB created before the
// version bump to LOGS_DB_VERSION = 2.
let legacy_logs_path = codex_home.join("logs_1.sqlite");
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&legacy_logs_path)
.create_if_missing(true),
)
.await
.expect("open legacy logs db");
LOGS_MIGRATOR
.run(&pool)
.await
.expect("apply legacy logs schema");
// Seed one row so an accidentally-migrated DB (non-empty) is
// distinguishable from a recreated one (empty).
// NOTE(review): this column list uses `feedback_log_body` where the
// other insert in this module uses `message` — confirm against the
// logs schema that both columns exist / this is intentional.
sqlx::query(
"INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(1_i64)
.bind(0_i64)
.bind("INFO")
.bind("cli")
.bind("legacy-log-row")
.bind("mod")
.bind("main.rs")
.bind(7_i64)
.bind("thread-1")
.bind("proc-1")
.bind(16_i64)
.execute(&pool)
.await
.expect("insert legacy log row");
// Close before init so file removal is not blocked by an open handle.
pool.close().await;
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
assert!(
!legacy_logs_path.exists(),
"legacy logs db should be removed when the version changes"
);
assert!(
logs_db_path(codex_home.as_path()).exists(),
"current logs db should be recreated during init"
);
// Recreated DB must contain none of the legacy rows.
assert!(
runtime
.query_logs(&LogQuery::default())
.await
.expect("query recreated logs db")
.is_empty()
);
// Best-effort temp-dir cleanup; failure here should not fail the test.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// After init, the logs database must report `PRAGMA auto_vacuum` == 2
/// (incremental mode), matching the connection options it is opened with.
#[tokio::test]
async fn init_configures_logs_db_with_incremental_auto_vacuum() {
    let home = unique_temp_dir();
    let _runtime = StateRuntime::init(home.clone(), "test-provider".to_string())
        .await
        .expect("initialize runtime");

    // Re-open the logs file directly and query the pragma on a fresh pool.
    let logs_pool = open_db_pool(logs_db_path(home.as_path()).as_path()).await;
    let mode = sqlx::query_scalar::<_, i64>("PRAGMA auto_vacuum")
        .fetch_one(&logs_pool)
        .await
        .expect("read auto_vacuum pragma");
    // SQLite encodes incremental auto-vacuum as 2.
    assert_eq!(mode, 2);
    logs_pool.close().await;

    // Best-effort temp-dir cleanup.
    let _ = tokio::fs::remove_dir_all(home).await;
}
#[test]
fn format_feedback_log_line_matches_feedback_formatter_shape() {
assert_eq!(