feat: add log db (#10086)
Add a log DB. The goal is just to store our logs in a `.sqlite` DB to make it easier to crawl them and drop the oldest ones.
`codex-rs/Cargo.lock` (generated, 2 lines changed)
```diff
@@ -1859,6 +1859,7 @@ dependencies = [
 "sqlx",
 "tokio",
 "tracing",
 "tracing-subscriber",
 "uuid",
]
@@ -1897,6 +1898,7 @@ dependencies = [
 "codex-login",
 "codex-otel",
 "codex-protocol",
 "codex-state",
 "codex-utils-absolute-path",
 "codex-utils-cargo-bin",
 "codex-utils-pty",
```
```diff
@@ -12,6 +12,7 @@ use codex_protocol::ThreadId;
 use codex_protocol::protocol::RolloutItem;
 use codex_protocol::protocol::SessionSource;
 use codex_state::DB_METRIC_BACKFILL;
 pub use codex_state::LogEntry;
 use codex_state::STATE_DB_FILENAME;
 use codex_state::ThreadMetadataBuilder;
 use serde_json::Value;
```
```diff
@@ -14,6 +14,7 @@ serde_json = { workspace = true }
 sqlx = { workspace = true }
 tokio = { workspace = true, features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time"] }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 uuid = { workspace = true }

 [dev-dependencies]
```
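The tokio feature list lines up with what the new `log_db` module needs: `rt-multi-thread` for `tokio::spawn`, `sync` for the `mpsc` channel, and `time` for the flush ticker.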
`codex-rs/state/migrations/0002_logs.sql` (new file, 14 lines)
```sql
CREATE TABLE logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts INTEGER NOT NULL,
    ts_nanos INTEGER NOT NULL,
    level TEXT NOT NULL,
    target TEXT NOT NULL,
    message TEXT,
    fields_json TEXT NOT NULL,
    module_path TEXT,
    file TEXT,
    line INTEGER
);

CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
```
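Given the commit's stated goal of dropping the oldest entries, the `(ts DESC, ts_nanos DESC, id DESC)` index makes a newest-first cutoff cheap. A minimal pruning sketch; `prune_logs` and its `keep` budget are hypothetical, not part of this commit:

```rust
/// Hypothetical pruning helper (not in this commit): delete everything but
/// the `keep` newest rows, ordered the same way as `idx_logs_ts`.
async fn prune_logs(pool: &sqlx::SqlitePool, keep: i64) -> anyhow::Result<()> {
    sqlx::query(
        "DELETE FROM logs WHERE id NOT IN \
         (SELECT id FROM logs ORDER BY ts DESC, ts_nanos DESC, id DESC LIMIT ?)",
    )
    .bind(keep)
    .execute(pool)
    .await?;
    Ok(())
}
```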
```diff
@@ -5,11 +5,13 @@
 //! orchestration and rollout scanning live in `codex-core`.

 mod extract;
 pub mod log_db;
 mod migrations;
 mod model;
 mod paths;
 mod runtime;

 pub use model::LogEntry;
 /// Preferred entrypoint: owns configuration and metrics.
 pub use runtime::StateRuntime;
```
`codex-rs/state/src/log_db.rs` (new file, 162 lines)
```rust
//! Tracing log export into the state SQLite database.
//!
//! This module provides a `tracing_subscriber::Layer` that captures events and
//! inserts them into the `logs` table in `state.sqlite`. The writer runs in a
//! background task and batches inserts to keep logging overhead low.
//!
//! ## Usage
//!
//! ```no_run
//! use codex_state::log_db;
//! use tracing_subscriber::prelude::*;
//!
//! # async fn example(state_db: std::sync::Arc<codex_state::StateRuntime>) {
//! let layer = log_db::start(state_db);
//! let _ = tracing_subscriber::registry()
//!     .with(layer)
//!     .try_init();
//! # }
//! ```

use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use serde_json::Value;
use tokio::sync::mpsc;
use tracing::Event;
use tracing::field::Field;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;

use crate::LogEntry;
use crate::StateRuntime;

const LOG_QUEUE_CAPACITY: usize = 512;
const LOG_BATCH_SIZE: usize = 64;
const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);

pub struct LogDbLayer {
    sender: mpsc::Sender<LogEntry>,
}

pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
    let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
    tokio::spawn(run_inserter(state_db, receiver));

    LogDbLayer { sender }
}

impl<S> Layer<S> for LogDbLayer
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let metadata = event.metadata();
        let mut visitor = JsonVisitor::default();
        event.record(&mut visitor);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_else(|_| Duration::from_secs(0));
        let entry = LogEntry {
            ts: now.as_secs() as i64,
            ts_nanos: now.subsec_nanos() as i64,
            level: metadata.level().as_str().to_string(),
            target: metadata.target().to_string(),
            message: visitor.message,
            fields_json: Value::Object(visitor.fields).to_string(),
            module_path: metadata.module_path().map(ToString::to_string),
            file: metadata.file().map(ToString::to_string),
            line: metadata.line().map(|line| line as i64),
        };

        let _ = self.sender.try_send(entry);
    }
}

async fn run_inserter(
    state_db: std::sync::Arc<StateRuntime>,
    mut receiver: mpsc::Receiver<LogEntry>,
) {
    let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE);
    let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL);
    loop {
        tokio::select! {
            maybe_entry = receiver.recv() => {
                match maybe_entry {
                    Some(entry) => {
                        buffer.push(entry);
                        if buffer.len() >= LOG_BATCH_SIZE {
                            flush(&state_db, &mut buffer).await;
                        }
                    }
                    None => {
                        flush(&state_db, &mut buffer).await;
                        break;
                    }
                }
            }
            _ = ticker.tick() => {
                flush(&state_db, &mut buffer).await;
            }
        }
    }
}

async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntry>) {
    if buffer.is_empty() {
        return;
    }
    let entries = buffer.split_off(0);
    let _ = state_db.insert_logs(entries.as_slice()).await;
}

#[derive(Default)]
struct JsonVisitor {
    fields: serde_json::Map<String, Value>,
    message: Option<String>,
}

impl JsonVisitor {
    fn record_value(&mut self, field: &Field, value: Value) {
        if field.name() == "message" && self.message.is_none() {
            self.message = Some(match &value {
                Value::String(message) => message.clone(),
                _ => value.to_string(),
            });
        }
        self.fields.insert(field.name().to_string(), value);
    }
}

impl Visit for JsonVisitor {
    fn record_i64(&mut self, field: &Field, value: i64) {
        self.record_value(field, Value::from(value));
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        self.record_value(field, Value::from(value));
    }

    fn record_bool(&mut self, field: &Field, value: bool) {
        self.record_value(field, Value::from(value));
    }

    fn record_f64(&mut self, field: &Field, value: f64) {
        self.record_value(field, Value::from(value));
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        self.record_value(field, Value::from(value));
    }

    fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
        self.record_value(field, Value::from(value.to_string()));
    }

    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        self.record_value(field, Value::from(format!("{value:?}")));
    }
}
```
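To make the mapping concrete, here is roughly what one structured event yields under this layer. The event below is illustrative; the row contents follow from `JsonVisitor` and `on_event` above:

```rust
// With the layer installed as in the module docs:
tracing::warn!(status = 500, retries = 3, "upstream request failed");

// The message arrives through `record_debug` (tracing formats it first), so
// the queued LogEntry looks approximately like:
//   level       = "WARN"
//   target      = the caller's module path
//   message     = Some("upstream request failed")
//   fields_json = {"message":"upstream request failed","retries":3,"status":500}
```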
`codex-rs/state/src/model/log.rs` (new file, 14 lines)
```rust
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
pub struct LogEntry {
    pub ts: i64,
    pub ts_nanos: i64,
    pub level: String,
    pub target: String,
    pub message: Option<String>,
    pub fields_json: String,
    pub module_path: Option<String>,
    pub file: Option<String>,
    pub line: Option<i64>,
}
```
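The struct mirrors the `logs` columns one-to-one: the `NOT NULL` columns are plain fields, and the nullable ones (`message`, `module_path`, `file`, `line`) become `Option`s.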
`codex-rs/state/src/model/mod.rs` (new file, 15 lines)
```rust
mod log;
mod thread_metadata;

pub use log::LogEntry;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
pub use thread_metadata::SortKey;
pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;

pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;
pub(crate) use thread_metadata::datetime_to_epoch_seconds;
```
```diff
@@ -1,4 +1,5 @@
 use crate::DB_ERROR_METRIC;
 use crate::LogEntry;
 use crate::SortKey;
 use crate::ThreadMetadata;
 use crate::ThreadMetadataBuilder;
@@ -201,6 +202,35 @@ FROM threads
         })
     }

     /// Insert one log entry into the logs table.
     pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> {
         self.insert_logs(std::slice::from_ref(entry)).await
     }

     /// Insert a batch of log entries into the logs table.
     pub async fn insert_logs(&self, entries: &[LogEntry]) -> anyhow::Result<()> {
         if entries.is_empty() {
             return Ok(());
         }

         let mut builder = QueryBuilder::<Sqlite>::new(
             "INSERT INTO logs (ts, ts_nanos, level, target, message, fields_json, module_path, file, line) ",
         );
         builder.push_values(entries, |mut row, entry| {
             row.push_bind(entry.ts)
                 .push_bind(entry.ts_nanos)
                 .push_bind(&entry.level)
                 .push_bind(&entry.target)
                 .push_bind(&entry.message)
                 .push_bind(&entry.fields_json)
                 .push_bind(&entry.module_path)
                 .push_bind(&entry.file)
                 .push_bind(entry.line);
         });
         builder.build().execute(self.pool.as_ref()).await?;
         Ok(())
     }

     /// List thread ids using the underlying database (no rollout scanning).
     pub async fn list_thread_ids(
         &self,
```
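Callers holding a `StateRuntime` can also write entries directly, bypassing the tracing layer. A sketch assuming an existing handle named `state_db`; the field values are placeholders:

```rust
let entry = codex_state::LogEntry {
    ts: 1_700_000_000, // seconds since the Unix epoch (placeholder)
    ts_nanos: 0,
    level: "INFO".to_string(),
    target: "my_app::example".to_string(),
    message: Some("written outside the tracing pipeline".to_string()),
    fields_json: "{}".to_string(),
    module_path: None,
    file: None,
    line: None,
};
state_db.insert_log(&entry).await?;
```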
```diff
@@ -42,6 +42,7 @@ codex-file-search = { workspace = true }
 codex-login = { workspace = true }
 codex-otel = { workspace = true }
 codex-protocol = { workspace = true }
 codex-state = { workspace = true }
 codex-utils-absolute-path = { workspace = true }
 color-eyre = { workspace = true }
 crossterm = { workspace = true, features = ["bracketed-paste", "event-stream"] }
```
```diff
@@ -36,6 +36,7 @@ use codex_protocol::config_types::SandboxMode;
 use codex_protocol::config_types::WindowsSandboxLevel;
 use codex_protocol::protocol::RolloutItem;
 use codex_protocol::protocol::RolloutLine;
 use codex_state::log_db;
 use codex_utils_absolute_path::AbsolutePathBuf;
 use cwd_prompt::CwdPromptAction;
 use cwd_prompt::CwdSelection;
@@ -351,10 +352,15 @@ pub async fn run_main(

     let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

     let log_db_layer = codex_core::state_db::init_if_enabled(&config, None)
         .await
         .map(|db| log_db::start(db).with_filter(env_filter()));

     let _ = tracing_subscriber::registry()
         .with(file_layer)
         .with(feedback_layer)
         .with(feedback_metadata_layer)
         .with(log_db_layer)
         .with(otel_logger_layer)
         .with(otel_tracing_layer)
         .try_init();
```
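One detail worth calling out: `log_db_layer` is an `Option`, so when `init_if_enabled` returns `None` (state DB disabled), `.with(log_db_layer)` is a no-op, since `tracing-subscriber` implements `Layer` for `Option<L>`. The OTEL layers are threaded through the same way.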