mirror of
https://github.com/openai/codex.git
synced 2026-05-28 15:00:16 +00:00
Add doctor thread inventory audit (#24305)
## Why
Users have been reporting missing sessions in the app. The app server
thread listing is backed by the SQLite state DB, but the durable source
of truth for a thread still exists on disk as rollout JSONL. When the
state DB is incomplete, doctor should be able to show the mismatch
directly instead of leaving users with a generic state health result.
## What changed
This adds a `threads` doctor check that compares active and archived
rollout files under `CODEX_HOME` with rows in the SQLite `threads`
table. The check reports missing rollout rows, stale DB rows, archive
flag mismatches, duplicate rollout thread IDs, duplicate DB paths,
source/provider summaries, and bounded samples of affected rollout
paths.
It also adds a read-only state audit helper in `codex-rs/state` so
doctor can inspect thread rows without creating, migrating, or repairing
the database.
## Sample output
```text
⚠ threads rollout files are missing from the state DB
default model provider openai
rollout DB active files 3910
rollout DB archived files 2037
rollout DB scan errors 0
rollout DB malformed file names 0
rollout DB scan cap reached false
rollout DB rows 5499
rollout DB active rows 3462
rollout DB archived rows 2037
rollout DB missing active rows 448
rollout DB missing archived rows 0
rollout DB stale rows 0
rollout DB archive mismatches 0
rollout DB duplicate rollout thread ids 0
rollout DB duplicate DB paths 0
rollout DB model providers openai=5359, lmstudio=35, mock_provider=33, lite_llm=26, proxy=26, ollama=15, lms=4, local-usage-limit=1
rollout DB sources vscode=2587, cli=1494, subagent:thread_spawn=577, subagent:other=502, exec=281, subagent:memory_consolidation=46, subagent:review=9, unknown=3
rollout DB missing active sample ~/.codex/sessions/2026/0…857e-a923c712e066.jsonl
rollout DB missing active sample ~/.codex/sessions/2025/0…877a-766dff25c68d.jsonl
rollout DB missing active sample ~/.codex/sessions/2025/0…a8b1-7bbadc836f6e.jsonl
rollout DB missing active sample ~/.codex/sessions/2025/0…a218-e6197f3f62f8.jsonl
rollout DB missing active sample ~/.codex/sessions/2025/0…9011-7e30784f9932.jsonl
```
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2248,6 +2248,7 @@ dependencies = [
|
||||
"codex-protocol",
|
||||
"codex-responses-api-proxy",
|
||||
"codex-rmcp-client",
|
||||
"codex-rollout",
|
||||
"codex-rollout-trace",
|
||||
"codex-sandboxing",
|
||||
"codex-state",
|
||||
|
||||
@@ -49,6 +49,7 @@ codex-plugin = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-responses-api-proxy = { workspace = true }
|
||||
codex-rmcp-client = { workspace = true }
|
||||
codex-rollout = { workspace = true }
|
||||
codex-rollout-trace = { workspace = true }
|
||||
codex-sandboxing = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
|
||||
@@ -71,6 +71,7 @@ mod output;
|
||||
mod progress;
|
||||
mod runtime;
|
||||
mod system;
|
||||
mod thread_inventory;
|
||||
mod title;
|
||||
mod updates;
|
||||
|
||||
@@ -84,6 +85,7 @@ use progress::doctor_progress;
|
||||
use runtime::runtime_check;
|
||||
use runtime::search_check;
|
||||
use system::system_check;
|
||||
use thread_inventory::thread_inventory_check;
|
||||
use title::terminal_title_check;
|
||||
use updates::updates_check;
|
||||
|
||||
@@ -362,6 +364,7 @@ async fn build_report(
|
||||
git_check,
|
||||
terminal_title_check,
|
||||
state_check,
|
||||
thread_inventory_check,
|
||||
background_server_check,
|
||||
reachability_check,
|
||||
) = tokio::join!(
|
||||
@@ -392,6 +395,11 @@ async fn build_report(
|
||||
})
|
||||
},
|
||||
run_async_check("state", progress.clone(), state_check(config)),
|
||||
run_async_check(
|
||||
"thread inventory",
|
||||
progress.clone(),
|
||||
thread_inventory_check(config),
|
||||
),
|
||||
run_async_check(
|
||||
"app-server",
|
||||
progress.clone(),
|
||||
@@ -415,6 +423,7 @@ async fn build_report(
|
||||
git_check,
|
||||
terminal_title_check,
|
||||
state_check,
|
||||
thread_inventory_check,
|
||||
background_server_check,
|
||||
reachability_check,
|
||||
]);
|
||||
|
||||
@@ -27,6 +27,7 @@ const GROUPS: &[OutputGroup] = &[
|
||||
title: "Environment",
|
||||
keys: &[
|
||||
"system", "runtime", "install", "search", "git", "terminal", "title", "state",
|
||||
"threads",
|
||||
],
|
||||
},
|
||||
OutputGroup {
|
||||
@@ -1340,6 +1341,28 @@ Run codex doctor without --summary for detailed diagnostics.
|
||||
assert_eq!(rendered, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn render_human_report_includes_threads_row_in_environment() {
|
||||
let mut report = sample_report();
|
||||
report.checks.push(DoctorCheck::new(
|
||||
"state.rollout_db_parity",
|
||||
"threads",
|
||||
CheckStatus::Warning,
|
||||
"rollout files and state DB thread inventory differ",
|
||||
));
|
||||
|
||||
let rendered = render_human_report(&report, summary_no_color_unicode_options());
|
||||
|
||||
let threads_line = rendered
|
||||
.lines()
|
||||
.find(|line| line.contains("threads"))
|
||||
.expect("threads row should be rendered");
|
||||
assert!(
|
||||
threads_line.contains("rollout files and state DB thread inventory differ"),
|
||||
"{threads_line}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn render_human_report_supports_ascii_output() {
|
||||
let rendered = render_human_report(
|
||||
|
||||
921
codex-rs/cli/src/doctor/thread_inventory.rs
Normal file
921
codex-rs/cli/src/doctor/thread_inventory.rs
Normal file
@@ -0,0 +1,921 @@
|
||||
//! Doctor check that compares rollout files against the SQLite thread inventory.
|
||||
|
||||
use super::CheckStatus;
|
||||
use super::Config;
|
||||
use super::DoctorCheck;
|
||||
use super::DoctorIssue;
|
||||
use codex_protocol::protocol::InternalSessionSource;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_rollout::RolloutRecorder;
|
||||
use codex_state::ThreadStateAuditRow;
|
||||
use codex_utils_path::normalize_for_path_comparison;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsStr;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
const MAX_PARITY_SCAN_FILES: usize = 10_000;
|
||||
const SAMPLE_LIMIT: usize = 5;
|
||||
const SUMMARY_LIMIT: usize = 8;
|
||||
const CHECK_ID: &str = "state.rollout_db_parity";
|
||||
const CHECK_CATEGORY: &str = "threads";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct RolloutAuditFile {
|
||||
path: PathBuf,
|
||||
key: PathBuf,
|
||||
archived: bool,
|
||||
thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct RolloutScan {
|
||||
files: Vec<RolloutAuditFile>,
|
||||
scan_errors: Vec<String>,
|
||||
malformed_names: Vec<PathBuf>,
|
||||
reached_scan_cap: bool,
|
||||
}
|
||||
|
||||
enum RolloutThreadId {
|
||||
Id(String),
|
||||
MalformedName,
|
||||
Unusable(String),
|
||||
}
|
||||
|
||||
impl RolloutScan {
|
||||
fn candidate_count(&self) -> usize {
|
||||
self.files.len() + self.malformed_names.len() + self.scan_errors.len()
|
||||
}
|
||||
|
||||
fn reached_candidate_cap(&self) -> bool {
|
||||
self.candidate_count() >= MAX_PARITY_SCAN_FILES
|
||||
}
|
||||
|
||||
fn record_malformed_name(&mut self, path: PathBuf) {
|
||||
if self.reached_candidate_cap() {
|
||||
self.reached_scan_cap = true;
|
||||
return;
|
||||
}
|
||||
self.malformed_names.push(path);
|
||||
self.reached_scan_cap = self.reached_candidate_cap();
|
||||
}
|
||||
|
||||
fn record_scan_error(&mut self, message: String) {
|
||||
if self.reached_candidate_cap() {
|
||||
self.reached_scan_cap = true;
|
||||
return;
|
||||
}
|
||||
self.scan_errors.push(message);
|
||||
self.reached_scan_cap = self.reached_candidate_cap();
|
||||
}
|
||||
|
||||
fn active_count(&self) -> usize {
|
||||
self.files.iter().filter(|file| !file.archived).count()
|
||||
}
|
||||
|
||||
fn archived_count(&self) -> usize {
|
||||
self.files.iter().filter(|file| file.archived).count()
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn thread_inventory_check(config: &Config) -> DoctorCheck {
|
||||
thread_inventory_check_for_roots(
|
||||
config.codex_home.as_path(),
|
||||
config.sqlite_home.as_path(),
|
||||
config.model_provider_id.as_str(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn thread_inventory_check_for_roots(
|
||||
codex_home: &Path,
|
||||
sqlite_home: &Path,
|
||||
default_provider: &str,
|
||||
) -> DoctorCheck {
|
||||
let scan = scan_rollout_files(codex_home).await;
|
||||
let state_db_path = codex_state::state_db_path(sqlite_home);
|
||||
|
||||
let mut details = vec![
|
||||
format!("default model provider: {default_provider}"),
|
||||
format!("rollout DB active files: {}", scan.active_count()),
|
||||
format!("rollout DB archived files: {}", scan.archived_count()),
|
||||
format!("rollout DB scan errors: {}", scan.scan_errors.len()),
|
||||
format!(
|
||||
"rollout DB malformed file names: {}",
|
||||
scan.malformed_names.len()
|
||||
),
|
||||
format!("rollout DB scan cap reached: {}", scan.reached_scan_cap),
|
||||
];
|
||||
push_samples(
|
||||
&mut details,
|
||||
"rollout DB scan error sample",
|
||||
scan.scan_errors.iter().map(String::as_str),
|
||||
);
|
||||
push_samples(
|
||||
&mut details,
|
||||
"rollout DB malformed file sample",
|
||||
scan.malformed_names
|
||||
.iter()
|
||||
.map(|path| path.display().to_string()),
|
||||
);
|
||||
|
||||
if !state_db_path.is_file() {
|
||||
details.push("rollout DB rows: skipped (state DB missing)".to_string());
|
||||
return missing_state_db_check(scan, details);
|
||||
}
|
||||
|
||||
let rows = match codex_state::read_thread_state_audit_rows(&state_db_path).await {
|
||||
Ok(rows) => rows,
|
||||
Err(err) => {
|
||||
details.push(format!("rollout DB read error: {err}"));
|
||||
return DoctorCheck::new(
|
||||
CHECK_ID,
|
||||
CHECK_CATEGORY,
|
||||
CheckStatus::Warning,
|
||||
"state database thread inventory could not be read",
|
||||
)
|
||||
.details(details)
|
||||
.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"state DB thread rows could not be queried",
|
||||
)
|
||||
.measured(err.to_string())
|
||||
.expected("readable threads table"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
parity_check_from_scan_and_rows(codex_home, scan, rows, details)
|
||||
}
|
||||
|
||||
fn missing_state_db_check(scan: RolloutScan, details: Vec<String>) -> DoctorCheck {
|
||||
if scan.files.is_empty()
|
||||
&& scan.scan_errors.is_empty()
|
||||
&& scan.malformed_names.is_empty()
|
||||
&& !scan.reached_scan_cap
|
||||
{
|
||||
return DoctorCheck::new(
|
||||
CHECK_ID,
|
||||
CHECK_CATEGORY,
|
||||
CheckStatus::Ok,
|
||||
"no rollout/state DB inventory to compare",
|
||||
)
|
||||
.details(details);
|
||||
}
|
||||
|
||||
let summary = if scan.files.is_empty() {
|
||||
"rollout scan was incomplete or found bad files"
|
||||
} else {
|
||||
"state DB is missing while rollout files exist"
|
||||
};
|
||||
let mut check =
|
||||
DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, CheckStatus::Warning, summary).details(details);
|
||||
|
||||
if !scan.files.is_empty() {
|
||||
check = check
|
||||
.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"rollout files exist but the state DB is missing",
|
||||
)
|
||||
.measured(format!("{} rollout files", scan.files.len()))
|
||||
.expected("state DB contains matching thread rows")
|
||||
.remedy("Start Codex with no state DB present so startup backfill can create it from rollout files."),
|
||||
)
|
||||
.remediation(
|
||||
"Start Codex with no state DB present so startup backfill can create it from rollout files.",
|
||||
);
|
||||
}
|
||||
if !scan.scan_errors.is_empty() || !scan.malformed_names.is_empty() || scan.reached_scan_cap {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"rollout scan was incomplete or found bad files",
|
||||
)
|
||||
.measured(format!(
|
||||
"{} scan errors, {} malformed names, scan cap reached: {}",
|
||||
scan.scan_errors.len(),
|
||||
scan.malformed_names.len(),
|
||||
scan.reached_scan_cap
|
||||
))
|
||||
.expected("rollout directories are fully scannable")
|
||||
.remedy("Check file permissions and unexpected files under CODEX_HOME sessions."),
|
||||
);
|
||||
}
|
||||
check
|
||||
}
|
||||
|
||||
fn parity_check_from_scan_and_rows(
|
||||
codex_home: &Path,
|
||||
scan: RolloutScan,
|
||||
rows: Vec<ThreadStateAuditRow>,
|
||||
mut details: Vec<String>,
|
||||
) -> DoctorCheck {
|
||||
let rollout_by_key = scan
|
||||
.files
|
||||
.iter()
|
||||
.map(|file| (file.key.clone(), file))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let mut rows_by_key: HashMap<PathBuf, Vec<&ThreadStateAuditRow>> = HashMap::new();
|
||||
for row in &rows {
|
||||
rows_by_key
|
||||
.entry(path_key(&row.rollout_path))
|
||||
.or_default()
|
||||
.push(row);
|
||||
}
|
||||
|
||||
let missing_active = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ false);
|
||||
let missing_archived = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ true);
|
||||
let scan_complete = !scan.reached_scan_cap;
|
||||
let stale_rows = if scan_complete {
|
||||
rows.iter()
|
||||
.filter(|row| !row.rollout_path.is_file())
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let archive_mismatches = if scan_complete {
|
||||
rows.iter()
|
||||
.filter_map(|row| {
|
||||
let expected_archived = rollout_by_key
|
||||
.get(&path_key(&row.rollout_path))
|
||||
.map(|file| file.archived)
|
||||
.or_else(|| {
|
||||
row.rollout_path
|
||||
.is_file()
|
||||
.then(|| archived_from_rollout_path(codex_home, &row.rollout_path))
|
||||
.flatten()
|
||||
})?;
|
||||
(expected_archived != row.archived).then_some(row)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let duplicate_rollout_thread_ids = duplicate_rollout_thread_ids(&scan.files);
|
||||
let duplicate_db_paths = duplicate_db_paths(&rows_by_key);
|
||||
let archived_rows = rows.iter().filter(|row| row.archived).count();
|
||||
let active_rows = rows.len() - archived_rows;
|
||||
|
||||
details.extend([
|
||||
format!("rollout DB rows: {}", rows.len()),
|
||||
format!("rollout DB active rows: {active_rows}"),
|
||||
format!("rollout DB archived rows: {archived_rows}"),
|
||||
format!("rollout DB missing active rows: {}", missing_active.len()),
|
||||
format!(
|
||||
"rollout DB missing archived rows: {}",
|
||||
missing_archived.len()
|
||||
),
|
||||
format!(
|
||||
"rollout DB stale rows: {}",
|
||||
count_or_skipped(stale_rows.len(), scan_complete)
|
||||
),
|
||||
format!(
|
||||
"rollout DB archive mismatches: {}",
|
||||
count_or_skipped(archive_mismatches.len(), scan_complete)
|
||||
),
|
||||
format!(
|
||||
"rollout DB duplicate rollout thread ids: {}",
|
||||
duplicate_rollout_thread_ids.len()
|
||||
),
|
||||
format!(
|
||||
"rollout DB duplicate DB paths: {}",
|
||||
duplicate_db_paths.len()
|
||||
),
|
||||
format!(
|
||||
"rollout DB model providers: {}",
|
||||
count_summary(rows.iter().map(|row| row.model_provider.as_str()))
|
||||
),
|
||||
format!(
|
||||
"rollout DB sources: {}",
|
||||
count_summary(rows.iter().map(|row| source_category(&row.source)))
|
||||
),
|
||||
]);
|
||||
push_path_samples(
|
||||
&mut details,
|
||||
"rollout DB missing active sample",
|
||||
missing_active.iter().copied(),
|
||||
);
|
||||
push_path_samples(
|
||||
&mut details,
|
||||
"rollout DB missing archived sample",
|
||||
missing_archived.iter().copied(),
|
||||
);
|
||||
push_path_samples(
|
||||
&mut details,
|
||||
"rollout DB stale row sample",
|
||||
stale_rows.iter().map(|row| row.rollout_path.as_path()),
|
||||
);
|
||||
push_path_samples(
|
||||
&mut details,
|
||||
"rollout DB archive mismatch sample",
|
||||
archive_mismatches
|
||||
.iter()
|
||||
.map(|row| row.rollout_path.as_path()),
|
||||
);
|
||||
push_samples(
|
||||
&mut details,
|
||||
"rollout DB duplicate rollout thread id sample",
|
||||
duplicate_rollout_thread_ids.iter().map(String::as_str),
|
||||
);
|
||||
push_path_samples(
|
||||
&mut details,
|
||||
"rollout DB duplicate DB path sample",
|
||||
duplicate_db_paths.iter().map(PathBuf::as_path),
|
||||
);
|
||||
|
||||
let status = if scan.scan_errors.is_empty()
|
||||
&& scan.malformed_names.is_empty()
|
||||
&& !scan.reached_scan_cap
|
||||
&& missing_active.is_empty()
|
||||
&& missing_archived.is_empty()
|
||||
&& stale_rows.is_empty()
|
||||
&& archive_mismatches.is_empty()
|
||||
&& duplicate_rollout_thread_ids.is_empty()
|
||||
&& duplicate_db_paths.is_empty()
|
||||
{
|
||||
CheckStatus::Ok
|
||||
} else {
|
||||
CheckStatus::Warning
|
||||
};
|
||||
|
||||
let summary = if status == CheckStatus::Ok {
|
||||
"rollout files and state DB thread inventory agree"
|
||||
} else {
|
||||
"rollout files and state DB thread inventory differ"
|
||||
};
|
||||
let mut check = DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, status, summary).details(details);
|
||||
|
||||
if !missing_active.is_empty() || !missing_archived.is_empty() {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"rollout files are missing from the state DB",
|
||||
)
|
||||
.measured(format!(
|
||||
"{} active, {} archived",
|
||||
missing_active.len(),
|
||||
missing_archived.len()
|
||||
))
|
||||
.expected("every rollout file has a matching threads row"),
|
||||
);
|
||||
}
|
||||
if !stale_rows.is_empty() {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"state DB rows point at missing or unusable rollout files",
|
||||
)
|
||||
.measured(format!("{} stale rows", stale_rows.len()))
|
||||
.expected("every state DB rollout path is a file on disk"),
|
||||
);
|
||||
}
|
||||
if !archive_mismatches.is_empty() {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"state DB archive flags disagree with rollout file locations",
|
||||
)
|
||||
.measured(format!("{} mismatched rows", archive_mismatches.len()))
|
||||
.expected(
|
||||
"rows under archived_sessions are archived and rows under sessions are active",
|
||||
),
|
||||
);
|
||||
}
|
||||
if !duplicate_rollout_thread_ids.is_empty() || !duplicate_db_paths.is_empty() {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"duplicate thread inventory entries found",
|
||||
)
|
||||
.measured(format!(
|
||||
"{} duplicate rollout thread ids, {} duplicate DB paths",
|
||||
duplicate_rollout_thread_ids.len(),
|
||||
duplicate_db_paths.len()
|
||||
))
|
||||
.expected("one rollout path and thread id per thread")
|
||||
.remedy("Attach the doctor report to a bug report so support can inspect samples."),
|
||||
);
|
||||
}
|
||||
if !scan.scan_errors.is_empty() || !scan.malformed_names.is_empty() || scan.reached_scan_cap {
|
||||
check = check.issue(
|
||||
DoctorIssue::new(
|
||||
CheckStatus::Warning,
|
||||
"rollout scan was incomplete or found bad files",
|
||||
)
|
||||
.measured(format!(
|
||||
"{} scan errors, {} malformed names, scan cap reached: {}",
|
||||
scan.scan_errors.len(),
|
||||
scan.malformed_names.len(),
|
||||
scan.reached_scan_cap
|
||||
))
|
||||
.expected("rollout directories are fully scannable")
|
||||
.remedy("Check file permissions and unexpected files under CODEX_HOME sessions."),
|
||||
);
|
||||
}
|
||||
check
|
||||
}
|
||||
|
||||
async fn scan_rollout_files(codex_home: &Path) -> RolloutScan {
|
||||
let mut scan = RolloutScan::default();
|
||||
scan_rollout_root(
|
||||
&codex_home.join("sessions"),
|
||||
/*archived*/ false,
|
||||
&mut scan,
|
||||
)
|
||||
.await;
|
||||
scan_rollout_root(
|
||||
&codex_home.join("archived_sessions"),
|
||||
/*archived*/ true,
|
||||
&mut scan,
|
||||
)
|
||||
.await;
|
||||
scan
|
||||
}
|
||||
|
||||
async fn scan_rollout_root(root: &Path, archived: bool, scan: &mut RolloutScan) {
|
||||
let mut dirs = vec![root.to_path_buf()];
|
||||
while let Some(dir) = dirs.pop() {
|
||||
if scan.reached_scan_cap {
|
||||
return;
|
||||
}
|
||||
let entries = match std::fs::read_dir(&dir) {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
|
||||
Err(err) => {
|
||||
scan.record_scan_error(format!("{} ({err})", dir.display()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
for entry in entries {
|
||||
if scan.reached_scan_cap {
|
||||
return;
|
||||
}
|
||||
let entry = match entry {
|
||||
Ok(entry) => entry,
|
||||
Err(err) => {
|
||||
scan.record_scan_error(format!("{} ({err})", dir.display()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let path = entry.path();
|
||||
let file_type = match entry.file_type() {
|
||||
Ok(file_type) => file_type,
|
||||
Err(err) => {
|
||||
scan.record_scan_error(format!("{} ({err})", path.display()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if file_type.is_dir() {
|
||||
dirs.push(path);
|
||||
continue;
|
||||
}
|
||||
if !file_type.is_file() || !is_rollout_file(&path) {
|
||||
continue;
|
||||
}
|
||||
if scan.candidate_count() >= MAX_PARITY_SCAN_FILES {
|
||||
scan.reached_scan_cap = true;
|
||||
return;
|
||||
}
|
||||
let thread_id = match thread_id_from_rollout(&path).await {
|
||||
RolloutThreadId::Id(thread_id) => thread_id,
|
||||
RolloutThreadId::MalformedName => {
|
||||
scan.record_malformed_name(path.clone());
|
||||
continue;
|
||||
}
|
||||
RolloutThreadId::Unusable(reason) => {
|
||||
scan.record_scan_error(format!("{} ({reason})", path.display()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
scan.files.push(RolloutAuditFile {
|
||||
key: path_key(&path),
|
||||
path,
|
||||
archived,
|
||||
thread_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_id_from_rollout(path: &Path) -> RolloutThreadId {
|
||||
let items = match RolloutRecorder::load_rollout_items(path).await {
|
||||
Ok((items, _, _)) => items,
|
||||
Err(err) => return RolloutThreadId::Unusable(err.to_string()),
|
||||
};
|
||||
if items.is_empty() {
|
||||
return RolloutThreadId::Unusable("no parseable rollout items".to_string());
|
||||
}
|
||||
codex_rollout::builder_from_items(items.as_slice(), path)
|
||||
.map(|builder| RolloutThreadId::Id(builder.id.to_string()))
|
||||
.unwrap_or(RolloutThreadId::MalformedName)
|
||||
}
|
||||
|
||||
fn is_rollout_file(path: &Path) -> bool {
|
||||
path.extension() == Some(OsStr::new("jsonl"))
|
||||
&& path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.is_some_and(|name| name.starts_with("rollout-"))
|
||||
}
|
||||
|
||||
fn count_or_skipped(count: usize, complete: bool) -> String {
|
||||
if complete {
|
||||
count.to_string()
|
||||
} else {
|
||||
"skipped (scan cap reached)".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
fn path_key(path: &Path) -> PathBuf {
|
||||
normalize_for_path_comparison(path).unwrap_or_else(|_| path.to_path_buf())
|
||||
}
|
||||
|
||||
fn archived_from_rollout_path(codex_home: &Path, path: &Path) -> Option<bool> {
|
||||
let key = path_key(path);
|
||||
if key.starts_with(path_key(&codex_home.join("archived_sessions"))) {
|
||||
return Some(true);
|
||||
}
|
||||
if key.starts_with(path_key(&codex_home.join("sessions"))) {
|
||||
return Some(false);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn missing_rollout_paths<'a>(
|
||||
files: &'a [RolloutAuditFile],
|
||||
rows_by_key: &HashMap<PathBuf, Vec<&ThreadStateAuditRow>>,
|
||||
archived: bool,
|
||||
) -> Vec<&'a Path> {
|
||||
files
|
||||
.iter()
|
||||
.filter(|file| file.archived == archived && !has_matching_thread_row(file, rows_by_key))
|
||||
.map(|file| file.path.as_path())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn has_matching_thread_row(
|
||||
file: &RolloutAuditFile,
|
||||
rows_by_key: &HashMap<PathBuf, Vec<&ThreadStateAuditRow>>,
|
||||
) -> bool {
|
||||
let Some(rows) = rows_by_key.get(&file.key) else {
|
||||
return false;
|
||||
};
|
||||
rows.iter().any(|row| row.id == file.thread_id.as_str())
|
||||
}
|
||||
|
||||
fn duplicate_rollout_thread_ids(files: &[RolloutAuditFile]) -> Vec<String> {
|
||||
let mut seen = HashSet::new();
|
||||
let mut duplicates = HashSet::new();
|
||||
for thread_id in files.iter().map(|file| file.thread_id.as_str()) {
|
||||
if !seen.insert(thread_id) {
|
||||
duplicates.insert(thread_id.to_string());
|
||||
}
|
||||
}
|
||||
let mut duplicates = duplicates.into_iter().collect::<Vec<_>>();
|
||||
duplicates.sort();
|
||||
duplicates
|
||||
}
|
||||
|
||||
fn duplicate_db_paths(rows_by_key: &HashMap<PathBuf, Vec<&ThreadStateAuditRow>>) -> Vec<PathBuf> {
|
||||
let mut paths = rows_by_key
|
||||
.iter()
|
||||
.filter(|(_, rows)| rows.len() > 1)
|
||||
.map(|(path, _)| path.clone())
|
||||
.collect::<Vec<_>>();
|
||||
paths.sort();
|
||||
paths
|
||||
}
|
||||
|
||||
fn source_category(source: &str) -> &'static str {
|
||||
let parsed = serde_json::from_str::<SessionSource>(source)
|
||||
.or_else(|_| serde_json::from_value(serde_json::Value::String(source.to_string())));
|
||||
let Ok(source) = parsed else {
|
||||
return "unparsable";
|
||||
};
|
||||
|
||||
match source {
|
||||
SessionSource::Cli => "cli",
|
||||
SessionSource::VSCode => "vscode",
|
||||
SessionSource::Exec => "exec",
|
||||
SessionSource::Mcp => "mcp",
|
||||
SessionSource::Custom(_) => "custom",
|
||||
SessionSource::Internal(InternalSessionSource::MemoryConsolidation) => {
|
||||
"internal:memory_consolidation"
|
||||
}
|
||||
SessionSource::SubAgent(SubAgentSource::Review) => "subagent:review",
|
||||
SessionSource::SubAgent(SubAgentSource::Compact) => "subagent:compact",
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) => "subagent:thread_spawn",
|
||||
SessionSource::SubAgent(SubAgentSource::MemoryConsolidation) => {
|
||||
"subagent:memory_consolidation"
|
||||
}
|
||||
SessionSource::SubAgent(SubAgentSource::Other(_)) => "subagent:other",
|
||||
SessionSource::Unknown => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
fn count_summary<I, V>(values: I) -> String
|
||||
where
|
||||
I: Iterator<Item = V>,
|
||||
V: Into<String>,
|
||||
{
|
||||
let mut counts = BTreeMap::<String, usize>::new();
|
||||
for value in values {
|
||||
*counts.entry(value.into()).or_default() += 1;
|
||||
}
|
||||
if counts.is_empty() {
|
||||
return "none".to_string();
|
||||
}
|
||||
|
||||
let mut entries = counts.into_iter().collect::<Vec<_>>();
|
||||
entries.sort_by(|(left_value, left_count), (right_value, right_count)| {
|
||||
right_count
|
||||
.cmp(left_count)
|
||||
.then_with(|| left_value.cmp(right_value))
|
||||
});
|
||||
let omitted_categories = entries.len().saturating_sub(SUMMARY_LIMIT);
|
||||
let omitted_rows = entries
|
||||
.iter()
|
||||
.skip(SUMMARY_LIMIT)
|
||||
.map(|(_, count)| count)
|
||||
.sum::<usize>();
|
||||
let mut parts = entries
|
||||
.into_iter()
|
||||
.take(SUMMARY_LIMIT)
|
||||
.map(|(value, count)| format!("{value}={count}"))
|
||||
.collect::<Vec<_>>();
|
||||
if omitted_categories > 0 {
|
||||
parts.push(format!(
|
||||
"other={omitted_rows} across {omitted_categories} categories"
|
||||
));
|
||||
}
|
||||
parts.join(", ")
|
||||
}
|
||||
|
||||
fn push_path_samples<'a>(
|
||||
details: &mut Vec<String>,
|
||||
label: &str,
|
||||
paths: impl Iterator<Item = &'a Path>,
|
||||
) {
|
||||
push_samples(details, label, paths.map(|path| path.display().to_string()));
|
||||
}
|
||||
|
||||
fn push_samples<I, V>(details: &mut Vec<String>, label: &str, values: I)
|
||||
where
|
||||
I: Iterator<Item = V>,
|
||||
V: ToString,
|
||||
{
|
||||
for value in values.take(SAMPLE_LIMIT) {
|
||||
details.push(format!("{label}: {}", value.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use pretty_assertions::assert_eq;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::sqlite::SqlitePoolOptions;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_inventory_check_ok_when_rollouts_match_db() {
|
||||
let fixture = Fixture::new().await;
|
||||
let active_path = fixture.write_rollout(
|
||||
/*archived*/ false,
|
||||
"2025-01-02T10-00-00",
|
||||
"00000000-0000-0000-0000-000000000001",
|
||||
);
|
||||
let archived_path = fixture.write_rollout(
|
||||
/*archived*/ true,
|
||||
"2025-01-02T11-00-00",
|
||||
"00000000-0000-0000-0000-000000000002",
|
||||
);
|
||||
fixture
|
||||
.insert_thread_row(
|
||||
"00000000-0000-0000-0000-000000000001",
|
||||
active_path.as_path(),
|
||||
/*archived*/ false,
|
||||
)
|
||||
.await;
|
||||
fixture
|
||||
.insert_thread_row(
|
||||
"00000000-0000-0000-0000-000000000002",
|
||||
archived_path.as_path(),
|
||||
/*archived*/ true,
|
||||
)
|
||||
.await;
|
||||
|
||||
let check = thread_inventory_check_for_roots(
|
||||
fixture.codex_home.path(),
|
||||
fixture.sqlite_home.path(),
|
||||
"test-provider",
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(check.status, CheckStatus::Ok);
|
||||
assert_eq!(check.category, CHECK_CATEGORY);
|
||||
assert_detail(&check, "rollout DB missing active rows", "0");
|
||||
assert_detail(&check, "rollout DB missing archived rows", "0");
|
||||
assert_detail(&check, "rollout DB stale rows", "0");
|
||||
assert_detail(&check, "rollout DB archive mismatches", "0");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_inventory_check_warns_for_missing_stale_and_mismatched_rows() {
|
||||
let fixture = Fixture::new().await;
|
||||
let missing_path = fixture.write_rollout(
|
||||
/*archived*/ false,
|
||||
"2025-01-02T10-00-00",
|
||||
"00000000-0000-0000-0000-000000000001",
|
||||
);
|
||||
let mismatched_path = fixture.write_rollout(
|
||||
/*archived*/ true,
|
||||
"2025-01-02T11-00-00",
|
||||
"00000000-0000-0000-0000-000000000002",
|
||||
);
|
||||
let stale_path = fixture
|
||||
.codex_home
|
||||
.path()
|
||||
.join("sessions/2025/01/02/rollout-2025-01-02T12-00-00-00000000-0000-0000-0000-000000000003.jsonl");
|
||||
fixture
|
||||
.insert_thread_row(
|
||||
"00000000-0000-0000-0000-000000000002",
|
||||
mismatched_path.as_path(),
|
||||
/*archived*/ false,
|
||||
)
|
||||
.await;
|
||||
fixture
|
||||
.insert_thread_row(
|
||||
"00000000-0000-0000-0000-000000000003",
|
||||
stale_path.as_path(),
|
||||
/*archived*/ false,
|
||||
)
|
||||
.await;
|
||||
|
||||
let check = thread_inventory_check_for_roots(
|
||||
fixture.codex_home.path(),
|
||||
fixture.sqlite_home.path(),
|
||||
"test-provider",
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(check.status, CheckStatus::Warning);
|
||||
assert_eq!(check.issues.len(), 3);
|
||||
assert_detail(&check, "rollout DB missing active rows", "1");
|
||||
assert_detail(&check, "rollout DB stale rows", "1");
|
||||
assert_detail(&check, "rollout DB archive mismatches", "1");
|
||||
assert_eq!(check.remediation, None);
|
||||
assert!(check.issues.iter().all(|issue| {
|
||||
!issue
|
||||
.remedy
|
||||
.as_deref()
|
||||
.is_some_and(|remedy| remedy.starts_with("Restart Codex"))
|
||||
}));
|
||||
assert!(
|
||||
check
|
||||
.details
|
||||
.iter()
|
||||
.any(|detail| detail.contains(missing_path.to_string_lossy().as_ref()))
|
||||
);
|
||||
}
|
||||
|
||||
struct Fixture {
|
||||
codex_home: TempDir,
|
||||
sqlite_home: TempDir,
|
||||
}
|
||||
|
||||
impl Fixture {
|
||||
async fn new() -> Self {
|
||||
let codex_home = TempDir::new().expect("codex home");
|
||||
let sqlite_home = TempDir::new().expect("sqlite home");
|
||||
let _runtime = codex_state::StateRuntime::init(
|
||||
sqlite_home.path().to_path_buf(),
|
||||
"test-provider".to_string(),
|
||||
)
|
||||
.await
|
||||
.expect("state runtime");
|
||||
Self {
|
||||
codex_home,
|
||||
sqlite_home,
|
||||
}
|
||||
}
|
||||
|
||||
fn write_rollout(&self, archived: bool, timestamp: &str, thread_id: &str) -> PathBuf {
|
||||
let root = if archived {
|
||||
self.codex_home.path().join("archived_sessions")
|
||||
} else {
|
||||
self.codex_home.path().join("sessions/2025/01/02")
|
||||
};
|
||||
std::fs::create_dir_all(&root).expect("rollout dir");
|
||||
let path = root.join(format!("rollout-{timestamp}-{thread_id}.jsonl"));
|
||||
let rollout_line = RolloutLine {
|
||||
timestamp: timestamp.to_string(),
|
||||
item: RolloutItem::SessionMeta(codex_protocol::protocol::SessionMetaLine {
|
||||
meta: codex_protocol::protocol::SessionMeta {
|
||||
id: ThreadId::from_string(thread_id).expect("thread id"),
|
||||
timestamp: timestamp.to_string(),
|
||||
cwd: self.codex_home.path().to_path_buf(),
|
||||
originator: "test".to_string(),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
model_provider: Some("test-provider".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
let contents = serde_json::to_string(&rollout_line).expect("rollout line");
|
||||
std::fs::write(&path, format!("{contents}\n")).expect("rollout file");
|
||||
path
|
||||
}
|
||||
|
||||
async fn insert_thread_row(&self, id: &str, rollout_path: &Path, archived: bool) {
|
||||
let state_db_path = codex_state::state_db_path(self.sqlite_home.path());
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(state_db_path)
|
||||
.create_if_missing(false);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(1)
|
||||
.connect_with(options)
|
||||
.await
|
||||
.expect("sqlite pool");
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO threads (
|
||||
id,
|
||||
rollout_path,
|
||||
created_at,
|
||||
updated_at,
|
||||
source,
|
||||
model_provider,
|
||||
cwd,
|
||||
title,
|
||||
sandbox_policy,
|
||||
approval_mode,
|
||||
archived,
|
||||
archived_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(rollout_path.display().to_string())
|
||||
.bind(1_i64)
|
||||
.bind(1_i64)
|
||||
.bind("cli")
|
||||
.bind("test-provider")
|
||||
.bind(self.codex_home.path().display().to_string())
|
||||
.bind("test title")
|
||||
.bind("read-only")
|
||||
.bind("on-request")
|
||||
.bind(if archived { 1_i64 } else { 0_i64 })
|
||||
.bind(archived.then_some(1_i64))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.expect("insert thread row");
|
||||
pool.close().await;
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_detail(check: &DoctorCheck, label: &str, expected: &str) {
|
||||
let prefix = format!("{label}: ");
|
||||
let actual = check
|
||||
.details
|
||||
.iter()
|
||||
.find_map(|detail| detail.strip_prefix(&prefix))
|
||||
.expect("detail should exist");
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn source_category_coarsens_structured_sources() {
|
||||
assert_eq!(source_category("cli"), "cli");
|
||||
assert_eq!(
|
||||
source_category(r#"{"subagent":"memory_consolidation"}"#),
|
||||
"subagent:memory_consolidation"
|
||||
);
|
||||
assert_eq!(
|
||||
source_category(
|
||||
r#"{"subagent":{"thread_spawn":{"parent_thread_id":"00000000-0000-0000-0000-000000000001","depth":2}}}"#,
|
||||
),
|
||||
"subagent:thread_spawn"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn count_summary_caps_distinct_values() {
|
||||
let summary = count_summary(["a", "b", "c", "d", "e", "f", "g", "h", "i"].into_iter());
|
||||
|
||||
assert_eq!(
|
||||
summary,
|
||||
"a=1, b=1, c=1, d=1, e=1, f=1, g=1, h=1, other=1 across 1 categories"
|
||||
);
|
||||
}
|
||||
}
|
||||
55
codex-rs/state/src/audit.rs
Normal file
55
codex-rs/state/src/audit.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
//! Read-only state database queries for diagnostics.
|
||||
|
||||
use anyhow::Result;
|
||||
use log::LevelFilter;
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::sqlite::SqlitePoolOptions;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Minimal thread metadata used by read-only state database audits.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadStateAuditRow {
|
||||
pub id: String,
|
||||
pub rollout_path: PathBuf,
|
||||
pub archived: bool,
|
||||
pub source: String,
|
||||
pub model_provider: String,
|
||||
}
|
||||
|
||||
/// Read persisted thread rows from a state DB without creating, migrating, or repairing it.
|
||||
pub async fn read_thread_state_audit_rows(path: &Path) -> Result<Vec<ThreadStateAuditRow>> {
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
.create_if_missing(false)
|
||||
.read_only(true)
|
||||
.log_statements(LevelFilter::Off);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(1)
|
||||
.connect_with(options)
|
||||
.await?;
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT id, rollout_path, archived, source, model_provider
|
||||
FROM threads
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&pool)
|
||||
.await?;
|
||||
pool.close().await;
|
||||
|
||||
rows.into_iter()
|
||||
.map(|row| {
|
||||
let archived: i64 = row.try_get("archived")?;
|
||||
Ok(ThreadStateAuditRow {
|
||||
id: row.try_get("id")?,
|
||||
rollout_path: PathBuf::from(row.try_get::<String, _>("rollout_path")?),
|
||||
archived: archived != 0,
|
||||
source: row.try_get("source")?,
|
||||
model_provider: row.try_get("model_provider")?,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
//! from JSONL rollouts and mirrors it into a local SQLite database. Backfill
|
||||
//! orchestration and rollout scanning live in `codex-core`.
|
||||
|
||||
mod audit;
|
||||
mod extract;
|
||||
pub mod log_db;
|
||||
mod migrations;
|
||||
@@ -19,6 +20,8 @@ pub use model::Phase2JobClaimOutcome;
|
||||
/// Preferred entrypoint: owns configuration and metrics.
|
||||
pub use runtime::StateRuntime;
|
||||
|
||||
pub use audit::ThreadStateAuditRow;
|
||||
pub use audit::read_thread_state_audit_rows;
|
||||
/// Low-level storage engine: useful for focused tests.
|
||||
///
|
||||
/// Most consumers should prefer [`StateRuntime`].
|
||||
|
||||
Reference in New Issue
Block a user