feat: improve thread listing (#11429)

Improve listing by doing:
1. List using the rollout file system
2. Upsert the result in the DB (if present)
3. Return the result of a DB listing
4. Fall back on the result of step 1 if the DB listing fails

Plus some metrics recorded on top of this flow.
This commit is contained in:
jif-oai
2026-02-11 11:22:05 +00:00
committed by GitHub
parent 2c5eeb6b1f
commit 3d0ead8db8
2 changed files with 248 additions and 35 deletions

View File

@@ -13,6 +13,7 @@ use codex_protocol::models::BaseInstructions;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
@@ -32,6 +33,8 @@ use super::list::ThreadSortKey;
use super::list::ThreadsPage;
use super::list::get_threads;
use super::list::get_threads_in_root;
use super::list::parse_cursor;
use super::list::parse_timestamp_uuid_from_filename;
use super::metadata;
use super::policy::is_persisted_response_item;
use crate::config::Config;
@@ -175,7 +178,55 @@ impl RolloutRecorder {
archived: bool,
) -> std::io::Result<ThreadsPage> {
let codex_home = config.codex_home.as_path();
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
// SQLite rollout paths before the final DB-backed page is returned.
let fs_page_size = page_size.saturating_mul(2).max(page_size);
let fs_page = if archived {
let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
get_threads_in_root(
root,
fs_page_size,
cursor,
sort_key,
ThreadListConfig {
allowed_sources,
model_providers,
default_provider,
layout: ThreadListLayout::Flat,
},
)
.await?
} else {
get_threads(
codex_home,
fs_page_size,
cursor,
sort_key,
allowed_sources,
model_providers,
default_provider,
)
.await?
};
let state_db_ctx = state_db::get_state_db(config, None).await;
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
return Ok(truncate_fs_page(fs_page, page_size, sort_key));
}
// Warm the DB by repairing every filesystem hit before querying SQLite.
for item in &fs_page.items {
state_db::read_repair_rollout_path(
state_db_ctx.as_deref(),
item.thread_id,
Some(archived),
item.path.as_path(),
)
.await;
}
if let Some(db_page) = state_db::list_threads_db(
state_db_ctx.as_deref(),
codex_home,
@@ -190,36 +241,10 @@ impl RolloutRecorder {
{
return Ok(db_page.into());
}
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
state_db::record_discrepancy("list_threads_with_db_fallback", "falling_back");
if archived {
let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
return get_threads_in_root(
root,
page_size,
cursor,
sort_key,
ThreadListConfig {
allowed_sources,
model_providers,
default_provider,
layout: ThreadListLayout::Flat,
},
)
.await;
}
get_threads(
codex_home,
page_size,
cursor,
sort_key,
allowed_sources,
model_providers,
default_provider,
)
.await
Ok(truncate_fs_page(fs_page, page_size, sort_key))
}
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
@@ -529,6 +554,27 @@ impl RolloutRecorder {
}
}
/// Clamp an overfetched filesystem listing down to the requested page size.
///
/// When the page already fits within `page_size` it is returned untouched,
/// keeping its original `next_cursor`. Otherwise the surplus items are
/// dropped and `next_cursor` is rebuilt from the new final item so a
/// follow-up request resumes exactly after this page. If a cursor cannot be
/// derived (unparseable filename, missing `updated_at`, or a timestamp that
/// fails RFC 3339 formatting), `next_cursor` becomes `None`, ending
/// pagination rather than risking skipped or duplicated entries.
fn truncate_fs_page(
    mut page: ThreadsPage,
    page_size: usize,
    sort_key: ThreadSortKey,
) -> ThreadsPage {
    if page.items.len() > page_size {
        page.items.truncate(page_size);
        // Recompute the continuation cursor from whichever item is now last.
        let rebuilt_cursor = page.items.last().and_then(|last| {
            let name = last.path.file_name()?.to_str()?;
            let (created_at, id) = parse_timestamp_uuid_from_filename(name)?;
            let token = match sort_key {
                ThreadSortKey::CreatedAt => {
                    let stamp = created_at.format(&Rfc3339).ok()?;
                    format!("{stamp}|{id}")
                }
                ThreadSortKey::UpdatedAt => {
                    let updated = last.updated_at.as_deref()?;
                    format!("{updated}|{id}")
                }
            };
            parse_cursor(&token)
        });
        page.next_cursor = rebuilt_cursor;
    }
    page
}
struct LogFileInfo {
/// Full path to the rollout file.
path: PathBuf,
@@ -895,10 +941,51 @@ fn cwd_matches(session_cwd: &Path, cwd: &Path) -> bool {
mod tests {
use super::*;
use crate::config::ConfigBuilder;
use crate::features::Feature;
use chrono::TimeZone;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use std::fs::File;
use std::fs::{self};
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use uuid::Uuid;
/// Test helper: create a minimal rollout session file under
/// `root/sessions/2025/01/03` containing a session-meta line followed by a
/// single user-message event, returning the path to the created file.
fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<PathBuf> {
    let directory = root.join("sessions/2025/01/03");
    fs::create_dir_all(&directory)?;
    let rollout_path = directory.join(format!("rollout-{ts}-{uuid}.jsonl"));

    // First line: the session metadata record that the listing code parses.
    let meta_line = serde_json::json!({
        "timestamp": ts,
        "type": "session_meta",
        "payload": {
            "id": uuid,
            "timestamp": ts,
            "cwd": ".",
            "originator": "test_originator",
            "cli_version": "test_version",
            "source": "cli",
            "model_provider": "test-provider",
        },
    });
    // Second line: one user message so the thread has user content to surface.
    let user_line = serde_json::json!({
        "timestamp": ts,
        "type": "event_msg",
        "payload": {
            "type": "user_message",
            "message": "Hello from user",
            "kind": "plain",
        },
    });

    let mut out = File::create(&rollout_path)?;
    writeln!(out, "{meta_line}")?;
    writeln!(out, "{user_line}")?;
    Ok(rollout_path)
}
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
@@ -983,4 +1070,116 @@ mod tests {
recorder.shutdown().await?;
Ok(())
}
/// With SQLite disabled, paging through the filesystem listing must not skip
/// entries: page 2, fetched via the cursor returned by page 1, has to contain
/// the next-newest session rather than jumping past it.
#[tokio::test]
async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Result<()> {
    let home = TempDir::new().expect("temp dir");
    let mut config = ConfigBuilder::default()
        .codex_home(home.path().to_path_buf())
        .build()
        .await?;
    config.features.disable(Feature::Sqlite);

    // Three sessions ordered newest-first by created-at timestamp.
    let newest = write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(9001))?;
    let middle = write_session_file(home.path(), "2025-01-02T12-00-00", Uuid::from_u128(9002))?;
    let _oldest =
        write_session_file(home.path(), "2025-01-01T12-00-00", Uuid::from_u128(9003))?;

    let provider = config.model_provider_id.clone();

    // Page 1 (size 1) should yield the newest session and a continuation cursor.
    let first_page = RolloutRecorder::list_threads(
        &config,
        1,
        None,
        ThreadSortKey::CreatedAt,
        &[],
        None,
        provider.as_str(),
    )
    .await?;
    assert_eq!(first_page.items.len(), 1);
    assert_eq!(first_page.items[0].path, newest);

    // Page 2 must continue at the middle session, proving nothing was skipped.
    let cursor = first_page
        .next_cursor
        .clone()
        .expect("cursor should be present");
    let second_page = RolloutRecorder::list_threads(
        &config,
        1,
        Some(&cursor),
        ThreadSortKey::CreatedAt,
        &[],
        None,
        provider.as_str(),
    )
    .await?;
    assert_eq!(second_page.items.len(), 1);
    assert_eq!(second_page.items[0].path, middle);
    Ok(())
}
/// With SQLite enabled, listing should read-repair a DB row whose stored
/// rollout path no longer matches the file on disk: both the returned page
/// and the DB row itself must end up pointing at the real path.
#[tokio::test]
async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let mut config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await?;
config.features.enable(Feature::Sqlite);
// Real session file on disk vs. a stale path (in a future-dated directory)
// that gets seeded into the DB below for the same thread id.
let uuid = Uuid::from_u128(9011);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let real_path = write_session_file(home.path(), "2025-01-03T13-00-00", uuid)?;
let stale_path = home.path().join(format!(
"sessions/2099/01/01/rollout-2099-01-01T00-00-00-{uuid}.jsonl"
));
// Initialize the state DB directly and mark backfill complete so the
// listing path queries the DB instead of treating it as cold.
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(None)
.await
.expect("backfill should be complete");
// Seed thread metadata carrying the stale rollout path into the DB.
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 13, 0, 0)
.single()
.expect("valid datetime");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
stale_path,
created_at,
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = home.path().to_path_buf();
let mut metadata = builder.build(config.model_provider_id.as_str());
metadata.first_user_message = Some("Hello from user".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");
// Listing must surface the real filesystem path, not the stale DB one.
let default_provider = config.model_provider_id.clone();
let page = RolloutRecorder::list_threads(
&config,
1,
None,
ThreadSortKey::CreatedAt,
&[],
None,
default_provider.as_str(),
)
.await?;
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].path, real_path);
// And the DB row itself must have been repaired in place by the listing.
let repaired_path = runtime
.find_rollout_path_by_id(thread_id, Some(false))
.await
.expect("state db lookup should succeed");
assert_eq!(repaired_path, Some(real_path));
Ok(())
}
}

View File

@@ -395,21 +395,30 @@ pub async fn read_repair_rollout_path(
return;
};
// Fast path: update an existing metadata row in place, but avoid writes when
// read-repair computes no effective change.
let mut saw_existing_metadata = false;
if let Some(thread_id) = thread_id
&& let Ok(Some(mut metadata)) = ctx.get_thread(thread_id).await
&& let Ok(Some(metadata)) = ctx.get_thread(thread_id).await
{
metadata.rollout_path = rollout_path.to_path_buf();
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
saw_existing_metadata = true;
let mut repaired = metadata.clone();
repaired.rollout_path = rollout_path.to_path_buf();
repaired.cwd = normalize_cwd_for_state_db(&repaired.cwd);
match archived_only {
Some(true) if metadata.archived_at.is_none() => {
metadata.archived_at = Some(metadata.updated_at);
Some(true) if repaired.archived_at.is_none() => {
repaired.archived_at = Some(repaired.updated_at);
}
Some(false) => {
metadata.archived_at = None;
repaired.archived_at = None;
}
Some(true) | None => {}
}
if let Err(err) = ctx.upsert_thread(&metadata).await {
if repaired == metadata {
return;
}
record_discrepancy("read_repair_rollout_path", "upsert_needed");
if let Err(err) = ctx.upsert_thread(&repaired).await {
warn!(
"state db read-repair upsert failed for {}: {err}",
rollout_path.display()
@@ -419,6 +428,11 @@ pub async fn read_repair_rollout_path(
}
}
// Slow path: when the row is missing/unreadable (or direct upsert failed),
// rebuild metadata from rollout contents and reconcile it into SQLite.
if !saw_existing_metadata {
record_discrepancy("read_repair_rollout_path", "upsert_needed");
}
let default_provider = crate::rollout::list::read_session_meta_line(rollout_path)
.await
.ok()