mirror of
https://github.com/openai/codex.git
synced 2026-05-18 18:22:39 +00:00
sqlite: no more destructive version bumps (#21847)
## Why We'd like SQLite state to become required and load-bearing. As a first step, let's remove the mechanism that allows us to blow away the SQLite DB on a version bump, and instead rely on graceful migrations. The original motivation ([PR](https://github.com/openai/codex/pull/10623)) behind this mechanism was to care less about backwards compatibility while SQLite was being landed, but I'd say it's quite important now to keep the data in it. ## What changed - Make `STATE_DB_FILENAME` and `LOGS_DB_FILENAME` the full canonical filenames: `state_5.sqlite` and `logs_2.sqlite`. - Remove `STATE_DB_VERSION` / `LOGS_DB_VERSION` and the helper that constructed filenames from versions. - Stop `StateRuntime::init` from scanning for or deleting older SQLite DB filenames at startup. - Delete the tests that encoded legacy state/logs DB deletion behavior. ## Verification - `cargo test -p codex-state`
This commit is contained in:
@@ -60,10 +60,8 @@ pub use runtime::state_db_path;
|
||||
/// Environment variable for overriding the SQLite state database home directory.
|
||||
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
|
||||
|
||||
pub const LOGS_DB_FILENAME: &str = "logs";
|
||||
pub const LOGS_DB_VERSION: u32 = 2;
|
||||
pub const STATE_DB_FILENAME: &str = "state";
|
||||
pub const STATE_DB_VERSION: u32 = 5;
|
||||
pub const LOGS_DB_FILENAME: &str = "logs_2.sqlite";
|
||||
pub const STATE_DB_FILENAME: &str = "state_5.sqlite";
|
||||
|
||||
/// Errors encountered during DB operations. Tags: [stage]
|
||||
pub const DB_ERROR_METRIC: &str = "codex.db.error";
|
||||
|
||||
@@ -6,12 +6,10 @@ use crate::AgentJobItemStatus;
|
||||
use crate::AgentJobProgress;
|
||||
use crate::AgentJobStatus;
|
||||
use crate::LOGS_DB_FILENAME;
|
||||
use crate::LOGS_DB_VERSION;
|
||||
use crate::LogEntry;
|
||||
use crate::LogQuery;
|
||||
use crate::LogRow;
|
||||
use crate::STATE_DB_FILENAME;
|
||||
use crate::STATE_DB_VERSION;
|
||||
use crate::SortKey;
|
||||
use crate::ThreadMetadata;
|
||||
use crate::ThreadMetadataBuilder;
|
||||
@@ -98,22 +96,6 @@ impl StateRuntime {
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
let state_migrator = runtime_state_migrator();
|
||||
let logs_migrator = runtime_logs_migrator();
|
||||
let current_state_name = state_db_filename();
|
||||
let current_logs_name = logs_db_filename();
|
||||
remove_legacy_db_files(
|
||||
&codex_home,
|
||||
current_state_name.as_str(),
|
||||
STATE_DB_FILENAME,
|
||||
"state",
|
||||
)
|
||||
.await;
|
||||
remove_legacy_db_files(
|
||||
&codex_home,
|
||||
current_logs_name.as_str(),
|
||||
LOGS_DB_FILENAME,
|
||||
"logs",
|
||||
)
|
||||
.await;
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let logs_path = logs_db_path(codex_home.as_path());
|
||||
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
|
||||
@@ -190,12 +172,8 @@ async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<Sq
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
fn db_filename(base_name: &str, version: u32) -> String {
|
||||
format!("{base_name}_{version}.sqlite")
|
||||
}
|
||||
|
||||
pub fn state_db_filename() -> String {
|
||||
db_filename(STATE_DB_FILENAME, STATE_DB_VERSION)
|
||||
STATE_DB_FILENAME.to_string()
|
||||
}
|
||||
|
||||
pub fn state_db_path(codex_home: &Path) -> PathBuf {
|
||||
@@ -203,96 +181,13 @@ pub fn state_db_path(codex_home: &Path) -> PathBuf {
|
||||
}
|
||||
|
||||
pub fn logs_db_filename() -> String {
|
||||
db_filename(LOGS_DB_FILENAME, LOGS_DB_VERSION)
|
||||
LOGS_DB_FILENAME.to_string()
|
||||
}
|
||||
|
||||
pub fn logs_db_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join(logs_db_filename())
|
||||
}
|
||||
|
||||
async fn remove_legacy_db_files(
|
||||
codex_home: &Path,
|
||||
current_name: &str,
|
||||
base_name: &str,
|
||||
db_label: &str,
|
||||
) {
|
||||
let mut entries = match tokio::fs::read_dir(codex_home).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read codex_home for {db_label} db cleanup {}: {err}",
|
||||
codex_home.display(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut legacy_paths = Vec::new();
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
if !entry
|
||||
.file_type()
|
||||
.await
|
||||
.map(|file_type| file_type.is_file())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
if !should_remove_db_file(file_name.as_ref(), current_name, base_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
legacy_paths.push(entry.path());
|
||||
}
|
||||
|
||||
// On Windows, SQLite can keep the main database file undeletable until the
|
||||
// matching `-wal` / `-shm` sidecars are removed. Remove the longest
|
||||
// sidecar-style paths first so the main file is attempted last.
|
||||
legacy_paths.sort_by_key(|path| std::cmp::Reverse(path.as_os_str().len()));
|
||||
for legacy_path in legacy_paths {
|
||||
let mut result = tokio::fs::remove_file(&legacy_path).await;
|
||||
for _ in 0..3 {
|
||||
if result.is_ok() {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
result = tokio::fs::remove_file(&legacy_path).await;
|
||||
}
|
||||
if let Err(err) = result {
|
||||
warn!(
|
||||
"failed to remove legacy {db_label} db file {}: {err}",
|
||||
legacy_path.display(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -> bool {
|
||||
let mut normalized_name = file_name;
|
||||
for suffix in ["-wal", "-shm", "-journal"] {
|
||||
if let Some(stripped) = file_name.strip_suffix(suffix) {
|
||||
normalized_name = stripped;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if normalized_name == current_name {
|
||||
return false;
|
||||
}
|
||||
let unversioned_name = format!("{base_name}.sqlite");
|
||||
if normalized_name == unversioned_name {
|
||||
return true;
|
||||
}
|
||||
|
||||
let Some(version_with_extension) = normalized_name.strip_prefix(&format!("{base_name}_"))
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
let Some(version_suffix) = version_with_extension.strip_suffix(".sqlite") else {
|
||||
return false;
|
||||
};
|
||||
!version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::open_state_sqlite;
|
||||
|
||||
@@ -122,88 +122,10 @@ ON CONFLICT(id) DO NOTHING
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::StateRuntime;
|
||||
use super::state_db_filename;
|
||||
use super::test_support::unique_temp_dir;
|
||||
use crate::STATE_DB_FILENAME;
|
||||
use crate::STATE_DB_VERSION;
|
||||
use chrono::Utc;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_removes_legacy_state_db_files() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex_home");
|
||||
|
||||
let current_name = state_db_filename();
|
||||
let previous_version = STATE_DB_VERSION.saturating_sub(1);
|
||||
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
|
||||
for suffix in ["", "-wal", "-shm", "-journal"] {
|
||||
let path = codex_home.join(format!("{unversioned_name}{suffix}"));
|
||||
tokio::fs::write(path, b"legacy")
|
||||
.await
|
||||
.expect("write legacy");
|
||||
let old_version_path = codex_home.join(format!(
|
||||
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
|
||||
));
|
||||
tokio::fs::write(old_version_path, b"old_version")
|
||||
.await
|
||||
.expect("write old version");
|
||||
}
|
||||
let unrelated_path = codex_home.join("state.sqlite_backup");
|
||||
tokio::fs::write(&unrelated_path, b"keep")
|
||||
.await
|
||||
.expect("write unrelated");
|
||||
let numeric_path = codex_home.join("123");
|
||||
tokio::fs::write(&numeric_path, b"keep")
|
||||
.await
|
||||
.expect("write numeric");
|
||||
|
||||
let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
for suffix in ["", "-wal", "-shm", "-journal"] {
|
||||
let legacy_path = codex_home.join(format!("{unversioned_name}{suffix}"));
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&legacy_path)
|
||||
.await
|
||||
.expect("check legacy path"),
|
||||
false
|
||||
);
|
||||
let old_version_path = codex_home.join(format!(
|
||||
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
|
||||
));
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&old_version_path)
|
||||
.await
|
||||
.expect("check old version path"),
|
||||
false
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(codex_home.join(current_name))
|
||||
.await
|
||||
.expect("check new db path"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&unrelated_path)
|
||||
.await
|
||||
.expect("check unrelated path"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&numeric_path)
|
||||
.await
|
||||
.expect("check numeric path"),
|
||||
true
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn backfill_state_persists_progress_and_completion() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
@@ -698,67 +698,6 @@ mod tests {
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_recreates_legacy_logs_db_when_log_version_changes() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex home");
|
||||
let legacy_logs_path = codex_home.join("logs_1.sqlite");
|
||||
let pool = SqlitePool::connect_with(
|
||||
SqliteConnectOptions::new()
|
||||
.filename(&legacy_logs_path)
|
||||
.create_if_missing(true),
|
||||
)
|
||||
.await
|
||||
.expect("open legacy logs db");
|
||||
LOGS_MIGRATOR
|
||||
.run(&pool)
|
||||
.await
|
||||
.expect("apply legacy logs schema");
|
||||
sqlx::query(
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(1_i64)
|
||||
.bind(0_i64)
|
||||
.bind("INFO")
|
||||
.bind("cli")
|
||||
.bind("legacy-log-row")
|
||||
.bind("mod")
|
||||
.bind("main.rs")
|
||||
.bind(7_i64)
|
||||
.bind("thread-1")
|
||||
.bind("proc-1")
|
||||
.bind(16_i64)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.expect("insert legacy log row");
|
||||
pool.close().await;
|
||||
drop(pool);
|
||||
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
assert!(
|
||||
!legacy_logs_path.exists(),
|
||||
"legacy logs db should be removed when the version changes"
|
||||
);
|
||||
assert!(
|
||||
logs_db_path(codex_home.as_path()).exists(),
|
||||
"current logs db should be recreated during init"
|
||||
);
|
||||
assert!(
|
||||
runtime
|
||||
.query_logs(&LogQuery::default())
|
||||
.await
|
||||
.expect("query recreated logs db")
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_configures_logs_db_with_incremental_auto_vacuum() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
Reference in New Issue
Block a user