Compare commits

...

1 Commit

Author SHA1 Message Date
Ruslan Nigmatullin
6dc89b8344 state: avoid startup writes for ready dbs 2026-05-08 23:00:13 +00:00
3 changed files with 173 additions and 1 deletion

View File

@@ -106,6 +106,20 @@ async fn try_init_with_roots_inner(
default_model_provider_id: String,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<StateDbHandle> {
if let Some(runtime) = codex_state::StateRuntime::try_open_existing_ready(
sqlite_home.clone(),
default_model_provider_id.clone(),
)
.await
.map_err(|err| {
anyhow::anyhow!(
"failed to open ready state runtime at {}: {err}",
sqlite_home.display()
)
})? {
return Ok(runtime);
}
let runtime =
codex_state::StateRuntime::init(sqlite_home.clone(), default_model_provider_id.clone())
.await

View File

@@ -84,3 +84,34 @@ async fn try_init_times_out_waiting_for_stuck_startup_backfill() -> anyhow::Resu
Ok(())
}
/// A state home that is already initialized and marked backfill-complete must
/// be reopened by `try_init_with_roots` without the legacy-db cleanup running.
#[tokio::test]
async fn try_init_reopens_ready_state_without_running_mutating_init() -> anyhow::Result<()> {
    let home = TempDir::new().expect("temp dir");
    let home_path = home.path().to_path_buf();

    // Seed the home: run the regular init once, then mark backfill complete.
    let seeded =
        codex_state::StateRuntime::init(home_path.clone(), "test-provider".to_string()).await?;
    seeded
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;

    // Sentinel file: if it survives the reopen, the legacy cleanup did not run.
    let legacy_state_path = home_path.join("state.sqlite");
    tokio::fs::write(&legacy_state_path, b"legacy").await?;

    let reopened = try_init_with_roots(
        home_path.clone(),
        home_path,
        "test-provider".to_string(),
    )
    .await?;

    let backfill = reopened.get_backfill_state().await?;
    assert_eq!(backfill.status, codex_state::BackfillStatus::Complete);

    let legacy_still_present = tokio::fs::try_exists(&legacy_state_path).await?;
    assert!(
        legacy_still_present,
        "ready-state fast path should not run legacy db cleanup"
    );
    Ok(())
}

View File

@@ -46,7 +46,6 @@ use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -130,6 +129,90 @@ impl StateRuntime {
return Err(err);
}
};
Self::finish_open(codex_home, default_provider, pool, logs_pool, &logs_path).await
}
/// Open an already-ready state runtime without mutating the state database.
///
/// Returns `Ok(None)` when the state database is missing, behind the current
/// schema, or still needs rollout backfill work. Callers can then fall back
/// to [`Self::init`] to create, migrate, or repair the local state.
pub async fn try_open_existing_ready(
codex_home: PathBuf,
default_provider: String,
) -> anyhow::Result<Option<Arc<Self>>> {
let state_path = state_db_path(codex_home.as_path());
if !tokio::fs::try_exists(&state_path).await? {
return Ok(None);
}
let state_migrator = runtime_state_migrator();
let Some(latest_state_migration) = state_migrator.migrations.last() else {
return Ok(None);
};
let options = SqliteConnectOptions::new()
.filename(&state_path)
.create_if_missing(false)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
let journal_mode: String = sqlx::query_scalar("PRAGMA journal_mode")
.fetch_one(&pool)
.await?;
let ready_status = sqlx::query_scalar(
r#"
SELECT backfill_state.status
FROM backfill_state
WHERE backfill_state.id = 1
AND EXISTS (
SELECT 1
FROM _sqlx_migrations
WHERE version = ? AND success = 1
)
"#,
)
.bind(latest_state_migration.version)
.fetch_optional(&pool)
.await
.ok()
.flatten();
if !journal_mode.eq_ignore_ascii_case("wal")
|| ready_status.as_deref() != Some(crate::BackfillStatus::Complete.as_str())
{
pool.close().await;
return Ok(None);
}
let logs_path = logs_db_path(codex_home.as_path());
let logs_migrator = runtime_logs_migrator();
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
return Err(err);
}
};
let runtime = Self::finish_open(
codex_home,
default_provider,
Arc::new(pool),
logs_pool,
&logs_path,
)
.await?;
Ok(Some(runtime))
}
async fn finish_open(
codex_home: PathBuf,
default_provider: String,
pool: Arc<SqlitePool>,
logs_pool: Arc<SqlitePool>,
logs_path: &Path,
) -> anyhow::Result<Arc<Self>> {
let thread_updated_at_millis: Option<i64> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
@@ -295,15 +378,18 @@ fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -
#[cfg(test)]
mod tests {
use super::StateRuntime;
use super::open_state_sqlite;
use super::runtime_state_migrator;
use super::state_db_path;
use super::test_support::unique_temp_dir;
use crate::migrations::STATE_MIGRATOR;
use sqlx::Connection;
use sqlx::SqlitePool;
use sqlx::migrate::MigrateError;
use sqlx::sqlite::SqliteConnectOptions;
use std::path::Path;
use std::time::Duration;
async fn open_db_pool(path: &Path) -> SqlitePool {
SqlitePool::connect_with(
@@ -362,4 +448,45 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// While another connection holds a `BEGIN IMMEDIATE` transaction on the state
/// database, `try_open_existing_ready` must still finish within one second.
#[tokio::test]
async fn try_open_existing_ready_does_not_need_state_db_write_lock() {
    let codex_home = unique_temp_dir();

    // Bring the state home into the ready state via the regular init path.
    let seeded = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
        .await
        .expect("initialize runtime");
    seeded
        .mark_backfill_complete(/*last_watermark*/ None)
        .await
        .expect("mark backfill complete");

    // Occupy the writer slot from an independent connection.
    let state_path = state_db_path(codex_home.as_path());
    let writer_options = SqliteConnectOptions::new()
        .filename(&state_path)
        .create_if_missing(false);
    let mut writer = sqlx::SqliteConnection::connect_with(&writer_options)
        .await
        .expect("open writer connection");
    sqlx::query("BEGIN IMMEDIATE")
        .execute(&mut writer)
        .await
        .expect("reserve state db writer slot");

    // The reopen has to complete while the writer reservation is still held.
    let reopen =
        StateRuntime::try_open_existing_ready(codex_home.clone(), "test-provider".to_string());
    let reopened = tokio::time::timeout(Duration::from_secs(1), reopen)
        .await
        .expect("ready reopen should not wait on a state db writer")
        .expect("open ready runtime")
        .expect("ready state db should reopen");
    assert_eq!(reopened.codex_home(), codex_home.as_path());

    sqlx::query("ROLLBACK")
        .execute(&mut writer)
        .await
        .expect("release writer reservation");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}