add ability to recover page 0 corruptions

This commit is contained in:
David de Regt
2026-05-21 10:39:30 -07:00
parent 03f1c6f260
commit 90ca356b53
3 changed files with 569 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tracing::warn;
mod lost_and_found;
mod recover_api;
const SQLITE_CORRUPT: i32 = 11;
@@ -206,6 +207,7 @@ async fn run_recovery(path: &Path, recovered_path: &Path, migrator: &Migrator) -
.context("sqlite recovery task panicked")??;
let pool = open_recovered_pool(recovered_path.as_path()).await?;
lost_and_found::rebuild_from_recovered_schema_if_needed(&pool).await?;
assert_recovered_schema(&pool).await?;
assert_integrity_ok(&pool).await?;
match migrator.run(&pool).await {
@@ -509,6 +511,7 @@ mod tests {
use pretty_assertions::assert_eq;
use sqlx::migrate::Migrator;
use std::fs::OpenOptions;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
@@ -654,6 +657,90 @@ INSERT INTO threads (
Ok(())
}
#[tokio::test]
async fn recovery_rebuilds_when_header_and_schema_root_are_lost() -> Result<()> {
let temp_dir = super::super::test_support::unique_temp_dir();
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
let db_path = temp_dir.join(super::super::STATE_DB.filename);
let migrator = crate::migrations::runtime_state_migrator();
let pool = open_migrated_pool(db_path.as_path(), &migrator).await?;
let thread_id = "00000000-0000-0000-0000-000000000789";
sqlx::query(
r#"
INSERT INTO threads (
id,
rollout_path,
created_at,
updated_at,
source,
model_provider,
cwd,
title,
sandbox_policy,
approval_mode
) VALUES (?, ?, 1, 1, 'cli', 'test-provider', ?, 'schema root lost', 'read-only', 'on-request')
"#,
)
.bind(thread_id)
.bind(temp_dir.join("session.jsonl").display().to_string())
.bind(temp_dir.as_path().display().to_string())
.execute(&pool)
.await?;
sqlx::query(
r#"
INSERT INTO thread_dynamic_tools (
thread_id,
position,
name,
description,
input_schema
) VALUES (?, 0, 'shell', 'run a command', '{"type":"object"}')
"#,
)
.bind(thread_id)
.execute(&pool)
.await?;
let page_size: i64 = sqlx::query_scalar("PRAGMA page_size")
.fetch_one(&pool)
.await?;
let threads_root_page: i64 =
sqlx::query_scalar("SELECT rootpage FROM sqlite_schema WHERE name = 'threads'")
.fetch_one(&pool)
.await?;
pool.close().await;
overwrite_page_with_page(
db_path.as_path(),
page_size.try_into()?,
threads_root_page.try_into()?,
/*destination_page*/ 1,
)?;
let err = anyhow::anyhow!("file is not a database");
recover_database(db_path.as_path(), super::super::STATE_DB, &migrator, &err).await?;
let pool = open_recovered_pool(db_path.as_path()).await?;
let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?")
.bind(thread_id)
.fetch_one(&pool)
.await?;
let tool_schema: String =
sqlx::query_scalar("SELECT input_schema FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id)
.fetch_one(&pool)
.await?;
let migration_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM _sqlx_migrations")
.fetch_one(&pool)
.await?;
pool.close().await;
assert_eq!(title, "schema root lost");
assert_eq!(tool_schema, r#"{"type":"object"}"#);
assert_eq!(migration_count, migrator.migrations.len() as i64);
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
Ok(())
}
#[tokio::test]
async fn recovery_rejects_output_without_user_tables() -> Result<()> {
let temp_dir = super::super::test_support::unique_temp_dir();
@@ -720,4 +807,25 @@ INSERT INTO threads (
file.write_all(&[0; 16])?;
Ok(())
}
fn overwrite_page_with_page(
path: &Path,
page_size: u64,
source_page: u64,
destination_page: u64,
) -> Result<()> {
let mut page = vec![0; page_size.try_into()?];
let mut file = OpenOptions::new().read(true).write(true).open(path)?;
let source_offset = page_size
.checked_mul(source_page.saturating_sub(1))
.context("source page offset overflowed")?;
file.seek(SeekFrom::Start(source_offset))?;
file.read_exact(page.as_mut_slice())?;
let destination_offset = page_size
.checked_mul(destination_page.saturating_sub(1))
.context("destination page offset overflowed")?;
file.seek(SeekFrom::Start(destination_offset))?;
file.write_all(page.as_slice())?;
Ok(())
}
}

View File

@@ -0,0 +1,219 @@
use super::quote_identifier;
use anyhow::Context;
use anyhow::Result;
use sqlx::Row;
use sqlx::SqlitePool;
use tracing::warn;
// If page 1 is destroyed, SQLite's recovery extension cannot discover the
// sqlite_schema root. It can still recover orphaned sqlite_schema rows into
// lost_and_found, so this module rebuilds tables from those rows and then
// copies matching lost_and_found record groups back into the recreated tables.
#[derive(Debug)]
struct SchemaObject {
object_type: String,
name: String,
rootpage: i64,
sql: String,
}
#[derive(Debug)]
struct TableColumn {
cid: i64,
name: String,
column_type: String,
pk: i64,
hidden: i64,
}
pub(super) async fn rebuild_from_recovered_schema_if_needed(pool: &SqlitePool) -> Result<bool> {
if !only_lost_and_found_table_exists(pool).await? {
return Ok(false);
}
let schema = recovered_schema_objects(pool).await?;
if schema
.iter()
.filter(|object| object.object_type == "table")
.count()
== 0
{
return Ok(false);
}
warn!(
"SQLite recovery produced only lost_and_found rows; rebuilding tables from recovered schema rows"
);
sqlx::query("PRAGMA foreign_keys = OFF")
.execute(pool)
.await?;
for object in schema.iter().filter(|object| object.object_type == "table") {
sqlx::query(object.sql.as_str())
.execute(pool)
.await
.with_context(|| format!("failed to recreate recovered table {}", object.name))?;
}
let value_column_count = lost_and_found_value_column_count(pool).await?;
for object in schema.iter().filter(|object| object.object_type == "table") {
copy_lost_and_found_rows(pool, object, value_column_count).await?;
}
for object in schema.iter().filter(|object| object.object_type != "table") {
if let Err(err) = sqlx::query(object.sql.as_str()).execute(pool).await {
warn!(
"skipping recovered {} {} during lost_and_found rebuild: {err}",
object.object_type, object.name
);
}
}
Ok(true)
}
async fn only_lost_and_found_table_exists(pool: &SqlitePool) -> Result<bool> {
let tables = sqlx::query_scalar::<_, String>(
r#"
SELECT name
FROM sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
AND name != '_sqlx_migrations'
ORDER BY name
"#,
)
.fetch_all(pool)
.await?;
Ok(tables.len() == 1 && tables[0] == "lost_and_found")
}
async fn recovered_schema_objects(pool: &SqlitePool) -> Result<Vec<SchemaObject>> {
let rows = sqlx::query(
r#"
SELECT
CAST(c0 AS TEXT) AS object_type,
CAST(c1 AS TEXT) AS name,
CAST(c3 AS INTEGER) AS rootpage,
CAST(c4 AS TEXT) AS sql
FROM lost_and_found
WHERE nfield = 5
AND c0 IN ('table', 'index', 'trigger', 'view')
AND typeof(c1) = 'text'
AND typeof(c4) = 'text'
AND c1 NOT LIKE 'sqlite_%'
ORDER BY
CASE c0
WHEN 'table' THEN 0
WHEN 'view' THEN 1
WHEN 'index' THEN 2
WHEN 'trigger' THEN 3
ELSE 4
END,
id
"#,
)
.fetch_all(pool)
.await?;
rows.into_iter()
.map(|row| {
Ok(SchemaObject {
object_type: row.try_get("object_type")?,
name: row.try_get("name")?,
rootpage: row.try_get("rootpage")?,
sql: row.try_get("sql")?,
})
})
.collect()
}
async fn lost_and_found_value_column_count(pool: &SqlitePool) -> Result<i64> {
let rows = sqlx::query("SELECT name FROM pragma_table_xinfo('lost_and_found')")
.fetch_all(pool)
.await?;
Ok(rows
.iter()
.filter_map(|row| row.try_get::<String, _>("name").ok())
.filter_map(|name| name.strip_prefix('c')?.parse::<i64>().ok())
.max()
.map_or(0, |max_column| max_column + 1))
}
async fn copy_lost_and_found_rows(
pool: &SqlitePool,
table: &SchemaObject,
value_column_count: i64,
) -> Result<()> {
if table.rootpage <= 0 {
return Ok(());
}
let columns = table_columns(pool, table.name.as_str()).await?;
let mut column_names = Vec::new();
let mut value_expressions = Vec::new();
for column in columns.into_iter().filter(|column| column.hidden == 0) {
column_names.push(quote_identifier(column.name.as_str()));
if is_integer_primary_key(&column) {
// INTEGER PRIMARY KEY values live in the b-tree rowid, not in the
// record body, so sqlite_dbdata exposes them through lost_and_found.id.
value_expressions.push("id".to_string());
} else {
if column.cid >= value_column_count {
anyhow::bail!(
"lost_and_found table has {value_column_count} value columns, but recovered table {} needs c{}",
table.name,
column.cid
);
}
value_expressions.push(format!("c{}", column.cid));
}
}
if column_names.is_empty() {
return Ok(());
}
let table_name = quote_identifier(table.name.as_str());
let sql = format!(
"INSERT OR REPLACE INTO main.{table_name} ({}) SELECT {} FROM lost_and_found WHERE rootpgno = ?",
column_names.join(", "),
value_expressions.join(", ")
);
sqlx::query(sql.as_str())
.bind(table.rootpage)
.execute(pool)
.await
.with_context(|| format!("failed to copy lost_and_found rows into {}", table.name))?;
Ok(())
}
async fn table_columns(pool: &SqlitePool, table: &str) -> Result<Vec<TableColumn>> {
let rows = sqlx::query(
r#"
SELECT cid, name, type, pk, hidden
FROM pragma_table_xinfo(?)
ORDER BY cid
"#,
)
.bind(table)
.fetch_all(pool)
.await?;
rows.into_iter()
.map(|row| {
Ok(TableColumn {
cid: row.try_get("cid")?,
name: row.try_get("name")?,
column_type: row.try_get("type")?,
pk: row.try_get("pk")?,
hidden: row.try_get("hidden")?,
})
})
.collect()
}
fn is_integer_primary_key(column: &TableColumn) -> bool {
column.pk > 0 && column.column_type.eq_ignore_ascii_case("INTEGER")
}

View File

@@ -6,10 +6,28 @@ use std::ffi::CString;
use std::ffi::c_char;
use std::ffi::c_int;
use std::ffi::c_void;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::ptr;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
const SQLITE_RECOVER_LOST_AND_FOUND: c_int = 1;
const SQLITE_HEADER: &[u8; 16] = b"SQLite format 3\0";
const SQLITE_UTF8_ENCODING: u32 = 1;
const SQLITE_SCHEMA_FORMAT_4: u32 = 4;
const SQLITE_MAX_U16_PAGE_SIZE_SENTINEL: u32 = 1;
const SQLITE_MAX_PAGE_SIZE: usize = 65_536;
const SQLITE_MIN_PAGE_SIZE: usize = 512;
const SQLITE_PAGE1_BTREE_HEADER_OFFSET: usize = 100;
const SQLITE_LEAF_TABLE_PAGE: u8 = 0x0d;
const HEADER_REPAIR_SCAN_PAGES: usize = 64;
#[repr(C)]
struct SqliteRecover {
@@ -30,6 +48,26 @@ unsafe extern "C" {
}
pub(super) fn recover(path: &Path, recovered_path: &Path) -> Result<()> {
match recover_inner(path, recovered_path) {
Ok(()) => Ok(()),
Err(err) if is_notadb_error(&err) => {
let repaired = HeaderRepairedInput::create(path).with_context(|| {
format!(
"failed to prepare header-repaired recovery input for {}",
path.display()
)
})?;
recover_inner(repaired.path(), recovered_path).with_context(|| {
format!(
"SQLite recovery with a synthesized header failed after direct recovery failed: {err}"
)
})
}
Err(err) => Err(err),
}
}
fn recover_inner(path: &Path, recovered_path: &Path) -> Result<()> {
let db = SqliteHandle::open(path)?;
let recovered_path = path_to_cstring(recovered_path)?;
let mut recovery = RecoveryHandle::new(db.as_ptr(), recovered_path.as_c_str())?;
@@ -38,6 +76,210 @@ pub(super) fn recover(path: &Path, recovered_path: &Path) -> Result<()> {
recovery.finish()
}
fn is_notadb_error(err: &anyhow::Error) -> bool {
err.chain().any(|cause| {
let message = cause.to_string();
message.contains("(26)") || message.contains("file is not a database")
})
}
struct HeaderRepairedInput {
path: PathBuf,
}
impl HeaderRepairedInput {
fn create(path: &Path) -> Result<Self> {
let mut input =
File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
let file_len = input.metadata()?.len();
let page_size = detect_page_size(&mut input, file_len)?;
let page_count = file_len
.checked_div(page_size as u64)
.context("database page count overflowed")?;
let page_count = u32::try_from(page_count)
.context("database is too large to synthesize a SQLite header")?;
let mut output = None;
for sequence in 0..1000 {
let repaired_path = header_repair_path(path, sequence)?;
match OpenOptions::new()
.write(true)
.create_new(true)
.open(repaired_path.as_path())
{
Ok(file) => {
output = Some((repaired_path, file));
break;
}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(err) => return Err(err.into()),
}
}
let (repaired_path, mut output) =
output.context("failed to allocate a unique header-repaired recovery path")?;
let header = synthesized_header_page(page_size, page_count)?;
output.write_all(header.as_slice())?;
input.seek(SeekFrom::Start(page_size as u64))?;
std::io::copy(&mut input, &mut output)?;
Ok(Self {
path: repaired_path,
})
}
fn path(&self) -> &Path {
self.path.as_path()
}
}
impl Drop for HeaderRepairedInput {
fn drop(&mut self) {
let _ = std::fs::remove_file(self.path.as_path());
}
}
fn header_repair_path(path: &Path, sequence: u32) -> Result<PathBuf> {
let file_name = path.file_name().ok_or_else(|| {
anyhow::anyhow!(
"cannot create a header-repaired recovery file name for {}",
path.display()
)
})?;
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_nanos());
let mut candidate = file_name.to_os_string();
candidate.push(format!(
".codex-recovery-header-repair.{}.{timestamp}.{sequence}.sqlite",
std::process::id()
));
Ok(path.with_file_name(candidate))
}
fn detect_page_size(input: &mut File, file_len: u64) -> Result<usize> {
if file_len < SQLITE_MIN_PAGE_SIZE as u64 {
anyhow::bail!("database file is too small to repair its SQLite header");
}
input.seek(SeekFrom::Start(0))?;
let scan_len = file_len.min((SQLITE_MAX_PAGE_SIZE * 4) as u64) as usize;
let mut prefix = vec![0; scan_len];
input.read_exact(prefix.as_mut_slice())?;
let mut best_page_size = None;
let mut best_valid_pages = 0;
let mut page_size = SQLITE_MIN_PAGE_SIZE;
while page_size <= SQLITE_MAX_PAGE_SIZE {
if file_len.is_multiple_of(page_size as u64) {
let valid_pages = count_valid_btree_pages(prefix.as_slice(), page_size);
if valid_pages > best_valid_pages {
best_valid_pages = valid_pages;
best_page_size = Some(page_size);
}
}
page_size *= 2;
}
best_page_size.context("failed to infer SQLite page size for header repair")
}
fn count_valid_btree_pages(prefix: &[u8], page_size: usize) -> usize {
prefix
.chunks_exact(page_size)
.take(HEADER_REPAIR_SCAN_PAGES)
.filter(|page| is_plausible_btree_page(page))
.count()
}
fn is_plausible_btree_page(page: &[u8]) -> bool {
let page_type = page[0];
let header_size = match page_type {
0x02 | 0x05 => 12,
0x0a | 0x0d => 8,
_ => return false,
};
let first_freeblock = read_u16(&page[1..3]) as usize;
let cell_count = read_u16(&page[3..5]) as usize;
let cell_content_start = read_u16(&page[5..7]) as usize;
let cell_content_start = if cell_content_start == 0 && page.len() == SQLITE_MAX_PAGE_SIZE {
SQLITE_MAX_PAGE_SIZE
} else {
cell_content_start
};
let pointer_array_end = header_size + cell_count.saturating_mul(2);
if first_freeblock >= page.len()
|| cell_content_start > page.len()
|| pointer_array_end > page.len()
|| cell_count > (page.len() - header_size) / 2
{
return false;
}
for index in 0..cell_count {
let offset = header_size + index * 2;
let cell_offset = read_u16(&page[offset..offset + 2]) as usize;
if cell_offset < pointer_array_end || cell_offset >= page.len() {
return false;
}
}
true
}
fn synthesized_header_page(page_size: usize, page_count: u32) -> Result<Vec<u8>> {
if !(SQLITE_MIN_PAGE_SIZE..=SQLITE_MAX_PAGE_SIZE).contains(&page_size)
|| !page_size.is_power_of_two()
{
anyhow::bail!("invalid SQLite page size inferred for header repair: {page_size}");
}
let mut page = vec![0; page_size];
page[0..16].copy_from_slice(SQLITE_HEADER);
let header_page_size = if page_size == SQLITE_MAX_PAGE_SIZE {
SQLITE_MAX_U16_PAGE_SIZE_SENTINEL
} else {
page_size as u32
};
write_u16(&mut page[16..18], header_page_size);
page[18] = 1;
page[19] = 1;
page[20] = 0;
page[21] = 64;
page[22] = 32;
page[23] = 32;
write_u32(&mut page[28..32], page_count);
write_u32(&mut page[44..48], SQLITE_SCHEMA_FORMAT_4);
write_u32(&mut page[56..60], SQLITE_UTF8_ENCODING);
// Use an empty sqlite_schema b-tree. The damaged page 1 cannot be trusted,
// and orphaned schema rows will be recovered through lost_and_found.
page[SQLITE_PAGE1_BTREE_HEADER_OFFSET] = SQLITE_LEAF_TABLE_PAGE;
let btree_content_start = if page_size == SQLITE_MAX_PAGE_SIZE {
0
} else {
page_size as u32
};
write_u16(
&mut page[SQLITE_PAGE1_BTREE_HEADER_OFFSET + 5..SQLITE_PAGE1_BTREE_HEADER_OFFSET + 7],
btree_content_start,
);
Ok(page)
}
fn read_u16(bytes: &[u8]) -> u16 {
u16::from_be_bytes([bytes[0], bytes[1]])
}
fn write_u16(bytes: &mut [u8], value: u32) {
bytes.copy_from_slice(&(value as u16).to_be_bytes());
}
fn write_u32(bytes: &mut [u8], value: u32) {
bytes.copy_from_slice(&value.to_be_bytes());
}
struct SqliteHandle {
db: *mut ffi::sqlite3,
}