mirror of
https://github.com/openai/codex.git
synced 2026-05-29 23:40:29 +00:00
PR feedback
This commit is contained in:
@@ -18,9 +18,7 @@ pub(crate) fn is_locked(detail: &str) -> bool {
|
||||
|
||||
pub(crate) fn confirm_repair(startup_error: &LocalStateDbStartupError) -> std::io::Result<bool> {
|
||||
eprintln!("Codex couldn't start because its local database appears to be damaged.");
|
||||
eprintln!(
|
||||
"Codex can try a last-resort startup repair by backing up those files and rebuilding empty local databases."
|
||||
);
|
||||
eprintln!("Codex can try to repair by backing up and rebuilding those files.");
|
||||
print_technical_details(startup_error);
|
||||
crate::confirm("Repair Codex local data now? [y/N]: ")
|
||||
}
|
||||
|
||||
@@ -95,3 +95,7 @@ pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count";
|
||||
pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms";
|
||||
/// Rollout fallback attempts. Tags: [caller, reason]
|
||||
pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count";
|
||||
/// SQLite automatic recovery attempts. Tags: [status, db, error, trigger_error]
|
||||
pub const DB_RECOVERY_METRIC: &str = "codex.sqlite.recovery.count";
|
||||
/// SQLite automatic recovery latency. Tags: [status, db, error, trigger_error]
|
||||
pub const DB_RECOVERY_DURATION_METRIC: &str = "codex.sqlite.recovery.duration_ms";
|
||||
|
||||
@@ -361,7 +361,7 @@ async fn open_sqlite(
|
||||
if !recovery::is_malformed_sqlite_error(&err) {
|
||||
return Err(err);
|
||||
}
|
||||
recovery::recover_database(path, spec, migrator, &err).await?;
|
||||
recovery::recover_database(path, spec, migrator, &err, telemetry_override).await?;
|
||||
connect_sqlite(options.clone(), spec, telemetry_override).await?
|
||||
}
|
||||
};
|
||||
@@ -373,7 +373,7 @@ async fn open_sqlite(
|
||||
return Err(err);
|
||||
}
|
||||
pool.close().await;
|
||||
recovery::recover_database(path, spec, migrator, &err).await?;
|
||||
recovery::recover_database(path, spec, migrator, &err, telemetry_override).await?;
|
||||
let pool = connect_sqlite(options, spec, telemetry_override).await?;
|
||||
migrate_sqlite(&pool, migrator, spec, telemetry_override).await?;
|
||||
Ok(pool)
|
||||
@@ -507,6 +507,7 @@ mod tests {
|
||||
use super::state_db_path;
|
||||
use super::test_support::unique_temp_dir;
|
||||
use crate::DB_INIT_METRIC;
|
||||
use crate::DB_RECOVERY_METRIC;
|
||||
use crate::DbTelemetry;
|
||||
use crate::migrations::STATE_MIGRATOR;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -515,6 +516,10 @@ mod tests {
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Seek;
|
||||
use std::io::SeekFrom;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
|
||||
@@ -659,6 +664,102 @@ mod tests {
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn open_state_sqlite_recovers_malformed_database_on_startup() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex home");
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let pool = SqlitePool::connect_with(
|
||||
SqliteConnectOptions::new()
|
||||
.filename(&state_path)
|
||||
.create_if_missing(true),
|
||||
)
|
||||
.await
|
||||
.expect("open state db");
|
||||
STATE_MIGRATOR
|
||||
.run(&pool)
|
||||
.await
|
||||
.expect("apply current state schema");
|
||||
let thread_id = "00000000-0000-0000-0000-000000000123";
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO threads (
|
||||
id,
|
||||
rollout_path,
|
||||
created_at,
|
||||
updated_at,
|
||||
source,
|
||||
model_provider,
|
||||
cwd,
|
||||
title,
|
||||
sandbox_policy,
|
||||
approval_mode
|
||||
) VALUES (?, ?, 1, 1, 'cli', 'test-provider', ?, 'startup recovery', 'read-only', 'on-request')
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id)
|
||||
.bind(codex_home.join("session.jsonl").display().to_string())
|
||||
.bind(codex_home.as_path().display().to_string())
|
||||
.execute(&pool)
|
||||
.await
|
||||
.expect("insert thread");
|
||||
let page_size: i64 = sqlx::query_scalar("PRAGMA page_size")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("read page size");
|
||||
let migration_root_page: i64 = sqlx::query_scalar(
|
||||
"SELECT rootpage FROM sqlite_schema WHERE name = '_sqlx_migrations'",
|
||||
)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("read migration root page");
|
||||
pool.close().await;
|
||||
corrupt_page(
|
||||
state_path.as_path(),
|
||||
page_size.try_into().expect("page size should fit u64"),
|
||||
migration_root_page
|
||||
.try_into()
|
||||
.expect("root page should fit u64"),
|
||||
)
|
||||
.expect("corrupt migration table page");
|
||||
|
||||
let telemetry = TestTelemetry::default();
|
||||
let tolerant_migrator = runtime_state_migrator();
|
||||
let recovered_pool =
|
||||
open_state_sqlite(state_path.as_path(), &tolerant_migrator, Some(&telemetry))
|
||||
.await
|
||||
.expect("startup should recover malformed state db");
|
||||
let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?")
|
||||
.bind(thread_id)
|
||||
.fetch_one(&recovered_pool)
|
||||
.await
|
||||
.expect("recovered thread should exist");
|
||||
recovered_pool.close().await;
|
||||
|
||||
assert_eq!(title, "startup recovery");
|
||||
let integrity = sqlite_integrity_check(state_path.as_path())
|
||||
.await
|
||||
.expect("integrity check should run");
|
||||
assert_eq!(integrity, vec!["ok".to_string()]);
|
||||
let recovery_event = telemetry
|
||||
.counters()
|
||||
.into_iter()
|
||||
.find(|event| event.name == DB_RECOVERY_METRIC)
|
||||
.expect("recovery metric should be recorded");
|
||||
assert_eq!(
|
||||
recovery_event.tags,
|
||||
BTreeMap::from([
|
||||
("db".to_string(), "state".to_string()),
|
||||
("error".to_string(), "none".to_string()),
|
||||
("status".to_string(), "success".to_string()),
|
||||
("trigger_error".to_string(), "corrupt".to_string()),
|
||||
])
|
||||
);
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_records_successful_sqlite_init_phases_to_explicit_telemetry() {
|
||||
let codex_home = unique_temp_dir();
|
||||
@@ -700,4 +801,14 @@ mod tests {
|
||||
runtime.logs_pool.close().await;
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
fn corrupt_page(path: &Path, page_size: u64, page_number: u64) -> std::io::Result<()> {
|
||||
let offset = page_size
|
||||
.checked_mul(page_number.saturating_sub(1))
|
||||
.ok_or_else(|| std::io::Error::other("corrupt page offset overflowed"))?;
|
||||
let mut file = OpenOptions::new().write(true).open(path)?;
|
||||
file.seek(SeekFrom::Start(offset))?;
|
||||
file.write_all(&[0; 16])?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use super::RuntimeDbSpec;
|
||||
use crate::telemetry::DbTelemetry;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use log::LevelFilter;
|
||||
use sqlx::AssertSqlSafe;
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::Row;
|
||||
use sqlx::SqlitePool;
|
||||
@@ -11,9 +13,14 @@ use sqlx::sqlite::SqlitePoolOptions;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeSet;
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::fs::TryLockError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tracing::warn;
|
||||
@@ -23,6 +30,8 @@ mod recover_api;
|
||||
|
||||
const SQLITE_CORRUPT: i32 = 11;
|
||||
const SQLITE_NOTADB: i32 = 26;
|
||||
const RECOVERY_LOCK_POLL: Duration = Duration::from_millis(100);
|
||||
const RECOVERY_LOCK_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RecoveryPaths {
|
||||
@@ -30,6 +39,11 @@ struct RecoveryPaths {
|
||||
backup_paths: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
struct RecoveryLock {
|
||||
_file: File,
|
||||
waited: bool,
|
||||
}
|
||||
|
||||
pub(super) fn is_malformed_sqlite_error(err: &anyhow::Error) -> bool {
|
||||
// Prefer SQLite result codes, but keep a message fallback for migration
|
||||
// wrappers that stringify the underlying database error.
|
||||
@@ -51,7 +65,42 @@ pub(super) async fn recover_database(
|
||||
spec: RuntimeDbSpec,
|
||||
migrator: &Migrator,
|
||||
original_error: &anyhow::Error,
|
||||
telemetry_override: Option<&dyn DbTelemetry>,
|
||||
) -> Result<()> {
|
||||
let started = Instant::now();
|
||||
let result = recover_database_inner(path, spec, migrator, original_error).await;
|
||||
crate::telemetry::record_recovery_result(
|
||||
telemetry_override,
|
||||
spec.kind,
|
||||
started.elapsed(),
|
||||
original_error,
|
||||
&result,
|
||||
);
|
||||
result
|
||||
}
|
||||
|
||||
async fn recover_database_inner(
|
||||
path: &Path,
|
||||
spec: RuntimeDbSpec,
|
||||
migrator: &Migrator,
|
||||
original_error: &anyhow::Error,
|
||||
) -> Result<()> {
|
||||
let recovery_lock = acquire_recovery_lock(path).await.with_context(|| {
|
||||
format!(
|
||||
"failed to lock automatic recovery for {} at {}",
|
||||
spec.label,
|
||||
path.display()
|
||||
)
|
||||
})?;
|
||||
if recovery_lock.waited && database_is_healthy(path, migrator).await {
|
||||
warn!(
|
||||
"{} at {} was usable after waiting for the recovery lock; skipping duplicate recovery",
|
||||
spec.label,
|
||||
path.display()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let recovery = prepare_recovery_paths(path).await.with_context(|| {
|
||||
format!(
|
||||
"failed to prepare automatic recovery for {} at {}",
|
||||
@@ -123,6 +172,71 @@ fn print_status(message: String) {
|
||||
eprintln!("{message}");
|
||||
}
|
||||
|
||||
async fn acquire_recovery_lock(path: &Path) -> Result<RecoveryLock> {
|
||||
let lock_path = recovery_lock_path(path);
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(lock_path.as_path())
|
||||
.with_context(|| format!("failed to open {}", lock_path.display()))?;
|
||||
let started = Instant::now();
|
||||
let mut waited = false;
|
||||
loop {
|
||||
match file.try_lock() {
|
||||
Ok(()) => {
|
||||
return Ok(RecoveryLock {
|
||||
_file: file,
|
||||
waited,
|
||||
});
|
||||
}
|
||||
Err(TryLockError::WouldBlock) if started.elapsed() < RECOVERY_LOCK_TIMEOUT => {
|
||||
waited = true;
|
||||
thread::sleep(RECOVERY_LOCK_POLL);
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
anyhow::bail!(
|
||||
"timed out waiting for another Codex process to finish recovering {}",
|
||||
lock_path.display()
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(std::io::Error::from(err)).with_context(|| {
|
||||
format!("failed to lock recovery file {}", lock_path.display())
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.context("recovery lock task panicked")?
|
||||
}
|
||||
|
||||
fn recovery_lock_path(path: &Path) -> PathBuf {
|
||||
let mut lock_path = OsString::from(path.as_os_str());
|
||||
lock_path.push(".codex-recovery.lock");
|
||||
PathBuf::from(lock_path)
|
||||
}
|
||||
|
||||
async fn database_is_healthy(path: &Path, migrator: &Migrator) -> bool {
|
||||
let Ok(pool) = open_recovered_pool(path).await else {
|
||||
return false;
|
||||
};
|
||||
let result = async {
|
||||
// Check integrity before migrations so a still-corrupt database is not
|
||||
// modified just because another process held the recovery lock first.
|
||||
assert_integrity_ok(&pool).await?;
|
||||
migrator.run(&pool).await?;
|
||||
assert_integrity_ok(&pool).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.await;
|
||||
pool.close().await;
|
||||
result.is_ok()
|
||||
}
|
||||
|
||||
fn sqlx_error_is_malformed(err: &sqlx::Error) -> bool {
|
||||
match err {
|
||||
sqlx::Error::Database(database_error) => {
|
||||
@@ -212,6 +326,19 @@ async fn run_recovery(path: &Path, recovered_path: &Path, migrator: &Migrator) -
|
||||
assert_integrity_ok(&pool).await?;
|
||||
match migrator.run(&pool).await {
|
||||
Ok(()) => {
|
||||
if let Err(err) =
|
||||
assert_expected_schema(&pool, recovered_path.as_path(), migrator).await
|
||||
{
|
||||
pool.close().await;
|
||||
rebuild_recovered_database(recovered_path.as_path(), migrator)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to normalize recovered database after schema validation failure: {err}"
|
||||
)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
assert_integrity_ok(&pool).await?;
|
||||
pool.close().await;
|
||||
}
|
||||
@@ -338,7 +465,8 @@ ORDER BY name
|
||||
"#,
|
||||
quote_identifier(schema)
|
||||
);
|
||||
let rows = sqlx::query_scalar::<_, String>(sql.as_str())
|
||||
// Dynamic identifiers are quoted with quote_identifier before interpolation.
|
||||
let rows = sqlx::query_scalar::<_, String>(AssertSqlSafe(sql))
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().collect())
|
||||
@@ -365,14 +493,16 @@ async fn copy_current_schema_table(pool: &SqlitePool, table: &str) -> Result<boo
|
||||
let sql = format!(
|
||||
"INSERT OR REPLACE INTO main.{table_name} ({column_list}) SELECT {column_list} FROM recovered.{table_name}"
|
||||
);
|
||||
sqlx::query(sql.as_str()).execute(pool).await?;
|
||||
// Dynamic identifiers are quoted with quote_identifier before interpolation.
|
||||
sqlx::query(AssertSqlSafe(sql)).execute(pool).await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn copy_extra_recovered_table(pool: &SqlitePool, table: &str) -> Result<()> {
|
||||
let table_name = quote_identifier(table);
|
||||
let sql = format!("CREATE TABLE main.{table_name} AS SELECT * FROM recovered.{table_name}");
|
||||
sqlx::query(sql.as_str()).execute(pool).await?;
|
||||
// Dynamic identifiers are quoted with quote_identifier before interpolation.
|
||||
sqlx::query(AssertSqlSafe(sql)).execute(pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -381,7 +511,8 @@ async fn table_columns(pool: &SqlitePool, schema: &str, table: &str) -> Result<V
|
||||
"SELECT name FROM {}.pragma_table_xinfo(?) WHERE hidden = 0 ORDER BY cid",
|
||||
quote_identifier(schema)
|
||||
);
|
||||
Ok(sqlx::query_scalar::<_, String>(sql.as_str())
|
||||
// Dynamic identifiers are quoted with quote_identifier before interpolation.
|
||||
Ok(sqlx::query_scalar::<_, String>(AssertSqlSafe(sql))
|
||||
.bind(table)
|
||||
.fetch_all(pool)
|
||||
.await?)
|
||||
@@ -412,6 +543,39 @@ WHERE type = 'table'
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_expected_schema(
|
||||
pool: &SqlitePool,
|
||||
recovered_path: &Path,
|
||||
migrator: &Migrator,
|
||||
) -> Result<()> {
|
||||
let expected_path =
|
||||
unique_sibling_path(recovered_path, "codex-recovery-schema-check", "sqlite").await?;
|
||||
let expected_pool = match open_migrated_pool(expected_path.as_path(), migrator).await {
|
||||
Ok(pool) => pool,
|
||||
Err(err) => {
|
||||
cleanup_sqlite_files(expected_path.as_path()).await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let expected_tables = user_tables(&expected_pool, "main").await;
|
||||
expected_pool.close().await;
|
||||
cleanup_sqlite_files(expected_path.as_path()).await;
|
||||
|
||||
let expected_tables = expected_tables?;
|
||||
let actual_tables = user_tables(pool, "main").await?;
|
||||
let missing_tables = expected_tables
|
||||
.difference(&actual_tables)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
if !missing_tables.is_empty() {
|
||||
anyhow::bail!(
|
||||
"recovered database is missing expected tables: {}",
|
||||
missing_tables.join(", ")
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_integrity_ok(pool: &SqlitePool) -> Result<()> {
|
||||
let rows = sqlx::query_scalar::<_, String>("PRAGMA integrity_check")
|
||||
.fetch_all(pool)
|
||||
@@ -425,6 +589,12 @@ async fn assert_integrity_ok(pool: &SqlitePool) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup_sqlite_files(path: &Path) {
|
||||
for sqlite_path in sqlite_paths(path) {
|
||||
let _ = tokio::fs::remove_file(sqlite_path).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn replace_with_recovered_database(path: &Path, recovered_path: &Path) -> Result<()> {
|
||||
// Stale WAL files belong to the damaged database and must not be replayed
|
||||
// against the recovered replacement.
|
||||
@@ -552,6 +722,9 @@ mod tests {
|
||||
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
|
||||
let db_path = temp_dir.join("sample.sqlite");
|
||||
create_sample_db(db_path.as_path()).await?;
|
||||
let sidecars = sqlite_sidecar_paths(db_path.as_path());
|
||||
tokio::fs::write(sidecars[0].as_path(), b"stale wal").await?;
|
||||
tokio::fs::write(sidecars[1].as_path(), b"stale shm").await?;
|
||||
corrupt_first_table_page(db_path.as_path())?;
|
||||
|
||||
let err = anyhow::anyhow!("database disk image is malformed");
|
||||
@@ -560,6 +733,7 @@ mod tests {
|
||||
super::super::STATE_DB,
|
||||
&Migrator::DEFAULT,
|
||||
&err,
|
||||
/*telemetry_override*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -575,7 +749,57 @@ mod tests {
|
||||
})
|
||||
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".bak"))
|
||||
.count();
|
||||
assert_eq!(backup_count, 1);
|
||||
assert_eq!(backup_count, 3);
|
||||
assert!(!tokio::fs::try_exists(sidecars[0].as_path()).await?);
|
||||
assert!(!tokio::fs::try_exists(sidecars[1].as_path()).await?);
|
||||
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recovery_skips_when_database_is_already_healthy() -> Result<()> {
|
||||
let temp_dir = super::super::test_support::unique_temp_dir();
|
||||
tokio::fs::create_dir_all(temp_dir.as_path()).await?;
|
||||
let db_path = temp_dir.join("sample.sqlite");
|
||||
create_sample_db(db_path.as_path()).await?;
|
||||
let lock_path = recovery_lock_path(db_path.as_path());
|
||||
let lock_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(lock_path.as_path())?;
|
||||
lock_file.try_lock()?;
|
||||
|
||||
let db_path_for_task = db_path.clone();
|
||||
let recovery_task = tokio::spawn(async move {
|
||||
let err = anyhow::anyhow!("database disk image is malformed");
|
||||
let migrator = Migrator::DEFAULT;
|
||||
recover_database(
|
||||
db_path_for_task.as_path(),
|
||||
super::super::STATE_DB,
|
||||
&migrator,
|
||||
&err,
|
||||
/*telemetry_override*/ None,
|
||||
)
|
||||
.await
|
||||
});
|
||||
tokio::time::sleep(RECOVERY_LOCK_POLL.saturating_mul(2)).await;
|
||||
drop(lock_file);
|
||||
recovery_task.await??;
|
||||
|
||||
let pool = open_recovered_pool(db_path.as_path()).await?;
|
||||
let values: Vec<String> = sqlx::query_scalar("SELECT value FROM sample ORDER BY id")
|
||||
.fetch_all(&pool)
|
||||
.await?;
|
||||
pool.close().await;
|
||||
let backup_count = std::fs::read_dir(temp_dir.as_path())?
|
||||
.filter_map(std::result::Result::ok)
|
||||
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".bak"))
|
||||
.count();
|
||||
|
||||
assert_eq!(values, vec!["one".to_string(), "two".to_string()]);
|
||||
assert_eq!(backup_count, 0);
|
||||
let _ = tokio::fs::remove_dir_all(temp_dir.as_path()).await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -634,7 +858,14 @@ INSERT INTO threads (
|
||||
)?;
|
||||
|
||||
let err = anyhow::anyhow!("database disk image is malformed");
|
||||
recover_database(db_path.as_path(), super::super::STATE_DB, &migrator, &err).await?;
|
||||
recover_database(
|
||||
db_path.as_path(),
|
||||
super::super::STATE_DB,
|
||||
&migrator,
|
||||
&err,
|
||||
/*telemetry_override*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pool = open_recovered_pool(db_path.as_path()).await?;
|
||||
let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?")
|
||||
@@ -717,7 +948,14 @@ INSERT INTO thread_dynamic_tools (
|
||||
)?;
|
||||
|
||||
let err = anyhow::anyhow!("file is not a database");
|
||||
recover_database(db_path.as_path(), super::super::STATE_DB, &migrator, &err).await?;
|
||||
recover_database(
|
||||
db_path.as_path(),
|
||||
super::super::STATE_DB,
|
||||
&migrator,
|
||||
&err,
|
||||
/*telemetry_override*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pool = open_recovered_pool(db_path.as_path()).await?;
|
||||
let title: String = sqlx::query_scalar("SELECT title FROM threads WHERE id = ?")
|
||||
@@ -754,6 +992,7 @@ INSERT INTO thread_dynamic_tools (
|
||||
super::super::STATE_DB,
|
||||
&Migrator::DEFAULT,
|
||||
&err,
|
||||
/*telemetry_override*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("recovery without schema should fail");
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
//! Rebuild recovered SQLite tables when the schema root page is gone.
|
||||
//!
|
||||
//! If page 1 is destroyed, SQLite's recovery extension cannot discover the
|
||||
//! sqlite_schema root. It can still recover orphaned sqlite_schema rows into
|
||||
//! lost_and_found, so this module rebuilds tables from those rows and then
|
||||
//! copies matching lost_and_found record groups back into the recreated tables.
|
||||
|
||||
use super::quote_identifier;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use sqlx::AssertSqlSafe;
|
||||
use sqlx::Row;
|
||||
use sqlx::SqlitePool;
|
||||
use tracing::warn;
|
||||
|
||||
// If page 1 is destroyed, SQLite's recovery extension cannot discover the
|
||||
// sqlite_schema root. It can still recover orphaned sqlite_schema rows into
|
||||
// lost_and_found, so this module rebuilds tables from those rows and then
|
||||
// copies matching lost_and_found record groups back into the recreated tables.
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SchemaObject {
|
||||
object_type: String,
|
||||
@@ -50,7 +53,9 @@ pub(super) async fn rebuild_from_recovered_schema_if_needed(pool: &SqlitePool) -
|
||||
.await?;
|
||||
|
||||
for object in schema.iter().filter(|object| object.object_type == "table") {
|
||||
sqlx::query(object.sql.as_str())
|
||||
// Recovered sqlite_schema SQL is local database metadata, not
|
||||
// interpolated user input.
|
||||
sqlx::query(AssertSqlSafe(object.sql.clone()))
|
||||
.execute(pool)
|
||||
.await
|
||||
.with_context(|| format!("failed to recreate recovered table {}", object.name))?;
|
||||
@@ -62,7 +67,12 @@ pub(super) async fn rebuild_from_recovered_schema_if_needed(pool: &SqlitePool) -
|
||||
}
|
||||
|
||||
for object in schema.iter().filter(|object| object.object_type != "table") {
|
||||
if let Err(err) = sqlx::query(object.sql.as_str()).execute(pool).await {
|
||||
// Recovered sqlite_schema SQL is local database metadata, not
|
||||
// interpolated user input.
|
||||
if let Err(err) = sqlx::query(AssertSqlSafe(object.sql.clone()))
|
||||
.execute(pool)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"skipping recovered {} {} during lost_and_found rebuild: {err}",
|
||||
object.object_type, object.name
|
||||
@@ -181,7 +191,8 @@ async fn copy_lost_and_found_rows(
|
||||
column_names.join(", "),
|
||||
value_expressions.join(", ")
|
||||
);
|
||||
sqlx::query(sql.as_str())
|
||||
// Dynamic identifiers are quoted with quote_identifier before interpolation.
|
||||
sqlx::query(AssertSqlSafe(sql))
|
||||
.bind(table.rootpage)
|
||||
.execute(pool)
|
||||
.await
|
||||
|
||||
@@ -6,6 +6,8 @@ use std::time::Duration;
|
||||
use crate::DB_FALLBACK_METRIC;
|
||||
use crate::DB_INIT_DURATION_METRIC;
|
||||
use crate::DB_INIT_METRIC;
|
||||
use crate::DB_RECOVERY_DURATION_METRIC;
|
||||
use crate::DB_RECOVERY_METRIC;
|
||||
use tracing::debug;
|
||||
|
||||
/// Low-cardinality sink for SQLite startup and fallback telemetry.
|
||||
@@ -92,6 +94,24 @@ pub fn record_fallback(
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn record_recovery_result<T>(
|
||||
telemetry: Option<&dyn DbTelemetry>,
|
||||
db: DbKind,
|
||||
duration: Duration,
|
||||
trigger_error: &anyhow::Error,
|
||||
result: &anyhow::Result<T>,
|
||||
) {
|
||||
let outcome = DbOutcomeTags::from_result(result);
|
||||
let tags = [
|
||||
("status", outcome.status),
|
||||
("db", db.as_str()),
|
||||
("error", outcome.error),
|
||||
("trigger_error", classify_error(trigger_error)),
|
||||
];
|
||||
record_counter(telemetry, DB_RECOVERY_METRIC, &tags);
|
||||
record_duration(telemetry, DB_RECOVERY_DURATION_METRIC, duration, &tags);
|
||||
}
|
||||
|
||||
fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) {
|
||||
if let Some(telemetry) = resolve_telemetry(telemetry) {
|
||||
telemetry.counter(name, /*inc*/ 1, tags);
|
||||
@@ -138,6 +158,9 @@ fn classify_error(err: &anyhow::Error) -> &'static str {
|
||||
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
|
||||
return classify_sqlx_error(sqlx_err);
|
||||
}
|
||||
if error_message_is_corrupt(cause.to_string().as_str()) {
|
||||
return "corrupt";
|
||||
}
|
||||
if cause
|
||||
.downcast_ref::<sqlx::migrate::MigrateError>()
|
||||
.is_some()
|
||||
@@ -189,6 +212,15 @@ fn classify_sqlite_code(code: &str) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn error_message_is_corrupt(message: &str) -> bool {
|
||||
let message = message.to_ascii_lowercase();
|
||||
message.contains("database disk image is malformed")
|
||||
|| message.contains("file is not a database")
|
||||
|| message.contains("database schema is malformed")
|
||||
|| message.contains("database is corrupt")
|
||||
|| (message.contains("database") && message.contains("malformed"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
18
codex-rs/state/vendor/sqlite-recover/README.md
vendored
18
codex-rs/state/vendor/sqlite-recover/README.md
vendored
@@ -1,14 +1,22 @@
|
||||
Vendored from upstream SQLite Fossil trunk:
|
||||
Minimal SQLite recovery sources vendored from upstream SQLite:
|
||||
|
||||
- `ext/recover/sqlite3recover.c`
|
||||
- `ext/recover/sqlite3recover.h`
|
||||
- `ext/recover/dbdata.c`
|
||||
|
||||
`sqlite3.h` is a comment-stripped copy from the `libsqlite3-sys` 0.30.1
|
||||
bundled SQLite source so Cargo and Bazel compile these extension files with
|
||||
matching public SQLite declarations without adding a build-dependency that
|
||||
would compile SQLite a second time.
|
||||
`sqlite3.h` is a comment-stripped copy from the `libsqlite3-sys` 0.37.0
|
||||
bundled SQLite 3.51.3 source (`SQLITE_SOURCE_ID`
|
||||
`2026-03-13 10:38:09 737ae4a34738ffa0c3ff7f9bb18df914dd1cad163f28fd6b6e114a344fe6d618`)
|
||||
so Cargo and Bazel compile these extension files with matching public SQLite
|
||||
declarations without adding a build-dependency that would compile SQLite a
|
||||
second time.
|
||||
|
||||
These files implement SQLite's recover extension without invoking the
|
||||
`sqlite3` command-line shell. They are compiled into `codex-state` and link
|
||||
against the same `libsqlite3-sys` library that SQLx uses.
|
||||
|
||||
The recovery API is not exposed by `libsqlite3-sys`, and no published Rust
|
||||
crate in the dependency graph provides this extension without also building a
|
||||
second SQLite copy. When updating `libsqlite3-sys`, refresh these files from
|
||||
the matching SQLite source snapshot and keep the vendored set limited to the
|
||||
recover/dbdata extension sources plus the trimmed public header.
|
||||
|
||||
@@ -35,9 +35,9 @@ extern "C" {
|
||||
#ifdef SQLITE_VERSION_NUMBER
|
||||
# undef SQLITE_VERSION_NUMBER
|
||||
#endif
|
||||
#define SQLITE_VERSION "3.46.0"
|
||||
#define SQLITE_VERSION_NUMBER 3046000
|
||||
#define SQLITE_SOURCE_ID "2024-05-23 13:25:27 96c92aba00c8375bc32fafcdf12429c58bd8aabfcadab6683e35bbb9cdebf19e"
|
||||
#define SQLITE_VERSION "3.51.3"
|
||||
#define SQLITE_VERSION_NUMBER 3051003
|
||||
#define SQLITE_SOURCE_ID "2026-03-13 10:38:09 737ae4a34738ffa0c3ff7f9bb18df914dd1cad163f28fd6b6e114a344fe6d618"
|
||||
SQLITE_API SQLITE_EXTERN const char sqlite3_version[];
|
||||
SQLITE_API const char *sqlite3_libversion(void);
|
||||
SQLITE_API const char *sqlite3_sourceid(void);
|
||||
|
||||
Reference in New Issue
Block a user