chore: unify log queries (#10152)

Unify log queries so that SQLx code lives only in the runtime, and use the
shared queries for both the log client and for tests.
jif-oai
2026-01-29 17:28:15 +01:00
committed by GitHub
parent d6631fb5a9
commit e6c4f548ab
9 changed files with 225 additions and 139 deletions
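
In short: the codex-state-logs client previously opened its own read-only SqlitePool and assembled SQL with QueryBuilder, duplicating query logic that belongs to the state runtime. After this change the client initializes a StateRuntime and passes a declarative LogQuery, so SQLx stays behind the runtime's API. A minimal sketch of the resulting call pattern, assuming only the API shown in the diffs below (error handling and path resolution elided):

use codex_state::LogQuery;
use codex_state::StateRuntime;

async fn print_recent_logs(codex_home: std::path::PathBuf) -> anyhow::Result<()> {
    let runtime = StateRuntime::init(codex_home, "logs-client".to_string(), None).await?;
    // Newest 100 rows, fetched in descending id order (the backfill case below).
    let query = LogQuery {
        limit: Some(100),
        descending: true,
        ..Default::default()
    };
    for row in runtime.as_ref().query_logs(&query).await? {
        println!("[{}] {}", row.level, row.message.unwrap_or_default());
    }
    Ok(())
}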

View File

@@ -5,11 +5,9 @@ CREATE TABLE logs (
level TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT,
thread_id TEXT,
module_path TEXT,
file TEXT,
line INTEGER
);
CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
CREATE INDEX idx_logs_thread_id ON logs(thread_id);

View File

@@ -0,0 +1,3 @@
ALTER TABLE logs ADD COLUMN thread_id TEXT;
CREATE INDEX idx_logs_thread_id ON logs(thread_id);

View File

@@ -1,4 +1,3 @@
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
@@ -7,15 +6,12 @@ use chrono::DateTime;
use chrono::SecondsFormat;
use chrono::Utc;
use clap::Parser;
use codex_state::LogQuery;
use codex_state::LogRow;
use codex_state::STATE_DB_FILENAME;
use codex_state::StateRuntime;
use dirs::home_dir;
use owo_colors::OwoColorize;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqlitePoolOptions;
#[derive(Debug, Parser)]
#[command(name = "codex-state-logs")]
@@ -62,18 +58,6 @@ struct Args {
poll_ms: u64,
}
#[derive(Debug, Clone, sqlx::FromRow)]
struct LogRow {
id: i64,
ts: i64,
ts_nanos: i64,
level: String,
message: Option<String>,
thread_id: Option<String>,
file: Option<String>,
line: Option<i64>,
}
#[derive(Debug, Clone)]
struct LogFilter {
level_upper: Option<String>,
@@ -89,16 +73,20 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let db_path = resolve_db_path(&args)?;
let filter = build_filter(&args)?;
let pool = open_read_only_pool(db_path.as_path()).await?;
let codex_home = db_path
.parent()
.map(ToOwned::to_owned)
.unwrap_or_else(|| PathBuf::from("."));
let runtime = StateRuntime::init(codex_home, "logs-client".to_string(), None).await?;
let mut last_id = print_backfill(&pool, &filter, args.backfill).await?;
let mut last_id = print_backfill(runtime.as_ref(), &filter, args.backfill).await?;
if last_id == 0 {
last_id = fetch_max_id(&pool, &filter).await?;
last_id = fetch_max_id(runtime.as_ref(), &filter).await?;
}
let poll_interval = Duration::from_millis(args.poll_ms);
loop {
let rows = fetch_new_rows(&pool, &filter, last_id).await?;
let rows = fetch_new_rows(runtime.as_ref(), &filter, last_id).await?;
for row in rows {
last_id = last_id.max(row.id);
println!("{}", format_row(&row));
@@ -159,23 +147,8 @@ fn parse_timestamp(value: &str) -> anyhow::Result<i64> {
Ok(dt.timestamp())
}
async fn open_read_only_pool(path: &Path) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false)
.read_only(true)
.busy_timeout(Duration::from_secs(5));
let display = path.display();
SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await
.with_context(|| format!("failed to open sqlite db at {display}"))
}
async fn print_backfill(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
backfill: usize,
) -> anyhow::Result<i64> {
@@ -183,7 +156,7 @@ async fn print_backfill(
return Ok(0);
}
let mut rows = fetch_backfill(pool, filter, backfill).await?;
let mut rows = fetch_backfill(runtime, filter, backfill).await?;
rows.reverse();
let mut last_id = 0;
@@ -195,86 +168,53 @@ async fn print_backfill(
}
async fn fetch_backfill(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
backfill: usize,
) -> anyhow::Result<Vec<LogRow>> {
let mut builder = base_select_builder();
push_filters(&mut builder, filter);
builder.push(" ORDER BY id DESC");
builder.push(" LIMIT ").push_bind(backfill as i64);
builder
.build_query_as::<LogRow>()
.fetch_all(pool)
let query = to_log_query(filter, Some(backfill), None, true);
runtime
.query_logs(&query)
.await
.context("failed to fetch backfill logs")
}
async fn fetch_new_rows(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
last_id: i64,
) -> anyhow::Result<Vec<LogRow>> {
let mut builder = base_select_builder();
push_filters(&mut builder, filter);
builder.push(" AND id > ").push_bind(last_id);
builder.push(" ORDER BY id ASC");
builder
.build_query_as::<LogRow>()
.fetch_all(pool)
let query = to_log_query(filter, None, Some(last_id), false);
runtime
.query_logs(&query)
.await
.context("failed to fetch new logs")
}
async fn fetch_max_id(pool: &SqlitePool, filter: &LogFilter) -> anyhow::Result<i64> {
let mut builder = QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
push_filters(&mut builder, filter);
let row = builder
.build()
.fetch_one(pool)
async fn fetch_max_id(runtime: &StateRuntime, filter: &LogFilter) -> anyhow::Result<i64> {
let query = to_log_query(filter, None, None, false);
runtime
.max_log_id(&query)
.await
.context("failed to fetch max log id")?;
let max_id: Option<i64> = row.try_get("max_id")?;
Ok(max_id.unwrap_or(0))
.context("failed to fetch max log id")
}
fn base_select_builder<'a>() -> QueryBuilder<'a, Sqlite> {
QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
)
}
fn push_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, filter: &'a LogFilter) {
if let Some(level_upper) = filter.level_upper.as_ref() {
builder
.push(" AND UPPER(level) = ")
.push_bind(level_upper.as_str());
}
if let Some(from_ts) = filter.from_ts {
builder.push(" AND ts >= ").push_bind(from_ts);
}
if let Some(to_ts) = filter.to_ts {
builder.push(" AND ts <= ").push_bind(to_ts);
}
if let Some(module_like) = filter.module_like.as_ref() {
builder
.push(" AND module_path LIKE '%' || ")
.push_bind(module_like.as_str())
.push(" || '%'");
}
if let Some(file_like) = filter.file_like.as_ref() {
builder
.push(" AND file LIKE '%' || ")
.push_bind(file_like.as_str())
.push(" || '%'");
}
if let Some(thread_id) = filter.thread_id.as_ref() {
builder
.push(" AND thread_id = ")
.push_bind(thread_id.as_str());
fn to_log_query(
filter: &LogFilter,
limit: Option<usize>,
after_id: Option<i64>,
descending: bool,
) -> LogQuery {
LogQuery {
level_upper: filter.level_upper.clone(),
from_ts: filter.from_ts,
to_ts: filter.to_ts,
module_like: filter.module_like.clone(),
file_like: filter.file_like.clone(),
thread_id: filter.thread_id.clone(),
after_id,
limit,
descending,
}
}
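
With the SQL gone from this file, the client's three fetch paths differ only in the (limit, after_id, descending) triple handed to to_log_query; the LogFilter fields are copied through unchanged. Spelled out, mirroring the functions above (the limit value is illustrative):

// Backfill: newest N rows, ordered id DESC, reversed before printing.
let backfill = to_log_query(&filter, Some(200), None, true);
// Tail: rows with id > last_id, ordered id ASC.
let tail = to_log_query(&filter, None, Some(last_id), false);
// Max-id probe: filters only; the runtime computes MAX(id).
let probe = to_log_query(&filter, None, None, false);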

View File

@@ -12,6 +12,8 @@ mod paths;
mod runtime;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;

View File

@@ -1,4 +1,5 @@
use serde::Serialize;
use sqlx::FromRow;
#[derive(Clone, Debug, Serialize)]
pub struct LogEntry {
@@ -12,3 +13,28 @@ pub struct LogEntry {
pub file: Option<String>,
pub line: Option<i64>,
}
#[derive(Clone, Debug, FromRow)]
pub struct LogRow {
pub id: i64,
pub ts: i64,
pub ts_nanos: i64,
pub level: String,
pub message: Option<String>,
pub thread_id: Option<String>,
pub file: Option<String>,
pub line: Option<i64>,
}
#[derive(Clone, Debug, Default)]
pub struct LogQuery {
pub level_upper: Option<String>,
pub from_ts: Option<i64>,
pub to_ts: Option<i64>,
pub module_like: Option<String>,
pub file_like: Option<String>,
pub thread_id: Option<String>,
pub after_id: Option<i64>,
pub limit: Option<usize>,
pub descending: bool,
}
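
Since LogQuery derives Default, callers name only the filters they need via struct-update syntax. A hypothetical "last 50 errors for one thread" query (the thread id is made up; level_upper must already be uppercased, since the runtime compares it against UPPER(level)):

use codex_state::LogQuery;

let query = LogQuery {
    level_upper: Some("ERROR".to_string()),
    thread_id: Some("thread-123".to_string()), // hypothetical id
    limit: Some(50),
    descending: true,
    ..Default::default()
};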

View File

@@ -2,6 +2,8 @@ mod log;
mod thread_metadata;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;

View File

@@ -1,5 +1,7 @@
use crate::DB_ERROR_METRIC;
use crate::LogEntry;
use crate::LogQuery;
use crate::LogRow;
use crate::SortKey;
use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
@@ -239,6 +241,38 @@ FROM threads
Ok(result.rows_affected())
}
/// Query logs with optional filters.
pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
let mut builder = QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
);
push_log_filters(&mut builder, query);
if query.descending {
builder.push(" ORDER BY id DESC");
} else {
builder.push(" ORDER BY id ASC");
}
if let Some(limit) = query.limit {
builder.push(" LIMIT ").push_bind(limit as i64);
}
let rows = builder
.build_query_as::<LogRow>()
.fetch_all(self.pool.as_ref())
.await?;
Ok(rows)
}
/// Return the max log id matching optional filters.
pub async fn max_log_id(&self, query: &LogQuery) -> anyhow::Result<i64> {
let mut builder =
QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
push_log_filters(&mut builder, query);
let row = builder.build().fetch_one(self.pool.as_ref()).await?;
let max_id: Option<i64> = row.try_get("max_id")?;
Ok(max_id.unwrap_or(0))
}
/// List thread ids using the underlying database (no rollout scanning).
pub async fn list_thread_ids(
&self,
@@ -411,6 +445,40 @@ ON CONFLICT(id) DO UPDATE SET
}
}
fn push_log_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, query: &'a LogQuery) {
if let Some(level_upper) = query.level_upper.as_ref() {
builder
.push(" AND UPPER(level) = ")
.push_bind(level_upper.as_str());
}
if let Some(from_ts) = query.from_ts {
builder.push(" AND ts >= ").push_bind(from_ts);
}
if let Some(to_ts) = query.to_ts {
builder.push(" AND ts <= ").push_bind(to_ts);
}
if let Some(module_like) = query.module_like.as_ref() {
builder
.push(" AND module_path LIKE '%' || ")
.push_bind(module_like.as_str())
.push(" || '%'");
}
if let Some(file_like) = query.file_like.as_ref() {
builder
.push(" AND file LIKE '%' || ")
.push_bind(file_like.as_str())
.push(" || '%'");
}
if let Some(thread_id) = query.thread_id.as_ref() {
builder
.push(" AND thread_id = ")
.push_bind(thread_id.as_str());
}
if let Some(after_id) = query.after_id {
builder.push(" AND id > ").push_bind(after_id);
}
}
async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)
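
Finally, the "and for tests" half of the commit message: with query_logs and max_log_id on the runtime, tests can exercise the same code path as the client instead of embedding their own SQL. A hypothetical sketch, assuming a tokio test runtime, the tempfile crate, and a temp dir standing in for codex_home (none of which appear in the hunks above):

use codex_state::LogQuery;
use codex_state::StateRuntime;

#[tokio::test]
async fn fresh_state_has_no_logs() -> anyhow::Result<()> {
    let home = tempfile::tempdir()?; // assumption: a temp dir is a valid codex_home
    let runtime = StateRuntime::init(home.path().to_path_buf(), "test".to_string(), None).await?;
    let query = LogQuery::default();
    // An empty logs table yields no rows, and max_log_id falls back to 0 (unwrap_or above).
    assert!(runtime.as_ref().query_logs(&query).await?.is_empty());
    assert_eq!(runtime.as_ref().max_log_id(&query).await?, 0);
    Ok(())
}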