Compare commits

...

1 Commits

Author SHA1 Message Date
David Wiesen
1654150339 fix(state): repair line-ending-only sqlx migration drift 2026-05-21 09:51:25 -07:00

View File

@@ -40,6 +40,7 @@ use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::migrate::Migration;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteAutoVacuum;
use sqlx::sqlite::SqliteConnectOptions;
@@ -47,6 +48,7 @@ use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -177,6 +179,7 @@ async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<S
.max_connections(5)
.connect_with(options)
.await?;
repair_line_ending_compatible_migration_checksums(&pool, migrator, path, "state").await?;
migrator.run(&pool).await?;
let auto_vacuum = sqlx::query_scalar::<_, i64>("PRAGMA auto_vacuum")
.fetch_one(&pool)
@@ -203,10 +206,111 @@ async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<Sq
.max_connections(5)
.connect_with(options)
.await?;
repair_line_ending_compatible_migration_checksums(&pool, migrator, path, "logs").await?;
migrator.run(&pool).await?;
Ok(pool)
}
async fn repair_line_ending_compatible_migration_checksums(
pool: &SqlitePool,
migrator: &Migrator,
path: &Path,
db_label: &str,
) -> anyhow::Result<()> {
let migrations_table_exists = sqlx::query_scalar::<_, i64>(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = '_sqlx_migrations'",
)
.fetch_optional(pool)
.await?
.is_some();
if !migrations_table_exists {
return Ok(());
}
let applied_migrations =
sqlx::query("SELECT version, checksum FROM _sqlx_migrations WHERE success = 1")
.fetch_all(pool)
.await?;
let applied_checksums: HashMap<i64, Vec<u8>> = applied_migrations
.into_iter()
.map(|row| (row.get("version"), row.get("checksum")))
.collect();
let mut repaired_versions = Vec::new();
for migration in migrator
.iter()
.filter(|migration| migration.migration_type.is_up_migration())
{
let Some(applied_checksum) = applied_checksums.get(&migration.version) else {
continue;
};
if applied_checksum.as_slice() == migration.checksum.as_ref() {
continue;
}
if !has_line_ending_compatible_checksum(migration, applied_checksum.as_slice()) {
continue;
}
sqlx::query("UPDATE _sqlx_migrations SET checksum = ? WHERE version = ?")
.bind(migration.checksum.as_ref())
.bind(migration.version)
.execute(pool)
.await?;
repaired_versions.push(migration.version);
}
if !repaired_versions.is_empty() {
warn!(
"normalized line-ending-only migration checksum drift for {db_label} db at {} (versions: {:?})",
path.display(),
repaired_versions,
);
}
Ok(())
}
fn has_line_ending_compatible_checksum(migration: &Migration, applied_checksum: &[u8]) -> bool {
line_ending_compatible_checksums(migration)
.iter()
.any(|checksum| checksum.as_slice() == applied_checksum)
}
fn line_ending_compatible_checksums(migration: &Migration) -> Vec<Vec<u8>> {
let mut checksums = Vec::new();
let lf_sql = migration.sql.replace("\r\n", "\n");
if lf_sql != migration.sql {
checksums.push(
Migration::new(
migration.version,
migration.description.clone(),
migration.migration_type,
lf_sql.into(),
migration.no_tx,
)
.checksum
.into_owned(),
);
}
let crlf_sql = lf_sql.replace('\n', "\r\n");
if crlf_sql != migration.sql {
checksums.push(
Migration::new(
migration.version,
migration.description.clone(),
migration.migration_type,
crlf_sql.into(),
migration.no_tx,
)
.checksum
.into_owned(),
);
}
checksums
}
fn db_filename(base_name: &str, version: u32) -> String {
format!("{base_name}_{version}.sqlite")
}
@@ -312,6 +416,7 @@ fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -
#[cfg(test)]
mod tests {
use super::has_line_ending_compatible_checksum;
use super::open_state_sqlite;
use super::runtime_state_migrator;
use super::state_db_path;
@@ -379,4 +484,111 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn open_state_sqlite_repairs_line_ending_only_checksum_drift() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
let state_path = state_db_path(codex_home.as_path());
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&state_path)
.create_if_missing(true),
)
.await
.expect("open state db");
STATE_MIGRATOR
.run(&pool)
.await
.expect("apply current state schema");
let first_migration = STATE_MIGRATOR
.iter()
.find(|migration| migration.version == 1)
.expect("state migration 1");
let crlf_sql = first_migration.sql.replace('\n', "\r\n");
let crlf_checksum = sqlx::migrate::Migration::new(
first_migration.version,
first_migration.description.clone(),
first_migration.migration_type,
crlf_sql.into(),
first_migration.no_tx,
)
.checksum
.into_owned();
assert!(has_line_ending_compatible_checksum(
first_migration,
crlf_checksum.as_slice()
));
sqlx::query("UPDATE _sqlx_migrations SET checksum = ? WHERE version = 1")
.bind(crlf_checksum)
.execute(&pool)
.await
.expect("corrupt migration checksum with CRLF variant");
pool.close().await;
let strict_pool = open_db_pool(state_path.as_path()).await;
let strict_err = STATE_MIGRATOR
.run(&strict_pool)
.await
.expect_err("strict migrator should reject checksum drift");
assert!(matches!(strict_err, MigrateError::VersionMismatch(1)));
strict_pool.close().await;
let tolerant_pool = open_state_sqlite(state_path.as_path(), &runtime_state_migrator())
.await
.expect("runtime migrator should repair line-ending-only drift");
let repaired_checksum: Vec<u8> =
sqlx::query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = 1")
.fetch_one(&tolerant_pool)
.await
.expect("read repaired checksum");
assert_eq!(
repaired_checksum.as_slice(),
first_migration.checksum.as_ref()
);
tolerant_pool.close().await;
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn open_state_sqlite_keeps_rejecting_real_checksum_mismatches() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
let state_path = state_db_path(codex_home.as_path());
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&state_path)
.create_if_missing(true),
)
.await
.expect("open state db");
STATE_MIGRATOR
.run(&pool)
.await
.expect("apply current state schema");
sqlx::query("UPDATE _sqlx_migrations SET checksum = ? WHERE version = 1")
.bind(vec![9_u8; 48])
.execute(&pool)
.await
.expect("corrupt migration checksum");
pool.close().await;
let err = open_state_sqlite(state_path.as_path(), &runtime_state_migrator())
.await
.expect_err("runtime migrator should still reject real checksum mismatches");
let migrate_err = err
.downcast_ref::<MigrateError>()
.expect("runtime error should preserve migrate error");
assert!(matches!(migrate_err, MigrateError::VersionMismatch(1)));
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}