diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index b800d21128..49ec31cca5 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2248,6 +2248,7 @@ dependencies = [ "codex-protocol", "codex-responses-api-proxy", "codex-rmcp-client", + "codex-rollout", "codex-rollout-trace", "codex-sandboxing", "codex-state", diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index 1740a9e51c..1a1b852667 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -49,6 +49,7 @@ codex-plugin = { workspace = true } codex-protocol = { workspace = true } codex-responses-api-proxy = { workspace = true } codex-rmcp-client = { workspace = true } +codex-rollout = { workspace = true } codex-rollout-trace = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/cli/src/doctor.rs b/codex-rs/cli/src/doctor.rs index 48a28c50d4..d2422d05de 100644 --- a/codex-rs/cli/src/doctor.rs +++ b/codex-rs/cli/src/doctor.rs @@ -71,6 +71,7 @@ mod output; mod progress; mod runtime; mod system; +mod thread_inventory; mod title; mod updates; @@ -84,6 +85,7 @@ use progress::doctor_progress; use runtime::runtime_check; use runtime::search_check; use system::system_check; +use thread_inventory::thread_inventory_check; use title::terminal_title_check; use updates::updates_check; @@ -362,6 +364,7 @@ async fn build_report( git_check, terminal_title_check, state_check, + thread_inventory_check, background_server_check, reachability_check, ) = tokio::join!( @@ -392,6 +395,11 @@ async fn build_report( }) }, run_async_check("state", progress.clone(), state_check(config)), + run_async_check( + "thread inventory", + progress.clone(), + thread_inventory_check(config), + ), run_async_check( "app-server", progress.clone(), @@ -415,6 +423,7 @@ async fn build_report( git_check, terminal_title_check, state_check, + thread_inventory_check, background_server_check, reachability_check, ]); diff --git a/codex-rs/cli/src/doctor/output.rs b/codex-rs/cli/src/doctor/output.rs index 2734bd16fc..ec09a4d69f 100644 --- a/codex-rs/cli/src/doctor/output.rs +++ b/codex-rs/cli/src/doctor/output.rs @@ -27,6 +27,7 @@ const GROUPS: &[OutputGroup] = &[ title: "Environment", keys: &[ "system", "runtime", "install", "search", "git", "terminal", "title", "state", + "threads", ], }, OutputGroup { @@ -1340,6 +1341,28 @@ Run codex doctor without --summary for detailed diagnostics. assert_eq!(rendered, expected); } + #[test] + fn render_human_report_includes_threads_row_in_environment() { + let mut report = sample_report(); + report.checks.push(DoctorCheck::new( + "state.rollout_db_parity", + "threads", + CheckStatus::Warning, + "rollout files and state DB thread inventory differ", + )); + + let rendered = render_human_report(&report, summary_no_color_unicode_options()); + + let threads_line = rendered + .lines() + .find(|line| line.contains("threads")) + .expect("threads row should be rendered"); + assert!( + threads_line.contains("rollout files and state DB thread inventory differ"), + "{threads_line}" + ); + } + #[test] fn render_human_report_supports_ascii_output() { let rendered = render_human_report( diff --git a/codex-rs/cli/src/doctor/thread_inventory.rs b/codex-rs/cli/src/doctor/thread_inventory.rs new file mode 100644 index 0000000000..9a8eef6ee4 --- /dev/null +++ b/codex-rs/cli/src/doctor/thread_inventory.rs @@ -0,0 +1,921 @@ +//! Doctor check that compares rollout files against the SQLite thread inventory. + +use super::CheckStatus; +use super::Config; +use super::DoctorCheck; +use super::DoctorIssue; +use codex_protocol::protocol::InternalSessionSource; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use codex_rollout::RolloutRecorder; +use codex_state::ThreadStateAuditRow; +use codex_utils_path::normalize_for_path_comparison; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::HashSet; +use std::ffi::OsStr; +use std::path::Path; +use std::path::PathBuf; + +const MAX_PARITY_SCAN_FILES: usize = 10_000; +const SAMPLE_LIMIT: usize = 5; +const SUMMARY_LIMIT: usize = 8; +const CHECK_ID: &str = "state.rollout_db_parity"; +const CHECK_CATEGORY: &str = "threads"; + +#[derive(Clone, Debug)] +struct RolloutAuditFile { + path: PathBuf, + key: PathBuf, + archived: bool, + thread_id: String, +} + +#[derive(Default)] +struct RolloutScan { + files: Vec, + scan_errors: Vec, + malformed_names: Vec, + reached_scan_cap: bool, +} + +enum RolloutThreadId { + Id(String), + MalformedName, + Unusable(String), +} + +impl RolloutScan { + fn candidate_count(&self) -> usize { + self.files.len() + self.malformed_names.len() + self.scan_errors.len() + } + + fn reached_candidate_cap(&self) -> bool { + self.candidate_count() >= MAX_PARITY_SCAN_FILES + } + + fn record_malformed_name(&mut self, path: PathBuf) { + if self.reached_candidate_cap() { + self.reached_scan_cap = true; + return; + } + self.malformed_names.push(path); + self.reached_scan_cap = self.reached_candidate_cap(); + } + + fn record_scan_error(&mut self, message: String) { + if self.reached_candidate_cap() { + self.reached_scan_cap = true; + return; + } + self.scan_errors.push(message); + self.reached_scan_cap = self.reached_candidate_cap(); + } + + fn active_count(&self) -> usize { + self.files.iter().filter(|file| !file.archived).count() + } + + fn archived_count(&self) -> usize { + self.files.iter().filter(|file| file.archived).count() + } +} + +pub(super) async fn thread_inventory_check(config: &Config) -> DoctorCheck { + thread_inventory_check_for_roots( + config.codex_home.as_path(), + config.sqlite_home.as_path(), + config.model_provider_id.as_str(), + ) + .await +} + +async fn thread_inventory_check_for_roots( + codex_home: &Path, + sqlite_home: &Path, + default_provider: &str, +) -> DoctorCheck { + let scan = scan_rollout_files(codex_home).await; + let state_db_path = codex_state::state_db_path(sqlite_home); + + let mut details = vec![ + format!("default model provider: {default_provider}"), + format!("rollout DB active files: {}", scan.active_count()), + format!("rollout DB archived files: {}", scan.archived_count()), + format!("rollout DB scan errors: {}", scan.scan_errors.len()), + format!( + "rollout DB malformed file names: {}", + scan.malformed_names.len() + ), + format!("rollout DB scan cap reached: {}", scan.reached_scan_cap), + ]; + push_samples( + &mut details, + "rollout DB scan error sample", + scan.scan_errors.iter().map(String::as_str), + ); + push_samples( + &mut details, + "rollout DB malformed file sample", + scan.malformed_names + .iter() + .map(|path| path.display().to_string()), + ); + + if !state_db_path.is_file() { + details.push("rollout DB rows: skipped (state DB missing)".to_string()); + return missing_state_db_check(scan, details); + } + + let rows = match codex_state::read_thread_state_audit_rows(&state_db_path).await { + Ok(rows) => rows, + Err(err) => { + details.push(format!("rollout DB read error: {err}")); + return DoctorCheck::new( + CHECK_ID, + CHECK_CATEGORY, + CheckStatus::Warning, + "state database thread inventory could not be read", + ) + .details(details) + .issue( + DoctorIssue::new( + CheckStatus::Warning, + "state DB thread rows could not be queried", + ) + .measured(err.to_string()) + .expected("readable threads table"), + ); + } + }; + + parity_check_from_scan_and_rows(codex_home, scan, rows, details) +} + +fn missing_state_db_check(scan: RolloutScan, details: Vec) -> DoctorCheck { + if scan.files.is_empty() + && scan.scan_errors.is_empty() + && scan.malformed_names.is_empty() + && !scan.reached_scan_cap + { + return DoctorCheck::new( + CHECK_ID, + CHECK_CATEGORY, + CheckStatus::Ok, + "no rollout/state DB inventory to compare", + ) + .details(details); + } + + let summary = if scan.files.is_empty() { + "rollout scan was incomplete or found bad files" + } else { + "state DB is missing while rollout files exist" + }; + let mut check = + DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, CheckStatus::Warning, summary).details(details); + + if !scan.files.is_empty() { + check = check + .issue( + DoctorIssue::new( + CheckStatus::Warning, + "rollout files exist but the state DB is missing", + ) + .measured(format!("{} rollout files", scan.files.len())) + .expected("state DB contains matching thread rows") + .remedy("Start Codex with no state DB present so startup backfill can create it from rollout files."), + ) + .remediation( + "Start Codex with no state DB present so startup backfill can create it from rollout files.", + ); + } + if !scan.scan_errors.is_empty() || !scan.malformed_names.is_empty() || scan.reached_scan_cap { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "rollout scan was incomplete or found bad files", + ) + .measured(format!( + "{} scan errors, {} malformed names, scan cap reached: {}", + scan.scan_errors.len(), + scan.malformed_names.len(), + scan.reached_scan_cap + )) + .expected("rollout directories are fully scannable") + .remedy("Check file permissions and unexpected files under CODEX_HOME sessions."), + ); + } + check +} + +fn parity_check_from_scan_and_rows( + codex_home: &Path, + scan: RolloutScan, + rows: Vec, + mut details: Vec, +) -> DoctorCheck { + let rollout_by_key = scan + .files + .iter() + .map(|file| (file.key.clone(), file)) + .collect::>(); + let mut rows_by_key: HashMap> = HashMap::new(); + for row in &rows { + rows_by_key + .entry(path_key(&row.rollout_path)) + .or_default() + .push(row); + } + + let missing_active = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ false); + let missing_archived = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ true); + let scan_complete = !scan.reached_scan_cap; + let stale_rows = if scan_complete { + rows.iter() + .filter(|row| !row.rollout_path.is_file()) + .collect::>() + } else { + Vec::new() + }; + let archive_mismatches = if scan_complete { + rows.iter() + .filter_map(|row| { + let expected_archived = rollout_by_key + .get(&path_key(&row.rollout_path)) + .map(|file| file.archived) + .or_else(|| { + row.rollout_path + .is_file() + .then(|| archived_from_rollout_path(codex_home, &row.rollout_path)) + .flatten() + })?; + (expected_archived != row.archived).then_some(row) + }) + .collect::>() + } else { + Vec::new() + }; + let duplicate_rollout_thread_ids = duplicate_rollout_thread_ids(&scan.files); + let duplicate_db_paths = duplicate_db_paths(&rows_by_key); + let archived_rows = rows.iter().filter(|row| row.archived).count(); + let active_rows = rows.len() - archived_rows; + + details.extend([ + format!("rollout DB rows: {}", rows.len()), + format!("rollout DB active rows: {active_rows}"), + format!("rollout DB archived rows: {archived_rows}"), + format!("rollout DB missing active rows: {}", missing_active.len()), + format!( + "rollout DB missing archived rows: {}", + missing_archived.len() + ), + format!( + "rollout DB stale rows: {}", + count_or_skipped(stale_rows.len(), scan_complete) + ), + format!( + "rollout DB archive mismatches: {}", + count_or_skipped(archive_mismatches.len(), scan_complete) + ), + format!( + "rollout DB duplicate rollout thread ids: {}", + duplicate_rollout_thread_ids.len() + ), + format!( + "rollout DB duplicate DB paths: {}", + duplicate_db_paths.len() + ), + format!( + "rollout DB model providers: {}", + count_summary(rows.iter().map(|row| row.model_provider.as_str())) + ), + format!( + "rollout DB sources: {}", + count_summary(rows.iter().map(|row| source_category(&row.source))) + ), + ]); + push_path_samples( + &mut details, + "rollout DB missing active sample", + missing_active.iter().copied(), + ); + push_path_samples( + &mut details, + "rollout DB missing archived sample", + missing_archived.iter().copied(), + ); + push_path_samples( + &mut details, + "rollout DB stale row sample", + stale_rows.iter().map(|row| row.rollout_path.as_path()), + ); + push_path_samples( + &mut details, + "rollout DB archive mismatch sample", + archive_mismatches + .iter() + .map(|row| row.rollout_path.as_path()), + ); + push_samples( + &mut details, + "rollout DB duplicate rollout thread id sample", + duplicate_rollout_thread_ids.iter().map(String::as_str), + ); + push_path_samples( + &mut details, + "rollout DB duplicate DB path sample", + duplicate_db_paths.iter().map(PathBuf::as_path), + ); + + let status = if scan.scan_errors.is_empty() + && scan.malformed_names.is_empty() + && !scan.reached_scan_cap + && missing_active.is_empty() + && missing_archived.is_empty() + && stale_rows.is_empty() + && archive_mismatches.is_empty() + && duplicate_rollout_thread_ids.is_empty() + && duplicate_db_paths.is_empty() + { + CheckStatus::Ok + } else { + CheckStatus::Warning + }; + + let summary = if status == CheckStatus::Ok { + "rollout files and state DB thread inventory agree" + } else { + "rollout files and state DB thread inventory differ" + }; + let mut check = DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, status, summary).details(details); + + if !missing_active.is_empty() || !missing_archived.is_empty() { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "rollout files are missing from the state DB", + ) + .measured(format!( + "{} active, {} archived", + missing_active.len(), + missing_archived.len() + )) + .expected("every rollout file has a matching threads row"), + ); + } + if !stale_rows.is_empty() { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "state DB rows point at missing or unusable rollout files", + ) + .measured(format!("{} stale rows", stale_rows.len())) + .expected("every state DB rollout path is a file on disk"), + ); + } + if !archive_mismatches.is_empty() { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "state DB archive flags disagree with rollout file locations", + ) + .measured(format!("{} mismatched rows", archive_mismatches.len())) + .expected( + "rows under archived_sessions are archived and rows under sessions are active", + ), + ); + } + if !duplicate_rollout_thread_ids.is_empty() || !duplicate_db_paths.is_empty() { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "duplicate thread inventory entries found", + ) + .measured(format!( + "{} duplicate rollout thread ids, {} duplicate DB paths", + duplicate_rollout_thread_ids.len(), + duplicate_db_paths.len() + )) + .expected("one rollout path and thread id per thread") + .remedy("Attach the doctor report to a bug report so support can inspect samples."), + ); + } + if !scan.scan_errors.is_empty() || !scan.malformed_names.is_empty() || scan.reached_scan_cap { + check = check.issue( + DoctorIssue::new( + CheckStatus::Warning, + "rollout scan was incomplete or found bad files", + ) + .measured(format!( + "{} scan errors, {} malformed names, scan cap reached: {}", + scan.scan_errors.len(), + scan.malformed_names.len(), + scan.reached_scan_cap + )) + .expected("rollout directories are fully scannable") + .remedy("Check file permissions and unexpected files under CODEX_HOME sessions."), + ); + } + check +} + +async fn scan_rollout_files(codex_home: &Path) -> RolloutScan { + let mut scan = RolloutScan::default(); + scan_rollout_root( + &codex_home.join("sessions"), + /*archived*/ false, + &mut scan, + ) + .await; + scan_rollout_root( + &codex_home.join("archived_sessions"), + /*archived*/ true, + &mut scan, + ) + .await; + scan +} + +async fn scan_rollout_root(root: &Path, archived: bool, scan: &mut RolloutScan) { + let mut dirs = vec![root.to_path_buf()]; + while let Some(dir) = dirs.pop() { + if scan.reached_scan_cap { + return; + } + let entries = match std::fs::read_dir(&dir) { + Ok(entries) => entries, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, + Err(err) => { + scan.record_scan_error(format!("{} ({err})", dir.display())); + continue; + } + }; + for entry in entries { + if scan.reached_scan_cap { + return; + } + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + scan.record_scan_error(format!("{} ({err})", dir.display())); + continue; + } + }; + let path = entry.path(); + let file_type = match entry.file_type() { + Ok(file_type) => file_type, + Err(err) => { + scan.record_scan_error(format!("{} ({err})", path.display())); + continue; + } + }; + if file_type.is_dir() { + dirs.push(path); + continue; + } + if !file_type.is_file() || !is_rollout_file(&path) { + continue; + } + if scan.candidate_count() >= MAX_PARITY_SCAN_FILES { + scan.reached_scan_cap = true; + return; + } + let thread_id = match thread_id_from_rollout(&path).await { + RolloutThreadId::Id(thread_id) => thread_id, + RolloutThreadId::MalformedName => { + scan.record_malformed_name(path.clone()); + continue; + } + RolloutThreadId::Unusable(reason) => { + scan.record_scan_error(format!("{} ({reason})", path.display())); + continue; + } + }; + scan.files.push(RolloutAuditFile { + key: path_key(&path), + path, + archived, + thread_id, + }); + } + } +} + +async fn thread_id_from_rollout(path: &Path) -> RolloutThreadId { + let items = match RolloutRecorder::load_rollout_items(path).await { + Ok((items, _, _)) => items, + Err(err) => return RolloutThreadId::Unusable(err.to_string()), + }; + if items.is_empty() { + return RolloutThreadId::Unusable("no parseable rollout items".to_string()); + } + codex_rollout::builder_from_items(items.as_slice(), path) + .map(|builder| RolloutThreadId::Id(builder.id.to_string())) + .unwrap_or(RolloutThreadId::MalformedName) +} + +fn is_rollout_file(path: &Path) -> bool { + path.extension() == Some(OsStr::new("jsonl")) + && path + .file_name() + .and_then(OsStr::to_str) + .is_some_and(|name| name.starts_with("rollout-")) +} + +fn count_or_skipped(count: usize, complete: bool) -> String { + if complete { + count.to_string() + } else { + "skipped (scan cap reached)".to_string() + } +} + +fn path_key(path: &Path) -> PathBuf { + normalize_for_path_comparison(path).unwrap_or_else(|_| path.to_path_buf()) +} + +fn archived_from_rollout_path(codex_home: &Path, path: &Path) -> Option { + let key = path_key(path); + if key.starts_with(path_key(&codex_home.join("archived_sessions"))) { + return Some(true); + } + if key.starts_with(path_key(&codex_home.join("sessions"))) { + return Some(false); + } + None +} + +fn missing_rollout_paths<'a>( + files: &'a [RolloutAuditFile], + rows_by_key: &HashMap>, + archived: bool, +) -> Vec<&'a Path> { + files + .iter() + .filter(|file| file.archived == archived && !has_matching_thread_row(file, rows_by_key)) + .map(|file| file.path.as_path()) + .collect() +} + +fn has_matching_thread_row( + file: &RolloutAuditFile, + rows_by_key: &HashMap>, +) -> bool { + let Some(rows) = rows_by_key.get(&file.key) else { + return false; + }; + rows.iter().any(|row| row.id == file.thread_id.as_str()) +} + +fn duplicate_rollout_thread_ids(files: &[RolloutAuditFile]) -> Vec { + let mut seen = HashSet::new(); + let mut duplicates = HashSet::new(); + for thread_id in files.iter().map(|file| file.thread_id.as_str()) { + if !seen.insert(thread_id) { + duplicates.insert(thread_id.to_string()); + } + } + let mut duplicates = duplicates.into_iter().collect::>(); + duplicates.sort(); + duplicates +} + +fn duplicate_db_paths(rows_by_key: &HashMap>) -> Vec { + let mut paths = rows_by_key + .iter() + .filter(|(_, rows)| rows.len() > 1) + .map(|(path, _)| path.clone()) + .collect::>(); + paths.sort(); + paths +} + +fn source_category(source: &str) -> &'static str { + let parsed = serde_json::from_str::(source) + .or_else(|_| serde_json::from_value(serde_json::Value::String(source.to_string()))); + let Ok(source) = parsed else { + return "unparsable"; + }; + + match source { + SessionSource::Cli => "cli", + SessionSource::VSCode => "vscode", + SessionSource::Exec => "exec", + SessionSource::Mcp => "mcp", + SessionSource::Custom(_) => "custom", + SessionSource::Internal(InternalSessionSource::MemoryConsolidation) => { + "internal:memory_consolidation" + } + SessionSource::SubAgent(SubAgentSource::Review) => "subagent:review", + SessionSource::SubAgent(SubAgentSource::Compact) => "subagent:compact", + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) => "subagent:thread_spawn", + SessionSource::SubAgent(SubAgentSource::MemoryConsolidation) => { + "subagent:memory_consolidation" + } + SessionSource::SubAgent(SubAgentSource::Other(_)) => "subagent:other", + SessionSource::Unknown => "unknown", + } +} + +fn count_summary(values: I) -> String +where + I: Iterator, + V: Into, +{ + let mut counts = BTreeMap::::new(); + for value in values { + *counts.entry(value.into()).or_default() += 1; + } + if counts.is_empty() { + return "none".to_string(); + } + + let mut entries = counts.into_iter().collect::>(); + entries.sort_by(|(left_value, left_count), (right_value, right_count)| { + right_count + .cmp(left_count) + .then_with(|| left_value.cmp(right_value)) + }); + let omitted_categories = entries.len().saturating_sub(SUMMARY_LIMIT); + let omitted_rows = entries + .iter() + .skip(SUMMARY_LIMIT) + .map(|(_, count)| count) + .sum::(); + let mut parts = entries + .into_iter() + .take(SUMMARY_LIMIT) + .map(|(value, count)| format!("{value}={count}")) + .collect::>(); + if omitted_categories > 0 { + parts.push(format!( + "other={omitted_rows} across {omitted_categories} categories" + )); + } + parts.join(", ") +} + +fn push_path_samples<'a>( + details: &mut Vec, + label: &str, + paths: impl Iterator, +) { + push_samples(details, label, paths.map(|path| path.display().to_string())); +} + +fn push_samples(details: &mut Vec, label: &str, values: I) +where + I: Iterator, + V: ToString, +{ + for value in values.take(SAMPLE_LIMIT) { + details.push(format!("{label}: {}", value.to_string())); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::ThreadId; + use codex_protocol::protocol::RolloutItem; + use codex_protocol::protocol::RolloutLine; + use pretty_assertions::assert_eq; + use sqlx::sqlite::SqliteConnectOptions; + use sqlx::sqlite::SqlitePoolOptions; + use tempfile::TempDir; + + #[tokio::test] + async fn thread_inventory_check_ok_when_rollouts_match_db() { + let fixture = Fixture::new().await; + let active_path = fixture.write_rollout( + /*archived*/ false, + "2025-01-02T10-00-00", + "00000000-0000-0000-0000-000000000001", + ); + let archived_path = fixture.write_rollout( + /*archived*/ true, + "2025-01-02T11-00-00", + "00000000-0000-0000-0000-000000000002", + ); + fixture + .insert_thread_row( + "00000000-0000-0000-0000-000000000001", + active_path.as_path(), + /*archived*/ false, + ) + .await; + fixture + .insert_thread_row( + "00000000-0000-0000-0000-000000000002", + archived_path.as_path(), + /*archived*/ true, + ) + .await; + + let check = thread_inventory_check_for_roots( + fixture.codex_home.path(), + fixture.sqlite_home.path(), + "test-provider", + ) + .await; + + assert_eq!(check.status, CheckStatus::Ok); + assert_eq!(check.category, CHECK_CATEGORY); + assert_detail(&check, "rollout DB missing active rows", "0"); + assert_detail(&check, "rollout DB missing archived rows", "0"); + assert_detail(&check, "rollout DB stale rows", "0"); + assert_detail(&check, "rollout DB archive mismatches", "0"); + } + + #[tokio::test] + async fn thread_inventory_check_warns_for_missing_stale_and_mismatched_rows() { + let fixture = Fixture::new().await; + let missing_path = fixture.write_rollout( + /*archived*/ false, + "2025-01-02T10-00-00", + "00000000-0000-0000-0000-000000000001", + ); + let mismatched_path = fixture.write_rollout( + /*archived*/ true, + "2025-01-02T11-00-00", + "00000000-0000-0000-0000-000000000002", + ); + let stale_path = fixture + .codex_home + .path() + .join("sessions/2025/01/02/rollout-2025-01-02T12-00-00-00000000-0000-0000-0000-000000000003.jsonl"); + fixture + .insert_thread_row( + "00000000-0000-0000-0000-000000000002", + mismatched_path.as_path(), + /*archived*/ false, + ) + .await; + fixture + .insert_thread_row( + "00000000-0000-0000-0000-000000000003", + stale_path.as_path(), + /*archived*/ false, + ) + .await; + + let check = thread_inventory_check_for_roots( + fixture.codex_home.path(), + fixture.sqlite_home.path(), + "test-provider", + ) + .await; + + assert_eq!(check.status, CheckStatus::Warning); + assert_eq!(check.issues.len(), 3); + assert_detail(&check, "rollout DB missing active rows", "1"); + assert_detail(&check, "rollout DB stale rows", "1"); + assert_detail(&check, "rollout DB archive mismatches", "1"); + assert_eq!(check.remediation, None); + assert!(check.issues.iter().all(|issue| { + !issue + .remedy + .as_deref() + .is_some_and(|remedy| remedy.starts_with("Restart Codex")) + })); + assert!( + check + .details + .iter() + .any(|detail| detail.contains(missing_path.to_string_lossy().as_ref())) + ); + } + + struct Fixture { + codex_home: TempDir, + sqlite_home: TempDir, + } + + impl Fixture { + async fn new() -> Self { + let codex_home = TempDir::new().expect("codex home"); + let sqlite_home = TempDir::new().expect("sqlite home"); + let _runtime = codex_state::StateRuntime::init( + sqlite_home.path().to_path_buf(), + "test-provider".to_string(), + ) + .await + .expect("state runtime"); + Self { + codex_home, + sqlite_home, + } + } + + fn write_rollout(&self, archived: bool, timestamp: &str, thread_id: &str) -> PathBuf { + let root = if archived { + self.codex_home.path().join("archived_sessions") + } else { + self.codex_home.path().join("sessions/2025/01/02") + }; + std::fs::create_dir_all(&root).expect("rollout dir"); + let path = root.join(format!("rollout-{timestamp}-{thread_id}.jsonl")); + let rollout_line = RolloutLine { + timestamp: timestamp.to_string(), + item: RolloutItem::SessionMeta(codex_protocol::protocol::SessionMetaLine { + meta: codex_protocol::protocol::SessionMeta { + id: ThreadId::from_string(thread_id).expect("thread id"), + timestamp: timestamp.to_string(), + cwd: self.codex_home.path().to_path_buf(), + originator: "test".to_string(), + cli_version: "test".to_string(), + source: SessionSource::Cli, + model_provider: Some("test-provider".to_string()), + ..Default::default() + }, + git: None, + }), + }; + let contents = serde_json::to_string(&rollout_line).expect("rollout line"); + std::fs::write(&path, format!("{contents}\n")).expect("rollout file"); + path + } + + async fn insert_thread_row(&self, id: &str, rollout_path: &Path, archived: bool) { + let state_db_path = codex_state::state_db_path(self.sqlite_home.path()); + let options = SqliteConnectOptions::new() + .filename(state_db_path) + .create_if_missing(false); + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .expect("sqlite pool"); + sqlx::query( + r#" +INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + source, + model_provider, + cwd, + title, + sandbox_policy, + approval_mode, + archived, + archived_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(id) + .bind(rollout_path.display().to_string()) + .bind(1_i64) + .bind(1_i64) + .bind("cli") + .bind("test-provider") + .bind(self.codex_home.path().display().to_string()) + .bind("test title") + .bind("read-only") + .bind("on-request") + .bind(if archived { 1_i64 } else { 0_i64 }) + .bind(archived.then_some(1_i64)) + .execute(&pool) + .await + .expect("insert thread row"); + pool.close().await; + } + } + + fn assert_detail(check: &DoctorCheck, label: &str, expected: &str) { + let prefix = format!("{label}: "); + let actual = check + .details + .iter() + .find_map(|detail| detail.strip_prefix(&prefix)) + .expect("detail should exist"); + assert_eq!(actual, expected); + } + + #[test] + fn source_category_coarsens_structured_sources() { + assert_eq!(source_category("cli"), "cli"); + assert_eq!( + source_category(r#"{"subagent":"memory_consolidation"}"#), + "subagent:memory_consolidation" + ); + assert_eq!( + source_category( + r#"{"subagent":{"thread_spawn":{"parent_thread_id":"00000000-0000-0000-0000-000000000001","depth":2}}}"#, + ), + "subagent:thread_spawn" + ); + } + + #[test] + fn count_summary_caps_distinct_values() { + let summary = count_summary(["a", "b", "c", "d", "e", "f", "g", "h", "i"].into_iter()); + + assert_eq!( + summary, + "a=1, b=1, c=1, d=1, e=1, f=1, g=1, h=1, other=1 across 1 categories" + ); + } +} diff --git a/codex-rs/state/src/audit.rs b/codex-rs/state/src/audit.rs new file mode 100644 index 0000000000..0ac4aadfaf --- /dev/null +++ b/codex-rs/state/src/audit.rs @@ -0,0 +1,55 @@ +//! Read-only state database queries for diagnostics. + +use anyhow::Result; +use log::LevelFilter; +use sqlx::ConnectOptions; +use sqlx::Row; +use sqlx::sqlite::SqliteConnectOptions; +use sqlx::sqlite::SqlitePoolOptions; +use std::path::Path; +use std::path::PathBuf; + +/// Minimal thread metadata used by read-only state database audits. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ThreadStateAuditRow { + pub id: String, + pub rollout_path: PathBuf, + pub archived: bool, + pub source: String, + pub model_provider: String, +} + +/// Read persisted thread rows from a state DB without creating, migrating, or repairing it. +pub async fn read_thread_state_audit_rows(path: &Path) -> Result> { + let options = SqliteConnectOptions::new() + .filename(path) + .create_if_missing(false) + .read_only(true) + .log_statements(LevelFilter::Off); + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await?; + let rows = sqlx::query( + r#" +SELECT id, rollout_path, archived, source, model_provider +FROM threads + "#, + ) + .fetch_all(&pool) + .await?; + pool.close().await; + + rows.into_iter() + .map(|row| { + let archived: i64 = row.try_get("archived")?; + Ok(ThreadStateAuditRow { + id: row.try_get("id")?, + rollout_path: PathBuf::from(row.try_get::("rollout_path")?), + archived: archived != 0, + source: row.try_get("source")?, + model_provider: row.try_get("model_provider")?, + }) + }) + .collect() +} diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index f35b3420f6..678ee11eee 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -4,6 +4,7 @@ //! from JSONL rollouts and mirrors it into a local SQLite database. Backfill //! orchestration and rollout scanning live in `codex-core`. +mod audit; mod extract; pub mod log_db; mod migrations; @@ -19,6 +20,8 @@ pub use model::Phase2JobClaimOutcome; /// Preferred entrypoint: owns configuration and metrics. pub use runtime::StateRuntime; +pub use audit::ThreadStateAuditRow; +pub use audit::read_thread_state_audit_rows; /// Low-level storage engine: useful for focused tests. /// /// Most consumers should prefer [`StateRuntime`].