diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 08cc850524..0ae6c60d42 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "sqlx", "tokio", "tracing", + "tracing-subscriber", "uuid", ] @@ -1897,6 +1898,7 @@ dependencies = [ "codex-login", "codex-otel", "codex-protocol", + "codex-state", "codex-utils-absolute-path", "codex-utils-cargo-bin", "codex-utils-pty", diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index d859328e1c..74c9ed7383 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -12,6 +12,7 @@ use codex_protocol::ThreadId; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_state::DB_METRIC_BACKFILL; +pub use codex_state::LogEntry; use codex_state::STATE_DB_FILENAME; use codex_state::ThreadMetadataBuilder; use serde_json::Value; diff --git a/codex-rs/state/Cargo.toml b/codex-rs/state/Cargo.toml index 810e250ca7..b8c7fce0e3 100644 --- a/codex-rs/state/Cargo.toml +++ b/codex-rs/state/Cargo.toml @@ -14,6 +14,7 @@ serde_json = { workspace = true } sqlx = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time"] } tracing = { workspace = true } +tracing-subscriber = { workspace = true } uuid = { workspace = true } [dev-dependencies] diff --git a/codex-rs/state/migrations/0002_logs.sql b/codex-rs/state/migrations/0002_logs.sql new file mode 100644 index 0000000000..4a193500b9 --- /dev/null +++ b/codex-rs/state/migrations/0002_logs.sql @@ -0,0 +1,14 @@ +CREATE TABLE logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + ts_nanos INTEGER NOT NULL, + level TEXT NOT NULL, + target TEXT NOT NULL, + message TEXT, + fields_json TEXT NOT NULL, + module_path TEXT, + file TEXT, + line INTEGER +); + +CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC); diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs 
index 67533a3791..91ebacea71 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -5,11 +5,13 @@ //! orchestration and rollout scanning live in `codex-core`. mod extract; +pub mod log_db; mod migrations; mod model; mod paths; mod runtime; +pub use model::LogEntry; /// Preferred entrypoint: owns configuration and metrics. pub use runtime::StateRuntime; diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs new file mode 100644 index 0000000000..c03b7977d3 --- /dev/null +++ b/codex-rs/state/src/log_db.rs @@ -0,0 +1,162 @@ +//! Tracing log export into the state SQLite database. +//! +//! This module provides a `tracing_subscriber::Layer` that captures events and +//! inserts them into the `logs` table in `state.sqlite`. The writer runs in a +//! background task and batches inserts to keep logging overhead low. +//! +//! ## Usage +//! +//! ```no_run +//! use codex_state::log_db; +//! use tracing_subscriber::prelude::*; +//! +//! # async fn example(state_db: std::sync::Arc<codex_state::StateRuntime>) { +//! let layer = log_db::start(state_db); +//! let _ = tracing_subscriber::registry() +//! .with(layer) +//! .try_init(); +//! # } +//! 
``` + +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +use serde_json::Value; +use tokio::sync::mpsc; +use tracing::Event; +use tracing::field::Field; +use tracing::field::Visit; +use tracing_subscriber::Layer; +use tracing_subscriber::registry::LookupSpan; + +use crate::LogEntry; +use crate::StateRuntime; + +const LOG_QUEUE_CAPACITY: usize = 512; +const LOG_BATCH_SIZE: usize = 64; +const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250); + +pub struct LogDbLayer { + sender: mpsc::Sender<LogEntry>, +} + +pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer { + let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); + tokio::spawn(run_inserter(state_db, receiver)); + + LogDbLayer { sender } +} + +impl<S> Layer<S> for LogDbLayer +where + S: tracing::Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let metadata = event.metadata(); + let mut visitor = JsonVisitor::default(); + event.record(&mut visitor); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)); + let entry = LogEntry { + ts: now.as_secs() as i64, + ts_nanos: now.subsec_nanos() as i64, + level: metadata.level().as_str().to_string(), + target: metadata.target().to_string(), + message: visitor.message, + fields_json: Value::Object(visitor.fields).to_string(), + module_path: metadata.module_path().map(ToString::to_string), + file: metadata.file().map(ToString::to_string), + line: metadata.line().map(|line| line as i64), + }; + + let _ = self.sender.try_send(entry); + } +} + +async fn run_inserter( + state_db: std::sync::Arc<StateRuntime>, + mut receiver: mpsc::Receiver<LogEntry>, +) { + let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE); + let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL); + loop { + tokio::select! 
{ + maybe_entry = receiver.recv() => { + match maybe_entry { + Some(entry) => { + buffer.push(entry); + if buffer.len() >= LOG_BATCH_SIZE { + flush(&state_db, &mut buffer).await; + } + } + None => { + flush(&state_db, &mut buffer).await; + break; + } + } + } + _ = ticker.tick() => { + flush(&state_db, &mut buffer).await; + } + } + } +} + +async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntry>) { + if buffer.is_empty() { + return; + } + let entries = buffer.split_off(0); + let _ = state_db.insert_logs(entries.as_slice()).await; +} + +#[derive(Default)] +struct JsonVisitor { + fields: serde_json::Map<String, Value>, + message: Option<String>, +} + +impl JsonVisitor { + fn record_value(&mut self, field: &Field, value: Value) { + if field.name() == "message" && self.message.is_none() { + self.message = Some(match &value { + Value::String(message) => message.clone(), + _ => value.to_string(), + }); + } + self.fields.insert(field.name().to_string(), value); + } +} + +impl Visit for JsonVisitor { + fn record_i64(&mut self, field: &Field, value: i64) { + self.record_value(field, Value::from(value)); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.record_value(field, Value::from(value)); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.record_value(field, Value::from(value)); + } + + fn record_f64(&mut self, field: &Field, value: f64) { + self.record_value(field, Value::from(value)); + } + + fn record_str(&mut self, field: &Field, value: &str) { + self.record_value(field, Value::from(value)); + } + + fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) { + self.record_value(field, Value::from(value.to_string())); + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + self.record_value(field, Value::from(format!("{value:?}"))); + } +} diff --git a/codex-rs/state/src/model/log.rs b/codex-rs/state/src/model/log.rs new file mode 100644 index 0000000000..5662a3725c --- /dev/null +++ 
b/codex-rs/state/src/model/log.rs @@ -0,0 +1,14 @@ +use serde::Serialize; + +#[derive(Clone, Debug, Serialize)] +pub struct LogEntry { + pub ts: i64, + pub ts_nanos: i64, + pub level: String, + pub target: String, + pub message: Option<String>, + pub fields_json: String, + pub module_path: Option<String>, + pub file: Option<String>, + pub line: Option<i64>, +} diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs new file mode 100644 index 0000000000..4921e2357a --- /dev/null +++ b/codex-rs/state/src/model/mod.rs @@ -0,0 +1,15 @@ +mod log; +mod thread_metadata; + +pub use log::LogEntry; +pub use thread_metadata::Anchor; +pub use thread_metadata::BackfillStats; +pub use thread_metadata::ExtractionOutcome; +pub use thread_metadata::SortKey; +pub use thread_metadata::ThreadMetadata; +pub use thread_metadata::ThreadMetadataBuilder; +pub use thread_metadata::ThreadsPage; + +pub(crate) use thread_metadata::ThreadRow; +pub(crate) use thread_metadata::anchor_from_item; +pub(crate) use thread_metadata::datetime_to_epoch_seconds; diff --git a/codex-rs/state/src/model.rs b/codex-rs/state/src/model/thread_metadata.rs similarity index 100% rename from codex-rs/state/src/model.rs rename to codex-rs/state/src/model/thread_metadata.rs diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index f341ee3bb6..34add1347d 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -1,4 +1,5 @@ use crate::DB_ERROR_METRIC; +use crate::LogEntry; use crate::SortKey; use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; @@ -201,6 +202,35 @@ FROM threads }) } + /// Insert one log entry into the logs table. + pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> { + self.insert_logs(std::slice::from_ref(entry)).await + } + + /// Insert a batch of log entries into the logs table. 
+ pub async fn insert_logs(&self, entries: &[LogEntry]) -> anyhow::Result<()> { + if entries.is_empty() { + return Ok(()); + } + + let mut builder = QueryBuilder::<sqlx::Sqlite>::new( + "INSERT INTO logs (ts, ts_nanos, level, target, message, fields_json, module_path, file, line) ", + ); + builder.push_values(entries, |mut row, entry| { + row.push_bind(entry.ts) + .push_bind(entry.ts_nanos) + .push_bind(&entry.level) + .push_bind(&entry.target) + .push_bind(&entry.message) + .push_bind(&entry.fields_json) + .push_bind(&entry.module_path) + .push_bind(&entry.file) + .push_bind(entry.line); + }); + builder.build().execute(self.pool.as_ref()).await?; + Ok(()) + } + /// List thread ids using the underlying database (no rollout scanning). pub async fn list_thread_ids( &self, diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 820fe20e93..6664e7f9c5 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -42,6 +42,7 @@ codex-file-search = { workspace = true } codex-login = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } +codex-state = { workspace = true } codex-utils-absolute-path = { workspace = true } color-eyre = { workspace = true } crossterm = { workspace = true, features = ["bracketed-paste", "event-stream"] } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index bced1e0653..a48b806129 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -36,6 +36,7 @@ use codex_protocol::config_types::SandboxMode; use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; +use codex_state::log_db; use codex_utils_absolute_path::AbsolutePathBuf; use cwd_prompt::CwdPromptAction; use cwd_prompt::CwdSelection; @@ -351,10 +352,15 @@ pub async fn run_main( let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); + let log_db_layer = codex_core::state_db::init_if_enabled(&config, None) + .await + 
.map(|db| log_db::start(db).with_filter(env_filter())); + let _ = tracing_subscriber::registry() .with(file_layer) .with(feedback_layer) .with(feedback_metadata_layer) + .with(log_db_layer) .with(otel_logger_layer) .with(otel_tracing_layer) .try_init();