feat: repair DB in case of missing lines (#10751)

This commit is contained in:
jif-oai
2026-02-05 16:21:49 +00:00
committed by GitHub
parent 41f3b1ba0b
commit 901215e310
4 changed files with 114 additions and 4 deletions

View File

@@ -1088,9 +1088,10 @@ async fn find_thread_path_by_id_str_in_subdir(
ARCHIVED_SESSIONS_SUBDIR => Some(true),
_ => None,
};
let thread_id = ThreadId::from_string(id_str).ok();
let state_db_ctx = state_db::open_if_present(codex_home, "").await;
if let Some(state_db_ctx) = state_db_ctx.as_deref()
&& let Ok(thread_id) = ThreadId::from_string(id_str)
&& let Some(thread_id) = thread_id
&& let Some(db_path) = state_db::find_rollout_path_by_id(
Some(state_db_ctx),
thread_id,
@@ -1128,9 +1129,16 @@ async fn find_thread_path_by_id_str_in_subdir(
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
let found = results.matches.into_iter().next().map(|m| m.full_path());
if found.is_some() {
if let Some(found_path) = found.as_ref() {
tracing::error!("state db missing rollout path for thread {id_str}");
state_db::record_discrepancy("find_thread_path_by_id_str_in_subdir", "falling_back");
state_db::read_repair_rollout_path(
state_db_ctx.as_deref(),
thread_id,
archived_only,
found_path.as_path(),
)
.await;
}
Ok(found)

View File

@@ -568,6 +568,7 @@ async fn rollout_writer(
default_provider.as_str(),
state_builder.as_ref(),
std::slice::from_ref(&rollout_item),
None,
)
.await;
}

View File

@@ -250,7 +250,31 @@ async fn find_thread_path_falls_back_when_db_path_is_stale() {
let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string())
.await
.expect("lookup should succeed");
assert_eq!(found, Some(fs_rollout_path));
assert_eq!(found, Some(fs_rollout_path.clone()));
assert_state_db_rollout_path(home, thread_id, Some(fs_rollout_path.as_path())).await;
}
#[tokio::test]
async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let uuid = Uuid::from_u128(303);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let ts = "2025-01-03T13-00-00";
write_session_file(home, ts, uuid, 1, Some(SessionSource::Cli)).unwrap();
let fs_rollout_path = home.join(format!("sessions/2025/01/03/rollout-{ts}-{uuid}.jsonl"));
// Create an empty state DB so lookup takes the DB-first path and then falls back to files.
let _runtime =
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string())
.await
.expect("lookup should succeed");
assert_eq!(found, Some(fs_rollout_path.clone()));
assert_state_db_rollout_path(home, thread_id, Some(fs_rollout_path.as_path())).await;
}
#[test]
@@ -263,6 +287,22 @@ fn rollout_date_parts_extracts_directory_components() {
);
}
async fn assert_state_db_rollout_path(
home: &Path,
thread_id: ThreadId,
expected_path: Option<&Path>,
) {
let runtime =
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
let path = runtime
.find_rollout_path_by_id(thread_id, Some(false))
.await
.expect("state db lookup should succeed");
assert_eq!(path.as_deref(), expected_path);
}
fn write_session_file(
root: &Path,
ts_str: &str,

View File

@@ -349,6 +349,7 @@ pub async fn reconcile_rollout(
default_provider: &str,
builder: Option<&ThreadMetadataBuilder>,
items: &[RolloutItem],
archived_only: Option<bool>,
) {
let Some(ctx) = context else {
return;
@@ -376,7 +377,17 @@ pub async fn reconcile_rollout(
return;
}
};
if let Err(err) = ctx.upsert_thread(&outcome.metadata).await {
let mut metadata = outcome.metadata;
match archived_only {
Some(true) if metadata.archived_at.is_none() => {
metadata.archived_at = Some(metadata.updated_at);
}
Some(false) => {
metadata.archived_at = None;
}
Some(true) | None => {}
}
if let Err(err) = ctx.upsert_thread(&metadata).await {
warn!(
"state db reconcile_rollout upsert failed {}: {err}",
rollout_path.display()
@@ -399,6 +410,56 @@ pub async fn reconcile_rollout(
}
}
/// Repair a thread's rollout path after filesystem fallback succeeds.
pub async fn read_repair_rollout_path(
context: Option<&codex_state::StateRuntime>,
thread_id: Option<ThreadId>,
archived_only: Option<bool>,
rollout_path: &Path,
) {
let Some(ctx) = context else {
return;
};
if let Some(thread_id) = thread_id
&& let Ok(Some(mut metadata)) = ctx.get_thread(thread_id).await
{
metadata.rollout_path = rollout_path.to_path_buf();
match archived_only {
Some(true) if metadata.archived_at.is_none() => {
metadata.archived_at = Some(metadata.updated_at);
}
Some(false) => {
metadata.archived_at = None;
}
Some(true) | None => {}
}
if let Err(err) = ctx.upsert_thread(&metadata).await {
warn!(
"state db read-repair upsert failed for {}: {err}",
rollout_path.display()
);
} else {
return;
}
}
let default_provider = crate::rollout::list::read_session_meta_line(rollout_path)
.await
.ok()
.and_then(|meta| meta.meta.model_provider)
.unwrap_or_default();
reconcile_rollout(
Some(ctx),
rollout_path,
default_provider.as_str(),
None,
&[],
archived_only,
)
.await;
}
/// Apply rollout items incrementally to SQLite.
pub async fn apply_rollout_items(
context: Option<&codex_state::StateRuntime>,