diff --git a/codex-rs/state/src/runtime/recovery.rs b/codex-rs/state/src/runtime/recovery.rs index 49f9bd5d52..1964b5bf8a 100644 --- a/codex-rs/state/src/runtime/recovery.rs +++ b/codex-rs/state/src/runtime/recovery.rs @@ -18,6 +18,7 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; use tracing::warn; +mod lost_and_found; mod recover_api; const SQLITE_CORRUPT: i32 = 11; @@ -206,6 +207,7 @@ async fn run_recovery(path: &Path, recovered_path: &Path, migrator: &Migrator) - .context("sqlite recovery task panicked")??; let pool = open_recovered_pool(recovered_path.as_path()).await?; + lost_and_found::rebuild_from_recovered_schema_if_needed(&pool).await?; assert_recovered_schema(&pool).await?; assert_integrity_ok(&pool).await?; match migrator.run(&pool).await { @@ -509,6 +511,7 @@ mod tests { use pretty_assertions::assert_eq; use sqlx::migrate::Migrator; use std::fs::OpenOptions; + use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::io::Write; @@ -654,6 +657,90 @@ INSERT INTO threads ( Ok(()) } + #[tokio::test] + async fn recovery_rebuilds_when_header_and_schema_root_are_lost() -> Result<()> { + let temp_dir = super::super::test_support::unique_temp_dir(); + tokio::fs::create_dir_all(temp_dir.as_path()).await?; + let db_path = temp_dir.join(super::super::STATE_DB.filename); + let migrator = crate::migrations::runtime_state_migrator(); + let pool = open_migrated_pool(db_path.as_path(), &migrator).await?; + let thread_id = "00000000-0000-0000-0000-000000000789"; + sqlx::query( + r#" +INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + source, + model_provider, + cwd, + title, + sandbox_policy, + approval_mode +) VALUES (?, ?, 1, 1, 'cli', 'test-provider', ?, 'schema root lost', 'read-only', 'on-request') + "#, + ) + .bind(thread_id) + .bind(temp_dir.join("session.jsonl").display().to_string()) + .bind(temp_dir.as_path().display().to_string()) + .execute(&pool) + .await?; + sqlx::query( + r#" +INSERT INTO thread_dynamic_tools ( + thread_id, + position, + name, + description, + input_schema +) VALUES (?, 0, 'shell', 'run a command', '{"type":"object"}') + "#, + ) + .bind(thread_id) + .execute(&pool) + .await?; + let page_size: i64 = sqlx::query_scalar("PRAGMA page_size") + .fetch_one(&pool) + .await?; + let threads_root_page: i64 = + sqlx::query_scalar("SELECT rootpage FROM sqlite_schema WHERE name = 'threads'") + .fetch_one(&pool) + .await?; + pool.close().await; + + overwrite_page_with_page( + db_path.as_path(), + page_size.try_into()?, + threads_root_page.try_into()?, + /*destination_page*/ 1, + )?; + + let err = anyhow::anyhow!("file is not a database"); + recover_database(db_path.as_path(), super::super::STATE_DB, &migrator, &err).await?; + + let pool = open_recovered_pool(db_path.as_path()).await?; + let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?") + .bind(thread_id) + .fetch_one(&pool) + .await?; + let tool_schema: String = + sqlx::query_scalar("SELECT input_schema FROM thread_dynamic_tools WHERE thread_id = ?") + .bind(thread_id) + .fetch_one(&pool) + .await?; + let migration_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM _sqlx_migrations") + .fetch_one(&pool) + .await?; + pool.close().await; + + assert_eq!(title, "schema root lost"); + assert_eq!(tool_schema, r#"{"type":"object"}"#); + assert_eq!(migration_count, migrator.migrations.len() as i64); + let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await; + Ok(()) + } + #[tokio::test] async fn recovery_rejects_output_without_user_tables() -> Result<()> { let temp_dir = super::super::test_support::unique_temp_dir(); @@ -720,4 +807,25 @@ INSERT INTO threads ( file.write_all(&[0; 16])?; Ok(()) } + + fn overwrite_page_with_page( + path: &Path, + page_size: u64, + source_page: u64, + destination_page: u64, + ) -> Result<()> { + let mut page = vec![0; page_size.try_into()?]; + let mut file = OpenOptions::new().read(true).write(true).open(path)?; + let source_offset = page_size + .checked_mul(source_page.saturating_sub(1)) + .context("source page offset overflowed")?; + file.seek(SeekFrom::Start(source_offset))?; + file.read_exact(page.as_mut_slice())?; + let destination_offset = page_size + .checked_mul(destination_page.saturating_sub(1)) + .context("destination page offset overflowed")?; + file.seek(SeekFrom::Start(destination_offset))?; + file.write_all(page.as_slice())?; + Ok(()) + } } diff --git a/codex-rs/state/src/runtime/recovery/lost_and_found.rs b/codex-rs/state/src/runtime/recovery/lost_and_found.rs new file mode 100644 index 0000000000..bccd50b0e8 --- /dev/null +++ b/codex-rs/state/src/runtime/recovery/lost_and_found.rs @@ -0,0 +1,219 @@ +use super::quote_identifier; +use anyhow::Context; +use anyhow::Result; +use sqlx::Row; +use sqlx::SqlitePool; +use tracing::warn; + +// If page 1 is destroyed, SQLite's recovery extension cannot discover the +// sqlite_schema root. It can still recover orphaned sqlite_schema rows into +// lost_and_found, so this module rebuilds tables from those rows and then +// copies matching lost_and_found record groups back into the recreated tables. + +#[derive(Debug)] +struct SchemaObject { + object_type: String, + name: String, + rootpage: i64, + sql: String, +} + +#[derive(Debug)] +struct TableColumn { + cid: i64, + name: String, + column_type: String, + pk: i64, + hidden: i64, +} + +pub(super) async fn rebuild_from_recovered_schema_if_needed(pool: &SqlitePool) -> Result { + if !only_lost_and_found_table_exists(pool).await? { + return Ok(false); + } + + let schema = recovered_schema_objects(pool).await?; + if schema + .iter() + .filter(|object| object.object_type == "table") + .count() + == 0 + { + return Ok(false); + } + + warn!( + "SQLite recovery produced only lost_and_found rows; rebuilding tables from recovered schema rows" + ); + sqlx::query("PRAGMA foreign_keys = OFF") + .execute(pool) + .await?; + + for object in schema.iter().filter(|object| object.object_type == "table") { + sqlx::query(object.sql.as_str()) + .execute(pool) + .await + .with_context(|| format!("failed to recreate recovered table {}", object.name))?; + } + + let value_column_count = lost_and_found_value_column_count(pool).await?; + for object in schema.iter().filter(|object| object.object_type == "table") { + copy_lost_and_found_rows(pool, object, value_column_count).await?; + } + + for object in schema.iter().filter(|object| object.object_type != "table") { + if let Err(err) = sqlx::query(object.sql.as_str()).execute(pool).await { + warn!( + "skipping recovered {} {} during lost_and_found rebuild: {err}", + object.object_type, object.name + ); + } + } + + Ok(true) +} + +async fn only_lost_and_found_table_exists(pool: &SqlitePool) -> Result { + let tables = sqlx::query_scalar::<_, String>( + r#" +SELECT name +FROM sqlite_schema +WHERE type = 'table' + AND name NOT LIKE 'sqlite_%' + AND name != '_sqlx_migrations' +ORDER BY name + "#, + ) + .fetch_all(pool) + .await?; + Ok(tables.len() == 1 && tables[0] == "lost_and_found") +} + +async fn recovered_schema_objects(pool: &SqlitePool) -> Result> { + let rows = sqlx::query( + r#" +SELECT + CAST(c0 AS TEXT) AS object_type, + CAST(c1 AS TEXT) AS name, + CAST(c3 AS INTEGER) AS rootpage, + CAST(c4 AS TEXT) AS sql +FROM lost_and_found +WHERE nfield = 5 + AND c0 IN ('table', 'index', 'trigger', 'view') + AND typeof(c1) = 'text' + AND typeof(c4) = 'text' + AND c1 NOT LIKE 'sqlite_%' +ORDER BY + CASE c0 + WHEN 'table' THEN 0 + WHEN 'view' THEN 1 + WHEN 'index' THEN 2 + WHEN 'trigger' THEN 3 + ELSE 4 + END, + id + "#, + ) + .fetch_all(pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(SchemaObject { + object_type: row.try_get("object_type")?, + name: row.try_get("name")?, + rootpage: row.try_get("rootpage")?, + sql: row.try_get("sql")?, + }) + }) + .collect() +} + +async fn lost_and_found_value_column_count(pool: &SqlitePool) -> Result { + let rows = sqlx::query("SELECT name FROM pragma_table_xinfo('lost_and_found')") + .fetch_all(pool) + .await?; + Ok(rows + .iter() + .filter_map(|row| row.try_get::("name").ok()) + .filter_map(|name| name.strip_prefix('c')?.parse::().ok()) + .max() + .map_or(0, |max_column| max_column + 1)) +} + +async fn copy_lost_and_found_rows( + pool: &SqlitePool, + table: &SchemaObject, + value_column_count: i64, +) -> Result<()> { + if table.rootpage <= 0 { + return Ok(()); + } + + let columns = table_columns(pool, table.name.as_str()).await?; + let mut column_names = Vec::new(); + let mut value_expressions = Vec::new(); + for column in columns.into_iter().filter(|column| column.hidden == 0) { + column_names.push(quote_identifier(column.name.as_str())); + if is_integer_primary_key(&column) { + // INTEGER PRIMARY KEY values live in the b-tree rowid, not in the + // record body, so sqlite_dbdata exposes them through lost_and_found.id. + value_expressions.push("id".to_string()); + } else { + if column.cid >= value_column_count { + anyhow::bail!( + "lost_and_found table has {value_column_count} value columns, but recovered table {} needs c{}", + table.name, + column.cid + ); + } + value_expressions.push(format!("c{}", column.cid)); + } + } + + if column_names.is_empty() { + return Ok(()); + } + + let table_name = quote_identifier(table.name.as_str()); + let sql = format!( + "INSERT OR REPLACE INTO main.{table_name} ({}) SELECT {} FROM lost_and_found WHERE rootpgno = ?", + column_names.join(", "), + value_expressions.join(", ") + ); + sqlx::query(sql.as_str()) + .bind(table.rootpage) + .execute(pool) + .await + .with_context(|| format!("failed to copy lost_and_found rows into {}", table.name))?; + Ok(()) +} + +async fn table_columns(pool: &SqlitePool, table: &str) -> Result> { + let rows = sqlx::query( + r#" +SELECT cid, name, type, pk, hidden +FROM pragma_table_xinfo(?) +ORDER BY cid + "#, + ) + .bind(table) + .fetch_all(pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(TableColumn { + cid: row.try_get("cid")?, + name: row.try_get("name")?, + column_type: row.try_get("type")?, + pk: row.try_get("pk")?, + hidden: row.try_get("hidden")?, + }) + }) + .collect() +} + +fn is_integer_primary_key(column: &TableColumn) -> bool { + column.pk > 0 && column.column_type.eq_ignore_ascii_case("INTEGER") +} diff --git a/codex-rs/state/src/runtime/recovery/recover_api.rs b/codex-rs/state/src/runtime/recovery/recover_api.rs index 779406ab6b..d8567b1ff4 100644 --- a/codex-rs/state/src/runtime/recovery/recover_api.rs +++ b/codex-rs/state/src/runtime/recovery/recover_api.rs @@ -6,10 +6,28 @@ use std::ffi::CString; use std::ffi::c_char; use std::ffi::c_int; use std::ffi::c_void; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::io::Write; use std::path::Path; +use std::path::PathBuf; use std::ptr; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; const SQLITE_RECOVER_LOST_AND_FOUND: c_int = 1; +const SQLITE_HEADER: &[u8; 16] = b"SQLite format 3\0"; +const SQLITE_UTF8_ENCODING: u32 = 1; +const SQLITE_SCHEMA_FORMAT_4: u32 = 4; +const SQLITE_MAX_U16_PAGE_SIZE_SENTINEL: u32 = 1; +const SQLITE_MAX_PAGE_SIZE: usize = 65_536; +const SQLITE_MIN_PAGE_SIZE: usize = 512; +const SQLITE_PAGE1_BTREE_HEADER_OFFSET: usize = 100; +const SQLITE_LEAF_TABLE_PAGE: u8 = 0x0d; +const HEADER_REPAIR_SCAN_PAGES: usize = 64; #[repr(C)] struct SqliteRecover { @@ -30,6 +48,26 @@ unsafe extern "C" { } pub(super) fn recover(path: &Path, recovered_path: &Path) -> Result<()> { + match recover_inner(path, recovered_path) { + Ok(()) => Ok(()), + Err(err) if is_notadb_error(&err) => { + let repaired = HeaderRepairedInput::create(path).with_context(|| { + format!( + "failed to prepare header-repaired recovery input for {}", + path.display() + ) + })?; + recover_inner(repaired.path(), recovered_path).with_context(|| { + format!( + "SQLite recovery with a synthesized header failed after direct recovery failed: {err}" + ) + }) + } + Err(err) => Err(err), + } +} + +fn recover_inner(path: &Path, recovered_path: &Path) -> Result<()> { let db = SqliteHandle::open(path)?; let recovered_path = path_to_cstring(recovered_path)?; let mut recovery = RecoveryHandle::new(db.as_ptr(), recovered_path.as_c_str())?; @@ -38,6 +76,210 @@ pub(super) fn recover(path: &Path, recovered_path: &Path) -> Result<()> { recovery.finish() } +fn is_notadb_error(err: &anyhow::Error) -> bool { + err.chain().any(|cause| { + let message = cause.to_string(); + message.contains("(26)") || message.contains("file is not a database") + }) +} + +struct HeaderRepairedInput { + path: PathBuf, +} + +impl HeaderRepairedInput { + fn create(path: &Path) -> Result { + let mut input = + File::open(path).with_context(|| format!("failed to open {}", path.display()))?; + let file_len = input.metadata()?.len(); + let page_size = detect_page_size(&mut input, file_len)?; + let page_count = file_len + .checked_div(page_size as u64) + .context("database page count overflowed")?; + let page_count = u32::try_from(page_count) + .context("database is too large to synthesize a SQLite header")?; + + let mut output = None; + for sequence in 0..1000 { + let repaired_path = header_repair_path(path, sequence)?; + match OpenOptions::new() + .write(true) + .create_new(true) + .open(repaired_path.as_path()) + { + Ok(file) => { + output = Some((repaired_path, file)); + break; + } + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {} + Err(err) => return Err(err.into()), + } + } + let (repaired_path, mut output) = + output.context("failed to allocate a unique header-repaired recovery path")?; + + let header = synthesized_header_page(page_size, page_count)?; + output.write_all(header.as_slice())?; + input.seek(SeekFrom::Start(page_size as u64))?; + std::io::copy(&mut input, &mut output)?; + + Ok(Self { + path: repaired_path, + }) + } + + fn path(&self) -> &Path { + self.path.as_path() + } +} + +impl Drop for HeaderRepairedInput { + fn drop(&mut self) { + let _ = std::fs::remove_file(self.path.as_path()); + } +} + +fn header_repair_path(path: &Path, sequence: u32) -> Result { + let file_name = path.file_name().ok_or_else(|| { + anyhow::anyhow!( + "cannot create a header-repaired recovery file name for {}", + path.display() + ) + })?; + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |duration| duration.as_nanos()); + let mut candidate = file_name.to_os_string(); + candidate.push(format!( + ".codex-recovery-header-repair.{}.{timestamp}.{sequence}.sqlite", + std::process::id() + )); + Ok(path.with_file_name(candidate)) +} + +fn detect_page_size(input: &mut File, file_len: u64) -> Result { + if file_len < SQLITE_MIN_PAGE_SIZE as u64 { + anyhow::bail!("database file is too small to repair its SQLite header"); + } + + input.seek(SeekFrom::Start(0))?; + let scan_len = file_len.min((SQLITE_MAX_PAGE_SIZE * 4) as u64) as usize; + let mut prefix = vec![0; scan_len]; + input.read_exact(prefix.as_mut_slice())?; + + let mut best_page_size = None; + let mut best_valid_pages = 0; + let mut page_size = SQLITE_MIN_PAGE_SIZE; + while page_size <= SQLITE_MAX_PAGE_SIZE { + if file_len.is_multiple_of(page_size as u64) { + let valid_pages = count_valid_btree_pages(prefix.as_slice(), page_size); + if valid_pages > best_valid_pages { + best_valid_pages = valid_pages; + best_page_size = Some(page_size); + } + } + page_size *= 2; + } + + best_page_size.context("failed to infer SQLite page size for header repair") +} + +fn count_valid_btree_pages(prefix: &[u8], page_size: usize) -> usize { + prefix + .chunks_exact(page_size) + .take(HEADER_REPAIR_SCAN_PAGES) + .filter(|page| is_plausible_btree_page(page)) + .count() +} + +fn is_plausible_btree_page(page: &[u8]) -> bool { + let page_type = page[0]; + let header_size = match page_type { + 0x02 | 0x05 => 12, + 0x0a | 0x0d => 8, + _ => return false, + }; + let first_freeblock = read_u16(&page[1..3]) as usize; + let cell_count = read_u16(&page[3..5]) as usize; + let cell_content_start = read_u16(&page[5..7]) as usize; + let cell_content_start = if cell_content_start == 0 && page.len() == SQLITE_MAX_PAGE_SIZE { + SQLITE_MAX_PAGE_SIZE + } else { + cell_content_start + }; + let pointer_array_end = header_size + cell_count.saturating_mul(2); + + if first_freeblock >= page.len() + || cell_content_start > page.len() + || pointer_array_end > page.len() + || cell_count > (page.len() - header_size) / 2 + { + return false; + } + + for index in 0..cell_count { + let offset = header_size + index * 2; + let cell_offset = read_u16(&page[offset..offset + 2]) as usize; + if cell_offset < pointer_array_end || cell_offset >= page.len() { + return false; + } + } + + true +} + +fn synthesized_header_page(page_size: usize, page_count: u32) -> Result> { + if !(SQLITE_MIN_PAGE_SIZE..=SQLITE_MAX_PAGE_SIZE).contains(&page_size) + || !page_size.is_power_of_two() + { + anyhow::bail!("invalid SQLite page size inferred for header repair: {page_size}"); + } + + let mut page = vec![0; page_size]; + page[0..16].copy_from_slice(SQLITE_HEADER); + let header_page_size = if page_size == SQLITE_MAX_PAGE_SIZE { + SQLITE_MAX_U16_PAGE_SIZE_SENTINEL + } else { + page_size as u32 + }; + write_u16(&mut page[16..18], header_page_size); + page[18] = 1; + page[19] = 1; + page[20] = 0; + page[21] = 64; + page[22] = 32; + page[23] = 32; + write_u32(&mut page[28..32], page_count); + write_u32(&mut page[44..48], SQLITE_SCHEMA_FORMAT_4); + write_u32(&mut page[56..60], SQLITE_UTF8_ENCODING); + + // Use an empty sqlite_schema b-tree. The damaged page 1 cannot be trusted, + // and orphaned schema rows will be recovered through lost_and_found. + page[SQLITE_PAGE1_BTREE_HEADER_OFFSET] = SQLITE_LEAF_TABLE_PAGE; + let btree_content_start = if page_size == SQLITE_MAX_PAGE_SIZE { + 0 + } else { + page_size as u32 + }; + write_u16( + &mut page[SQLITE_PAGE1_BTREE_HEADER_OFFSET + 5..SQLITE_PAGE1_BTREE_HEADER_OFFSET + 7], + btree_content_start, + ); + Ok(page) +} + +fn read_u16(bytes: &[u8]) -> u16 { + u16::from_be_bytes([bytes[0], bytes[1]]) +} + +fn write_u16(bytes: &mut [u8], value: u32) { + bytes.copy_from_slice(&(value as u16).to_be_bytes()); +} + +fn write_u32(bytes: &mut [u8], value: u32) { + bytes.copy_from_slice(&value.to_be_bytes()); +} + struct SqliteHandle { db: *mut ffi::sqlite3, }