Automatic SQLite DB recovery

We're seeing increasing user reports of corrupted SQLite databases, and the vast majority of those have been recoverable with sqlite's built-in recovery process.  Unfortunately, the rust crate we're using doesn't have the recovery code compiled in, so we have to compile the recovery functions specifically ourselves, so we vendor those files into the repo here and run it if we detect a malformed DB during sqlite startup.  If the recovery attempt fails, it reverts to the original logic where it asks if the user wants to blow away their current database and start over.
This commit is contained in:
David de Regt
2026-05-20 13:41:14 -07:00
parent de80fa6e31
commit 878c1da249
16 changed files with 18612 additions and 5 deletions

View File

@@ -275,6 +275,14 @@ crate.annotation(
inject_repo(crate, "zlib")
crate.annotation(
build_script_env = {
"LIBSQLITE3_FLAGS": "SQLITE_ENABLE_DBPAGE_VTAB",
},
crate = "libsqlite3-sys",
gen_build_script = "on",
)
bazel_dep(name = "xz", version = "5.4.5.bcr.8")
single_version_override(
module_name = "xz",

View File

@@ -1,3 +1,6 @@
[env]
LIBSQLITE3_FLAGS = "SQLITE_ENABLE_DBPAGE_VTAB"
[target.'cfg(all(windows, target_env = "msvc"))']
rustflags = ["-C", "link-arg=/STACK:8388608", "-C", "target-feature=+crt-static"]

2
codex-rs/Cargo.lock generated
View File

@@ -3680,11 +3680,13 @@ name = "codex-state"
version = "0.0.0"
dependencies = [
"anyhow",
"cc",
"chrono",
"clap",
"codex-git-utils",
"codex-protocol",
"dirs",
"libsqlite3-sys",
"log",
"owo-colors",
"pretty_assertions",

View File

@@ -260,6 +260,7 @@ axum = { version = "0.8", default-features = false }
base64 = "0.22.1"
bm25 = "2.3.2"
bytes = "1.10.1"
cc = "1.2.55"
chardetng = "0.1.17"
chrono = "0.4.43"
clap = "4"
@@ -306,6 +307,7 @@ keyring = { version = "3.6", default-features = false }
landlock = "0.4.4"
lazy_static = "1"
libc = "0.2.182"
libsqlite3-sys = "0.30.1"
log = "0.4"
lru = "0.16.3"
maplit = "1.0.2"

View File

@@ -18,7 +18,9 @@ pub(crate) fn is_locked(detail: &str) -> bool {
pub(crate) fn confirm_repair(startup_error: &LocalStateDbStartupError) -> std::io::Result<bool> {
eprintln!("Codex couldn't start because its local database appears to be damaged.");
eprintln!("Codex can try a safe repair by backing up those files and rebuilding them.");
eprintln!(
"Codex can try a last-resort startup repair by backing up those files and rebuilding empty local databases."
);
print_technical_details(startup_error);
crate::confirm("Repair Codex local data now? [y/N]: ")
}

View File

@@ -3,5 +3,6 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "state",
crate_name = "codex_state",
build_script_data = glob(["vendor/sqlite-recover/**"]),
compile_data = glob(["goals_migrations/**", "logs_migrations/**", "migrations/**"]),
)

View File

@@ -3,6 +3,7 @@ name = "codex-state"
version.workspace = true
edition.workspace = true
license.workspace = true
build = "build.rs"
[dependencies]
anyhow = { workspace = true }
@@ -10,6 +11,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
codex-protocol = { workspace = true }
dirs = { workspace = true }
libsqlite3-sys = { workspace = true }
log = { workspace = true }
owo-colors = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -25,5 +27,8 @@ uuid = { workspace = true }
codex-git-utils = { workspace = true }
pretty_assertions = { workspace = true }
[build-dependencies]
cc = { workspace = true }
[lints]
workspace = true

13
codex-rs/state/build.rs Normal file
View File

@@ -0,0 +1,13 @@
fn main() {
println!("cargo:rerun-if-changed=vendor/sqlite-recover/dbdata.c");
println!("cargo:rerun-if-changed=vendor/sqlite-recover/sqlite3.h");
println!("cargo:rerun-if-changed=vendor/sqlite-recover/sqlite3recover.c");
println!("cargo:rerun-if-changed=vendor/sqlite-recover/sqlite3recover.h");
cc::Build::new()
.file("vendor/sqlite-recover/dbdata.c")
.file("vendor/sqlite-recover/sqlite3recover.c")
.include("vendor/sqlite-recover")
.warnings(false)
.compile("codex_sqlite_recover");
}

View File

@@ -61,6 +61,7 @@ mod backfill;
mod goals;
mod logs;
mod memories;
mod recovery;
mod remote_control;
#[cfg(test)]
mod test_support;
@@ -294,6 +295,37 @@ async fn open_sqlite(
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = match connect_sqlite(options.clone(), spec, telemetry_override).await {
Ok(pool) => pool,
Err(err) => {
if !recovery::is_malformed_sqlite_error(&err) {
return Err(err);
}
recovery::recover_database(path, spec, migrator, &err).await?;
connect_sqlite(options.clone(), spec, telemetry_override).await?
}
};
match migrate_sqlite(&pool, migrator, spec, telemetry_override).await {
Ok(()) => Ok(pool),
Err(err) => {
if !recovery::is_malformed_sqlite_error(&err) {
return Err(err);
}
pool.close().await;
recovery::recover_database(path, spec, migrator, &err).await?;
let pool = connect_sqlite(options, spec, telemetry_override).await?;
migrate_sqlite(&pool, migrator, spec, telemetry_override).await?;
Ok(pool)
}
}
}
async fn connect_sqlite(
options: SqliteConnectOptions,
spec: RuntimeDbSpec,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
@@ -307,9 +339,17 @@ async fn open_sqlite(
started.elapsed(),
&pool_result,
);
let pool = pool_result?;
pool_result
}
async fn migrate_sqlite(
pool: &SqlitePool,
migrator: &Migrator,
spec: RuntimeDbSpec,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<()> {
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
let migrate_result = migrator.run(pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
spec.kind,
@@ -317,8 +357,7 @@ async fn open_sqlite(
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
migrate_result
}
pub(super) async fn ensure_backfill_state_row_in_pool(

View File

@@ -0,0 +1,723 @@
use super::RuntimeDbSpec;
use anyhow::Context;
use anyhow::Result;
use log::LevelFilter;
use sqlx::ConnectOptions;
use sqlx::Row;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqlitePoolOptions;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::ffi::OsString;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tracing::warn;
mod recover_api;
const SQLITE_CORRUPT: i32 = 11;
const SQLITE_NOTADB: i32 = 26;
#[derive(Debug)]
struct RecoveryPaths {
recovered_path: PathBuf,
backup_paths: Vec<PathBuf>,
}
pub(super) fn is_malformed_sqlite_error(err: &anyhow::Error) -> bool {
// Prefer SQLite result codes, but keep a message fallback for migration
// wrappers that stringify the underlying database error.
for cause in err.chain() {
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>()
&& sqlx_error_is_malformed(sqlx_err)
{
return true;
}
}
err.chain()
.map(ToString::to_string)
.any(|message| error_message_is_malformed(message.as_str()))
}
pub(super) async fn recover_database(
path: &Path,
spec: RuntimeDbSpec,
migrator: &Migrator,
original_error: &anyhow::Error,
) -> Result<()> {
let recovery = prepare_recovery_paths(path).await.with_context(|| {
format!(
"failed to prepare automatic recovery for {} at {}",
spec.label,
path.display()
)
})?;
warn!(
"{} at {} appears malformed ({original_error}); attempting automatic recovery after backing up to {}",
spec.label,
path.display(),
format_backup_paths(&recovery.backup_paths)
);
print_status(recovery_started_status(
path,
spec,
recovery.backup_paths.as_slice(),
));
match run_recovery(path, recovery.recovered_path.as_path(), migrator).await {
Ok(()) => {
if let Err(err) =
replace_with_recovered_database(path, recovery.recovered_path.as_path()).await
{
print_status(recovery_failed_status(
path,
spec,
recovery.backup_paths.as_slice(),
));
return Err(err).with_context(|| {
format!(
"automatic recovery rebuilt {} at {} but failed to replace the original database; backup files remain at {}",
spec.label,
path.display(),
format_backup_paths(&recovery.backup_paths)
)
});
}
warn!(
"automatically recovered {} at {} with the SQLite recovery API",
spec.label,
path.display()
);
print_status(recovery_completed_status(path, spec));
Ok(())
}
Err(err) => {
let _ = tokio::fs::remove_file(recovery.recovered_path.as_path()).await;
print_status(recovery_failed_status(
path,
spec,
recovery.backup_paths.as_slice(),
));
Err(err).with_context(|| {
format!(
"automatic recovery failed for {} at {}; backup files remain at {}",
spec.label,
path.display(),
format_backup_paths(&recovery.backup_paths)
)
})
}
}
}
fn print_status(message: String) {
// Keep startup recovery status on stderr so stdout remains available for
// command output and app-server JSON-RPC transports.
eprintln!("{message}");
}
fn sqlx_error_is_malformed(err: &sqlx::Error) -> bool {
match err {
sqlx::Error::Database(database_error) => {
let code = database_error
.code()
.unwrap_or(Cow::Borrowed("none"))
.to_string();
sqlite_code_is_malformed(code.as_str())
|| error_message_is_malformed(database_error.message())
}
_ => error_message_is_malformed(err.to_string().as_str()),
}
}
fn sqlite_code_is_malformed(code: &str) -> bool {
let primary_code = code.parse::<i32>().ok().map(|code| code & 0xff);
matches!(primary_code, Some(SQLITE_CORRUPT | SQLITE_NOTADB))
}
fn error_message_is_malformed(message: &str) -> bool {
let message = message.to_ascii_lowercase();
message.contains("database disk image is malformed")
|| message.contains("file is not a database")
|| message.contains("database schema is malformed")
|| message.contains("database is corrupt")
|| (message.contains("database") && message.contains("malformed"))
}
async fn prepare_recovery_paths(path: &Path) -> Result<RecoveryPaths> {
if !tokio::fs::try_exists(path).await? {
anyhow::bail!("database file does not exist");
}
let suffix = recovery_suffix();
let backup_paths = backup_sqlite_files(path, suffix.as_str()).await?;
if backup_paths.is_empty() {
anyhow::bail!("no database files were available to back up");
}
let recovered_path = unique_sibling_path(path, suffix.as_str(), "recovered").await?;
Ok(RecoveryPaths {
recovered_path,
backup_paths,
})
}
async fn backup_sqlite_files(path: &Path, suffix: &str) -> Result<Vec<PathBuf>> {
let mut backups = Vec::new();
for sqlite_path in sqlite_paths(path) {
if tokio::fs::try_exists(sqlite_path.as_path()).await? {
let backup_path = unique_sibling_path(sqlite_path.as_path(), suffix, "bak").await?;
tokio::fs::copy(sqlite_path.as_path(), backup_path.as_path()).await?;
backups.push(backup_path);
}
}
Ok(backups)
}
async fn unique_sibling_path(path: &Path, suffix: &str, extension: &str) -> Result<PathBuf> {
let file_name = path.file_name().ok_or_else(|| {
anyhow::anyhow!("cannot create a recovery file name for {}", path.display())
})?;
let mut sequence = 0;
loop {
let mut candidate = file_name.to_os_string();
candidate.push(format!(".{suffix}.{sequence}.{extension}"));
let candidate = path.with_file_name(candidate);
if !tokio::fs::try_exists(candidate.as_path()).await? {
return Ok(candidate);
}
sequence += 1;
}
}
async fn run_recovery(path: &Path, recovered_path: &Path, migrator: &Migrator) -> Result<()> {
let path = path.to_path_buf();
let recovered_path = recovered_path.to_path_buf();
let recovered_path_for_task = recovered_path.clone();
tokio::task::spawn_blocking(move || {
recover_api::recover(path.as_path(), recovered_path_for_task.as_path())
})
.await
.context("sqlite recovery task panicked")??;
let pool = open_recovered_pool(recovered_path.as_path()).await?;
assert_recovered_schema(&pool).await?;
assert_integrity_ok(&pool).await?;
match migrator.run(&pool).await {
Ok(()) => {
assert_integrity_ok(&pool).await?;
pool.close().await;
}
Err(err) => {
pool.close().await;
// Recovery can restore user tables while losing SQLx's bookkeeping
// table. Normalize through a fresh migrated DB before giving up.
rebuild_recovered_database(recovered_path.as_path(), migrator)
.await
.with_context(|| {
format!("failed to normalize recovered database after migration failure: {err}")
})?;
}
}
Ok(())
}
async fn open_recovered_pool(path: &Path) -> Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off);
Ok(SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await?)
}
async fn open_migrated_pool(path: &Path, migrator: &Migrator) -> Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off);
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await?;
migrator
.run(&pool)
.await
.context("failed to migrate fresh database for recovered data")?;
Ok(pool)
}
async fn rebuild_recovered_database(recovered_path: &Path, migrator: &Migrator) -> Result<()> {
let normalized_path =
unique_sibling_path(recovered_path, "codex-recovery-normalized", "sqlite").await?;
let pool = open_migrated_pool(normalized_path.as_path(), migrator).await?;
let rebuild_result = async {
copy_recovered_tables(&pool, recovered_path).await?;
assert_recovered_schema(&pool).await?;
assert_integrity_ok(&pool).await?;
Ok::<(), anyhow::Error>(())
}
.await;
pool.close().await;
if let Err(err) = rebuild_result {
let _ = tokio::fs::remove_file(normalized_path.as_path()).await;
return Err(err);
}
tokio::fs::remove_file(recovered_path).await?;
tokio::fs::rename(normalized_path, recovered_path).await?;
Ok(())
}
async fn copy_recovered_tables(pool: &SqlitePool, recovered_path: &Path) -> Result<()> {
let recovered_path = recovered_path.to_str().with_context(|| {
format!(
"recovered path is not valid UTF-8: {}",
recovered_path.display()
)
})?;
sqlx::query("ATTACH DATABASE ? AS recovered")
.bind(recovered_path)
.execute(pool)
.await?;
sqlx::query("PRAGMA foreign_keys = OFF")
.execute(pool)
.await?;
let copy_result = async {
let destination_tables = user_tables(pool, "main").await?;
let source_tables = user_tables(pool, "recovered").await?;
let mut copied_table_count = 0;
for table in source_tables {
if destination_tables.contains(&table) {
// Copy only columns that survived recovery and still exist in
// the current schema. Missing new columns rely on migration
// defaults.
if copy_current_schema_table(pool, table.as_str()).await? {
copied_table_count += 1;
}
} else {
copy_extra_recovered_table(pool, table.as_str()).await?;
copied_table_count += 1;
}
}
if copied_table_count == 0 {
anyhow::bail!("recovered database did not contain any current schema tables");
}
Ok::<(), anyhow::Error>(())
}
.await;
let detach_result = sqlx::query("DETACH DATABASE recovered").execute(pool).await;
copy_result?;
detach_result?;
Ok(())
}
async fn user_tables(pool: &SqlitePool, schema: &str) -> Result<BTreeSet<String>> {
let sql = format!(
r#"
SELECT name
FROM {}.sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
AND name != '_sqlx_migrations'
ORDER BY name
"#,
quote_identifier(schema)
);
let rows = sqlx::query_scalar::<_, String>(sql.as_str())
.fetch_all(pool)
.await?;
Ok(rows.into_iter().collect())
}
async fn copy_current_schema_table(pool: &SqlitePool, table: &str) -> Result<bool> {
let destination_columns = table_columns(pool, "main", table).await?;
let source_columns = table_columns(pool, "recovered", table).await?;
let source_columns = source_columns.into_iter().collect::<BTreeSet<_>>();
let columns = destination_columns
.into_iter()
.filter(|column| source_columns.contains(column))
.collect::<Vec<_>>();
if columns.is_empty() {
return Ok(false);
}
let column_list = columns
.iter()
.map(|column| quote_identifier(column))
.collect::<Vec<_>>()
.join(", ");
let table_name = quote_identifier(table);
let sql = format!(
"INSERT OR REPLACE INTO main.{table_name} ({column_list}) SELECT {column_list} FROM recovered.{table_name}"
);
sqlx::query(sql.as_str()).execute(pool).await?;
Ok(true)
}
async fn copy_extra_recovered_table(pool: &SqlitePool, table: &str) -> Result<()> {
let table_name = quote_identifier(table);
let sql = format!("CREATE TABLE main.{table_name} AS SELECT * FROM recovered.{table_name}");
sqlx::query(sql.as_str()).execute(pool).await?;
Ok(())
}
async fn table_columns(pool: &SqlitePool, schema: &str, table: &str) -> Result<Vec<String>> {
let sql = format!(
"SELECT name FROM {}.pragma_table_xinfo(?) WHERE hidden = 0 ORDER BY cid",
quote_identifier(schema)
);
Ok(sqlx::query_scalar::<_, String>(sql.as_str())
.bind(table)
.fetch_all(pool)
.await?)
}
fn quote_identifier(identifier: &str) -> String {
format!("\"{}\"", identifier.replace('"', "\"\""))
}
async fn assert_recovered_schema(pool: &SqlitePool) -> Result<()> {
// A non-SQLite file can produce a valid empty database through recovery;
// accepting that would silently turn a damaged database into data loss.
let row = sqlx::query(
r#"
SELECT COUNT(*) AS table_count
FROM sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
AND name != '_sqlx_migrations'
"#,
)
.fetch_one(pool)
.await?;
let table_count: i64 = row.try_get("table_count")?;
if table_count == 0 {
anyhow::bail!("SQLite recovery did not recover any user tables");
}
Ok(())
}
async fn assert_integrity_ok(pool: &SqlitePool) -> Result<()> {
let rows = sqlx::query_scalar::<_, String>("PRAGMA integrity_check")
.fetch_all(pool)
.await?;
if !rows.iter().all(|row| row == "ok") {
anyhow::bail!(
"recovered database failed integrity_check: {}",
rows.join("; ")
);
}
Ok(())
}
async fn replace_with_recovered_database(path: &Path, recovered_path: &Path) -> Result<()> {
// Stale WAL files belong to the damaged database and must not be replayed
// against the recovered replacement.
for sidecar in sqlite_sidecar_paths(path) {
match tokio::fs::remove_file(sidecar.as_path()).await {
Ok(()) => {}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(err.into()),
}
}
#[cfg(windows)]
match tokio::fs::remove_file(path).await {
Ok(()) => {}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(err.into()),
}
tokio::fs::rename(recovered_path, path).await?;
Ok(())
}
fn sqlite_paths(path: &Path) -> Vec<PathBuf> {
let mut paths = vec![path.to_path_buf()];
paths.extend(sqlite_sidecar_paths(path));
paths
}
fn sqlite_sidecar_paths(path: &Path) -> Vec<PathBuf> {
["-wal", "-shm"]
.into_iter()
.map(|suffix| {
let mut sidecar = OsString::from(path.as_os_str());
sidecar.push(suffix);
PathBuf::from(sidecar)
})
.collect()
}
fn recovery_suffix() -> String {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_secs());
format!("codex-recovery-{timestamp}")
}
fn format_backup_paths(paths: &[PathBuf]) -> String {
paths
.iter()
.map(|path| path.display().to_string())
.collect::<Vec<_>>()
.join(", ")
}
fn recovery_started_status(path: &Path, spec: RuntimeDbSpec, backup_paths: &[PathBuf]) -> String {
format!(
"Codex detected a malformed {} at {}. Please wait while Codex attempts to recover it automatically. Backup files: {}",
spec.label,
path.display(),
format_backup_paths(backup_paths)
)
}
fn recovery_completed_status(path: &Path, spec: RuntimeDbSpec) -> String {
format!(
"Codex successfully recovered {} at {}.",
spec.label,
path.display()
)
}
fn recovery_failed_status(path: &Path, spec: RuntimeDbSpec, backup_paths: &[PathBuf]) -> String {
format!(
"Codex could not automatically recover {} at {}. Backup files remain at {}.",
spec.label,
path.display(),
format_backup_paths(backup_paths)
)
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use sqlx::migrate::Migrator;
use std::fs::OpenOptions;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
#[test]
fn detects_malformed_sqlite_messages() {
let err = anyhow::anyhow!("database disk image is malformed");
assert!(is_malformed_sqlite_error(&err));
}
#[test]
fn recovery_status_messages_include_path_and_backup() {
let db_path = Path::new("state.sqlite");
let backup_path = PathBuf::from("state.sqlite.codex-recovery-1.0.bak");
assert_eq!(
recovery_started_status(
db_path,
super::super::STATE_DB,
std::slice::from_ref(&backup_path)
),
"Codex detected a malformed state DB at state.sqlite. Please wait while Codex attempts to recover it automatically. Backup files: state.sqlite.codex-recovery-1.0.bak"
);
assert_eq!(
recovery_completed_status(db_path, super::super::STATE_DB),
"Codex successfully recovered state DB at state.sqlite."
);
assert_eq!(
recovery_failed_status(db_path, super::super::STATE_DB, &[backup_path]),
"Codex could not automatically recover state DB at state.sqlite. Backup files remain at state.sqlite.codex-recovery-1.0.bak."
);
}
#[tokio::test]
async fn recovery_preserves_backup_and_replaces_malformed_database() -> Result<()> {
let temp_dir = super::super::test_support::unique_temp_dir();
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
let db_path = temp_dir.join("sample.sqlite");
create_sample_db(db_path.as_path()).await?;
corrupt_first_table_page(db_path.as_path())?;
let err = anyhow::anyhow!("database disk image is malformed");
recover_database(
db_path.as_path(),
super::super::STATE_DB,
&Migrator::DEFAULT,
&err,
)
.await?;
let rows = super::super::sqlite_integrity_check(db_path.as_path()).await?;
assert_eq!(rows, vec!["ok".to_string()]);
let backup_count = std::fs::read_dir(temp_dir.as_path())?
.filter_map(std::result::Result::ok)
.filter(|entry| {
entry
.file_name()
.to_string_lossy()
.contains(".codex-recovery-")
})
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".bak"))
.count();
assert_eq!(backup_count, 1);
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
Ok(())
}
#[tokio::test]
async fn recovery_normalizes_database_when_migration_metadata_is_lost() -> Result<()> {
let temp_dir = super::super::test_support::unique_temp_dir();
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
let db_path = temp_dir.join(super::super::STATE_DB.filename);
let migrator = crate::migrations::runtime_state_migrator();
let pool = open_migrated_pool(db_path.as_path(), &migrator).await?;
let thread_id = "00000000-0000-0000-0000-000000000456";
sqlx::query(
r#"
INSERT INTO threads (
id,
rollout_path,
created_at,
updated_at,
source,
model_provider,
cwd,
title,
sandbox_policy,
approval_mode
) VALUES (?, ?, 1, 1, 'cli', 'test-provider', ?, 'survived recovery', 'read-only', 'on-request')
"#,
)
.bind(thread_id)
.bind(temp_dir.join("session.jsonl").display().to_string())
.bind(temp_dir.as_path().display().to_string())
.execute(&pool)
.await?;
sqlx::query(
"CREATE TABLE extra_recovered_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
)
.execute(&pool)
.await?;
sqlx::query("INSERT INTO extra_recovered_data (value) VALUES ('preserved')")
.execute(&pool)
.await?;
let page_size: i64 = sqlx::query_scalar("PRAGMA page_size")
.fetch_one(&pool)
.await?;
let migration_root_page: i64 = sqlx::query_scalar(
"SELECT rootpage FROM sqlite_schema WHERE name = '_sqlx_migrations'",
)
.fetch_one(&pool)
.await?;
pool.close().await;
corrupt_page(
db_path.as_path(),
page_size.try_into()?,
migration_root_page.try_into()?,
)?;
let err = anyhow::anyhow!("database disk image is malformed");
recover_database(db_path.as_path(), super::super::STATE_DB, &migrator, &err).await?;
let pool = open_recovered_pool(db_path.as_path()).await?;
let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?")
.bind(thread_id)
.fetch_one(&pool)
.await?;
let migration_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM _sqlx_migrations")
.fetch_one(&pool)
.await?;
let extra_value: String =
sqlx::query_scalar("SELECT value FROM extra_recovered_data WHERE id = 1")
.fetch_one(&pool)
.await?;
pool.close().await;
assert_eq!(title, "survived recovery");
assert_eq!(migration_count, migrator.migrations.len() as i64);
assert_eq!(extra_value, "preserved");
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
Ok(())
}
#[tokio::test]
async fn recovery_rejects_output_without_user_tables() -> Result<()> {
let temp_dir = super::super::test_support::unique_temp_dir();
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
let db_path = temp_dir.join("not-a-db.sqlite");
tokio::fs::write(db_path.as_path(), b"not sqlite").await?;
let err = anyhow::anyhow!("file is not a database");
let recover_err = recover_database(
db_path.as_path(),
super::super::STATE_DB,
&Migrator::DEFAULT,
&err,
)
.await
.expect_err("recovery without schema should fail");
assert!(
recover_err
.to_string()
.contains("automatic recovery failed"),
"unexpected error: {recover_err}"
);
assert!(tokio::fs::try_exists(db_path.as_path()).await?);
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
Ok(())
}
async fn create_sample_db(path: &Path) -> Result<()> {
let options = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.log_statements(LevelFilter::Off);
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await?;
sqlx::query("CREATE TABLE sample (id INTEGER PRIMARY KEY, value TEXT NOT NULL)")
.execute(&pool)
.await?;
sqlx::query("INSERT INTO sample (value) VALUES ('one'), ('two')")
.execute(&pool)
.await?;
pool.close().await;
Ok(())
}
fn corrupt_first_table_page(path: &Path) -> Result<()> {
let mut bytes = std::fs::read(path)?;
if bytes.len() <= 4096 {
anyhow::bail!("sample database was smaller than two pages");
}
bytes[4096] = 0;
std::fs::write(path, bytes)?;
Ok(())
}
fn corrupt_page(path: &Path, page_size: u64, page_number: u64) -> Result<()> {
let offset = page_size
.checked_mul(page_number.saturating_sub(1))
.context("corrupt page offset overflowed")?;
let mut file = OpenOptions::new().write(true).open(path)?;
file.seek(SeekFrom::Start(offset))?;
file.write_all(&[0; 16])?;
Ok(())
}
}

View File

@@ -0,0 +1,173 @@
use anyhow::Context;
use anyhow::Result;
use libsqlite3_sys as ffi;
use std::ffi::CStr;
use std::ffi::CString;
use std::ffi::c_char;
use std::ffi::c_int;
use std::ffi::c_void;
use std::path::Path;
use std::ptr;
const SQLITE_RECOVER_LOST_AND_FOUND: c_int = 1;
#[repr(C)]
struct SqliteRecover {
_private: [u8; 0],
}
unsafe extern "C" {
fn sqlite3_recover_init(
db: *mut ffi::sqlite3,
z_db: *const c_char,
z_uri: *const c_char,
) -> *mut SqliteRecover;
fn sqlite3_recover_config(recover: *mut SqliteRecover, op: c_int, arg: *mut c_void) -> c_int;
fn sqlite3_recover_run(recover: *mut SqliteRecover) -> c_int;
fn sqlite3_recover_errmsg(recover: *mut SqliteRecover) -> *const c_char;
fn sqlite3_recover_errcode(recover: *mut SqliteRecover) -> c_int;
fn sqlite3_recover_finish(recover: *mut SqliteRecover) -> c_int;
}
pub(super) fn recover(path: &Path, recovered_path: &Path) -> Result<()> {
let db = SqliteHandle::open(path)?;
let recovered_path = path_to_cstring(recovered_path)?;
let mut recovery = RecoveryHandle::new(db.as_ptr(), recovered_path.as_c_str())?;
recovery.configure_lost_and_found()?;
recovery.run()?;
recovery.finish()
}
struct SqliteHandle {
db: *mut ffi::sqlite3,
}
impl SqliteHandle {
fn open(path: &Path) -> Result<Self> {
let path = path_to_cstring(path)?;
let mut db = ptr::null_mut();
let flags = ffi::SQLITE_OPEN_READWRITE | ffi::SQLITE_OPEN_URI;
// The recovery API reads pages through sqlite_dbpage on this handle.
// It does not depend on SQLx because the database may be malformed.
let rc = unsafe { ffi::sqlite3_open_v2(path.as_ptr(), &mut db, flags, ptr::null()) };
if rc != ffi::SQLITE_OK {
let message = sqlite_error_message(db);
if !db.is_null() {
let _ = unsafe { ffi::sqlite3_close(db) };
}
anyhow::bail!("failed to open malformed database for recovery ({rc}): {message}");
}
Ok(Self { db })
}
fn as_ptr(&self) -> *mut ffi::sqlite3 {
self.db
}
}
impl Drop for SqliteHandle {
fn drop(&mut self) {
if !self.db.is_null() {
let _ = unsafe { ffi::sqlite3_close(self.db) };
}
}
}
struct RecoveryHandle {
recover: *mut SqliteRecover,
}
impl RecoveryHandle {
fn new(db: *mut ffi::sqlite3, recovered_path: &CStr) -> Result<Self> {
let recover =
unsafe { sqlite3_recover_init(db, c"main".as_ptr(), recovered_path.as_ptr()) };
if recover.is_null() {
anyhow::bail!("failed to initialize SQLite recovery: out of memory");
}
Ok(Self { recover })
}
fn configure_lost_and_found(&mut self) -> Result<()> {
// Match sqlite3 shell recovery behavior by keeping orphaned rows in a
// table instead of discarding pages not reachable from recovered schema.
let table_name = c"lost_and_found";
self.configure(
SQLITE_RECOVER_LOST_AND_FOUND,
table_name.as_ptr().cast_mut().cast(),
)
}
fn configure(&mut self, op: c_int, arg: *mut c_void) -> Result<()> {
let rc = unsafe { sqlite3_recover_config(self.recover, op, arg) };
if rc != ffi::SQLITE_OK {
anyhow::bail!(
"failed to configure SQLite recovery ({rc}): {}",
self.error_message()
);
}
Ok(())
}
fn run(&mut self) -> Result<()> {
let rc = unsafe { sqlite3_recover_run(self.recover) };
if rc != ffi::SQLITE_OK {
anyhow::bail!("SQLite recovery failed ({rc}): {}", self.error_message());
}
Ok(())
}
fn finish(mut self) -> Result<()> {
let rc = unsafe { sqlite3_recover_finish(self.recover) };
self.recover = ptr::null_mut();
if rc != ffi::SQLITE_OK {
anyhow::bail!("SQLite recovery cleanup failed ({rc})");
}
Ok(())
}
fn error_message(&self) -> String {
let errcode = unsafe { sqlite3_recover_errcode(self.recover) };
let message = unsafe { sqlite3_recover_errmsg(self.recover) };
format!("{errcode}: {}", c_string_lossy(message))
}
}
impl Drop for RecoveryHandle {
fn drop(&mut self) {
if !self.recover.is_null() {
let _ = unsafe { sqlite3_recover_finish(self.recover) };
}
}
}
fn sqlite_error_message(db: *mut ffi::sqlite3) -> String {
if db.is_null() {
return "out of memory".to_string();
}
c_string_lossy(unsafe { ffi::sqlite3_errmsg(db) })
}
fn c_string_lossy(message: *const c_char) -> String {
if message.is_null() {
return "unknown error".to_string();
}
unsafe { CStr::from_ptr(message) }
.to_string_lossy()
.into_owned()
}
#[cfg(unix)]
fn path_to_cstring(path: &Path) -> Result<CString> {
use std::os::unix::ffi::OsStrExt;
CString::new(path.as_os_str().as_bytes())
.with_context(|| format!("path contains a NUL byte: {}", path.display()))
}
#[cfg(not(unix))]
fn path_to_cstring(path: &Path) -> Result<CString> {
let path_str = path
.to_str()
.with_context(|| format!("path is not valid UTF-8: {}", path.display()))?;
CString::new(path_str).with_context(|| format!("path contains a NUL byte: {}", path.display()))
}

View File

@@ -0,0 +1,14 @@
Vendored from upstream SQLite Fossil trunk:
- `ext/recover/sqlite3recover.c`
- `ext/recover/sqlite3recover.h`
- `ext/recover/dbdata.c`
`sqlite3.h` is copied from the `libsqlite3-sys` 0.30.1 bundled SQLite
source so Cargo and Bazel compile these extension files with matching public
SQLite declarations without adding a build-dependency that would compile
SQLite a second time.
These files implement SQLite's recover extension without invoking the
`sqlite3` command-line shell. They are compiled into `codex-state` and link
against the same `libsqlite3-sys` library that SQLx uses.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,249 @@
/*
** 2022-08-27
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
** May you do good and not evil.
** May you find forgiveness for yourself and forgive others.
** May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains the public interface to the "recover" extension -
** an SQLite extension designed to recover data from corrupted database
** files.
*/
/*
** OVERVIEW:
**
** To use the API to recover data from a corrupted database, an
** application:
**
** 1) Creates an sqlite3_recover handle by calling either
** sqlite3_recover_init() or sqlite3_recover_init_sql().
**
** 2) Configures the new handle using one or more calls to
** sqlite3_recover_config().
**
** 3) Executes the recovery by repeatedly calling sqlite3_recover_step() on
** the handle until it returns something other than SQLITE_OK. If it
** returns SQLITE_DONE, then the recovery operation completed without
** error. If it returns some other non-SQLITE_OK value, then an error
** has occurred.
**
** 4) Retrieves any error code and English language error message using the
** sqlite3_recover_errcode() and sqlite3_recover_errmsg() APIs,
** respectively.
**
** 5) Destroys the sqlite3_recover handle and frees all resources
** using sqlite3_recover_finish().
**
** The application may abandon the recovery operation at any point
** before it is finished by passing the sqlite3_recover handle to
** sqlite3_recover_finish(). This is not an error, but the final state
** of the output database, or the results of running the partial script
** delivered to the SQL callback, are undefined.
*/
#ifndef _SQLITE_RECOVER_H
#define _SQLITE_RECOVER_H
#include "sqlite3.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
** An instance of the sqlite3_recover object represents a recovery
** operation in progress.
**
** Constructors:
**
** sqlite3_recover_init()
** sqlite3_recover_init_sql()
**
** Destructor:
**
** sqlite3_recover_finish()
**
** Methods:
**
** sqlite3_recover_config()
** sqlite3_recover_errcode()
** sqlite3_recover_errmsg()
** sqlite3_recover_run()
** sqlite3_recover_step()
*/
typedef struct sqlite3_recover sqlite3_recover;
/*
** These two APIs attempt to create and return a new sqlite3_recover object.
** In both cases the first two arguments identify the (possibly
** corrupt) database to recover data from. The first argument is an open
** database handle and the second the name of a database attached to that
** handle (i.e. "main", "temp" or the name of an attached database).
**
** If sqlite3_recover_init() is used to create the new sqlite3_recover
** handle, then data is recovered into a new database, identified by
** string parameter zUri. zUri may be an absolute or relative file path,
** or may be an SQLite URI. If the identified database file already exists,
** it is overwritten.
**
** If sqlite3_recover_init_sql() is invoked, then any recovered data will
** be returned to the user as a series of SQL statements. Executing these
** SQL statements results in the same database as would have been created
** had sqlite3_recover_init() been used. For each SQL statement in the
** output, the callback function passed as the third argument (xSql) is
** invoked once. The first parameter is a passed a copy of the fourth argument
** to this function (pCtx) as its first parameter, and a pointer to a
** nul-terminated buffer containing the SQL statement formated as UTF-8 as
** the second. If the xSql callback returns any value other than SQLITE_OK,
** then processing is immediately abandoned and the value returned used as
** the recover handle error code (see below).
**
** If an out-of-memory error occurs, NULL may be returned instead of
** a valid handle. In all other cases, it is the responsibility of the
** application to avoid resource leaks by ensuring that
** sqlite3_recover_finish() is called on all allocated handles.
*/
sqlite3_recover *sqlite3_recover_init(
sqlite3* db,
const char *zDb,
const char *zUri
);
sqlite3_recover *sqlite3_recover_init_sql(
sqlite3* db,
const char *zDb,
int (*xSql)(void*, const char*),
void *pCtx
);
/*
** Configure an sqlite3_recover object that has just been created using
** sqlite3_recover_init() or sqlite3_recover_init_sql(). This function
** may only be called before the first call to sqlite3_recover_step()
** or sqlite3_recover_run() on the object.
**
** The second argument passed to this function must be one of the
** SQLITE_RECOVER_* symbols defined below. Valid values for the third argument
** depend on the specific SQLITE_RECOVER_* symbol in use.
**
** SQLITE_OK is returned if the configuration operation was successful,
** or an SQLite error code otherwise.
*/
int sqlite3_recover_config(sqlite3_recover*, int op, void *pArg);
/*
** SQLITE_RECOVER_LOST_AND_FOUND:
** The pArg argument points to a string buffer containing the name
** of a "lost-and-found" table in the output database, or NULL. If
** the argument is non-NULL and the database contains seemingly
** valid pages that cannot be associated with any table in the
** recovered part of the schema, data is extracted from these
** pages to add to the lost-and-found table.
**
** SQLITE_RECOVER_FREELIST_CORRUPT:
** The pArg value must actually be a pointer to a value of type
** int containing value 0 or 1 cast as a (void*). If this option is set
** (argument is 1) and a lost-and-found table has been configured using
** SQLITE_RECOVER_LOST_AND_FOUND, then is assumed that the freelist is
** corrupt and an attempt is made to recover records from pages that
** appear to be linked into the freelist. Otherwise, pages on the freelist
** are ignored. Setting this option can recover more data from the
** database, but often ends up "recovering" deleted records. The default
** value is 0 (clear).
**
** SQLITE_RECOVER_ROWIDS:
** The pArg value must actually be a pointer to a value of type
** int containing value 0 or 1 cast as a (void*). If this option is set
** (argument is 1), then an attempt is made to recover rowid values
** that are not also INTEGER PRIMARY KEY values. If this option is
** clear, then new rowids are assigned to all recovered rows. The
** default value is 1 (set).
**
** SQLITE_RECOVER_SLOWINDEXES:
** The pArg value must actually be a pointer to a value of type
** int containing value 0 or 1 cast as a (void*). If this option is clear
** (argument is 0), then when creating an output database, the recover
** module creates and populates non-UNIQUE indexes right at the end of the
** recovery operation - after all recoverable data has been inserted
** into the new database. This is faster overall, but means that the
** final call to sqlite3_recover_step() for a recovery operation may
** be need to create a large number of indexes, which may be very slow.
**
** Or, if this option is set (argument is 1), then non-UNIQUE indexes
** are created in the output database before it is populated with
** recovered data. This is slower overall, but avoids the slow call
** to sqlite3_recover_step() at the end of the recovery operation.
**
** The default option value is 0.
*/
#define SQLITE_RECOVER_LOST_AND_FOUND 1
#define SQLITE_RECOVER_FREELIST_CORRUPT 2
#define SQLITE_RECOVER_ROWIDS 3
#define SQLITE_RECOVER_SLOWINDEXES 4
/*
** Perform a unit of work towards the recovery operation. This function
** must normally be called multiple times to complete database recovery.
**
** If no error occurs but the recovery operation is not completed, this
** function returns SQLITE_OK. If recovery has been completed successfully
** then SQLITE_DONE is returned. If an error has occurred, then an SQLite
** error code (e.g. SQLITE_IOERR or SQLITE_NOMEM) is returned. It is not
** considered an error if some or all of the data cannot be recovered
** due to database corruption.
**
** Once sqlite3_recover_step() has returned a value other than SQLITE_OK,
** all further such calls on the same recover handle are no-ops that return
** the same non-SQLITE_OK value.
*/
int sqlite3_recover_step(sqlite3_recover*);
/*
** Run the recovery operation to completion. Return SQLITE_OK if successful,
** or an SQLite error code otherwise. Calling this function is the same
** as executing:
**
** while( SQLITE_OK==sqlite3_recover_step(p) );
** return sqlite3_recover_errcode(p);
*/
int sqlite3_recover_run(sqlite3_recover*);
/*
** If an error has been encountered during a prior call to
** sqlite3_recover_step(), then this function attempts to return a
** pointer to a buffer containing an English language explanation of
** the error. If no error message is available, or if an out-of memory
** error occurs while attempting to allocate a buffer in which to format
** the error message, NULL is returned.
**
** The returned buffer remains valid until the sqlite3_recover handle is
** destroyed using sqlite3_recover_finish().
*/
const char *sqlite3_recover_errmsg(sqlite3_recover*);
/*
** If this function is called on an sqlite3_recover handle after
** an error occurs, an SQLite error code is returned. Otherwise, SQLITE_OK.
*/
int sqlite3_recover_errcode(sqlite3_recover*);
/*
** Clean up a recovery object created by a call to sqlite3_recover_init().
** The results of using a recovery object with any API after it has been
** passed to this function are undefined.
**
** This function returns the same value as sqlite3_recover_errcode().
*/
int sqlite3_recover_finish(sqlite3_recover*);
#ifdef __cplusplus
} /* end of the 'extern "C"' block */
#endif
#endif /* ifndef _SQLITE_RECOVER_H */