From 2d5a5a1248df14bf3dcc4102c1947b2cea386e60 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 11 May 2026 14:36:31 +0100 Subject: [PATCH] Add process-scoped SQLite telemetry Co-authored-by: Codex --- codex-rs/app-server/src/lib.rs | 28 +-- codex-rs/app-server/src/message_processor.rs | 3 +- codex-rs/app-server/src/request_processors.rs | 1 - .../thread_goal_processor.rs | 6 +- codex-rs/core/src/otel_init.rs | 15 ++ codex-rs/exec/src/lib.rs | 2 + codex-rs/mcp-server/src/lib.rs | 27 +-- codex-rs/mcp-server/src/message_processor.rs | 3 +- codex-rs/otel/src/metrics/mod.rs | 4 + codex-rs/otel/src/metrics/names.rs | 1 + codex-rs/otel/src/metrics/process.rs | 27 +++ codex-rs/otel/src/metrics/tags.rs | 26 +++ codex-rs/rollout/src/lib.rs | 2 + codex-rs/rollout/src/list.rs | 75 ++++--- codex-rs/rollout/src/recorder.rs | 10 + codex-rs/rollout/src/sqlite_metrics.rs | 40 ++++ codex-rs/rollout/src/state_db.rs | 58 ++++-- codex-rs/state/src/lib.rs | 12 ++ codex-rs/state/src/runtime.rs | 81 ++++++-- codex-rs/state/src/runtime/backfill.rs | 14 +- codex-rs/state/src/telemetry.rs | 193 ++++++++++++++++++ codex-rs/tui/src/lib.rs | 56 ++--- 22 files changed, 556 insertions(+), 128 deletions(-) create mode 100644 codex-rs/otel/src/metrics/process.rs create mode 100644 codex-rs/rollout/src/sqlite_metrics.rs create mode 100644 codex-rs/state/src/telemetry.rs diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 53f02b2068..ca98dc2eff 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -109,6 +109,7 @@ pub use crate::transport::auth::AppServerWebsocketAuthSettings; pub use crate::transport::auth::WebsocketAuthCliMode; const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT"; +const OTEL_SERVICE_NAME: &str = "codex-app-server"; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum LogFormat { @@ -512,6 +513,20 @@ pub async fn run_main_with_transport_options( } }; + let otel = codex_core::otel_init::build_provider( + &config, + 
env!("CARGO_PKG_VERSION"), + Some(OTEL_SERVICE_NAME), + default_analytics_enabled, + ) + .map_err(|e| { + std::io::Error::new( + ErrorKind::InvalidData, + format!("error loading otel config: {e}"), + ) + })?; + codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME); let state_db_result = rollout_state_db::try_init(&config).await; let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string); let state_db = state_db_result.ok(); @@ -591,19 +606,6 @@ pub async fn run_main_with_transport_options( let feedback = CodexFeedback::new(); - let otel = codex_core::otel_init::build_provider( - &config, - env!("CARGO_PKG_VERSION"), - Some("codex-app-server"), - default_analytics_enabled, - ) - .map_err(|e| { - std::io::Error::new( - ErrorKind::InvalidData, - format!("error loading otel config: {e}"), - ) - })?; - // Install a simple subscriber so `tracing` output is visible. Users can // control the log level with `RUST_LOG` and switch to JSON logs with // `LOG_FORMAT=json`. diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 8418d10b43..4897684564 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -65,7 +65,6 @@ use codex_arg0::Arg0DispatchPaths; use codex_chatgpt::workspace_settings; use codex_core::ThreadManager; use codex_core::config::Config; -use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_login::AuthManager; @@ -298,7 +297,7 @@ impl MessageProcessor { // The thread store is intentionally process-scoped. Config reloads can // affect per-thread behavior, but they must not move newly started, // resumed, or forked threads to a different persistence backend/root. 
- let thread_store = thread_store_from_config(config.as_ref(), state_db.clone()); + let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone()); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 9de844c6cd..f2e3444f88 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -277,7 +277,6 @@ use codex_core::exec::ExecCapturePolicy; use codex_core::exec::ExecExpiration; use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; -use codex_core::find_thread_path_by_id_str; use codex_core::path_utils; #[cfg(test)] use codex_core::read_head_for_summary; diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index 0e12e44ce5..607ce60a73 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -107,7 +107,7 @@ impl ThreadGoalRequestProcessor { "ephemeral thread does not support goals: {thread_id}" )) })?, - None => find_thread_path_by_id_str( + None => codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), @@ -272,7 +272,7 @@ impl ThreadGoalRequestProcessor { "ephemeral thread does not support goals: {thread_id}" )) })?, - None => find_thread_path_by_id_str( + None => codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), @@ -336,7 +336,7 @@ impl ThreadGoalRequestProcessor { return Ok(state_db); } } else { - find_thread_path_by_id_str( + codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), diff --git a/codex-rs/core/src/otel_init.rs 
b/codex-rs/core/src/otel_init.rs index 0cd1f06994..8b5aac5f78 100644 --- a/codex-rs/core/src/otel_init.rs +++ b/codex-rs/core/src/otel_init.rs @@ -99,3 +99,18 @@ pub fn build_provider( pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool { meta.target().starts_with("codex_otel") } + +pub fn record_process_start(otel: Option<&OtelProvider>, originator: &str) { + let Some(metrics) = otel.and_then(OtelProvider::metrics) else { + return; + }; + let _ = codex_otel::record_process_start_once(metrics, originator); +} + +pub fn install_sqlite_telemetry(otel: Option<&OtelProvider>, originator: &str) { + let Some(metrics) = otel.and_then(OtelProvider::metrics) else { + return; + }; + let telemetry = codex_rollout::sqlite_telemetry_recorder(metrics.clone(), originator); + let _ = codex_state::install_process_db_telemetry(telemetry); +} diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index e68c96d00b..3d49c40cc0 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -478,6 +478,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result None } }; + codex_core::otel_init::record_process_start(otel.as_ref(), "codex_exec"); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), "codex_exec"); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index aa560bbe6a..45daf99a01 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -74,19 +74,6 @@ pub async fn run_main( std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}")) })?; set_default_client_residency_requirement(config.enforce_residency.value()); - let state_db = codex_core::init_state_db(&config).await; - let environment_manager = Arc::new( - EnvironmentManager::from_codex_home( - config.codex_home.clone(), - ExecServerRuntimePaths::from_optional_paths( - arg0_paths.codex_self_exe.clone(), - 
arg0_paths.codex_linux_sandbox_exe.clone(), - )?, - ) - .await - .map_err(std::io::Error::other)?, - ); - let otel = codex_core::otel_init::build_provider( &config, env!("CARGO_PKG_VERSION"), @@ -99,6 +86,20 @@ pub async fn run_main( format!("error loading otel config: {e}"), ) })?; + codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME); + let state_db = codex_core::init_state_db(&config).await; + let environment_manager = Arc::new( + EnvironmentManager::from_codex_home( + config.codex_home.clone(), + ExecServerRuntimePaths::from_optional_paths( + arg0_paths.codex_self_exe.clone(), + arg0_paths.codex_linux_sandbox_exe.clone(), + )?, + ) + .await + .map_err(std::io::Error::other)?, + ); let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 7a06bcc02c..9e536d930c 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -5,7 +5,6 @@ use codex_arg0::Arg0DispatchPaths; use codex_core::StateDbHandle; use codex_core::ThreadManager; use codex_core::config::Config; -use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_extension_api::empty_extension_registry; use codex_login::AuthManager; @@ -71,7 +70,7 @@ impl MessageProcessor { environment_manager, empty_extension_registry(), /*analytics_events_client*/ None, - thread_store_from_config(config.as_ref(), state_db.clone()), + codex_core::thread_store_from_config(config.as_ref(), state_db.clone()), state_db.clone(), installation_id, /*attestation_provider*/ None, diff --git a/codex-rs/otel/src/metrics/mod.rs b/codex-rs/otel/src/metrics/mod.rs index e75840bf37..07b7554224 100644 --- a/codex-rs/otel/src/metrics/mod.rs +++ b/codex-rs/otel/src/metrics/mod.rs @@ -2,6 +2,7 @@ mod client; mod 
config; mod error; pub(crate) mod names; +mod process; pub(crate) mod runtime_metrics; pub(crate) mod tags; pub(crate) mod timer; @@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig; pub use crate::metrics::config::MetricsExporter; pub use crate::metrics::error::MetricsError; pub use crate::metrics::error::Result; +pub use crate::metrics::process::record_process_start_once; pub use names::*; use std::sync::OnceLock; +pub use tags::ORIGINATOR_TAG; pub use tags::SessionMetricTagValues; +pub use tags::bounded_originator_tag_value; static GLOBAL_METRICS: OnceLock = OnceLock::new(); static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock = OnceLock::new(); diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index b2f2d639ce..67ac6e19ed 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -1,6 +1,7 @@ pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call"; pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms"; pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec"; +pub const PROCESS_START_METRIC: &str = "codex.process.start"; pub const API_CALL_COUNT_METRIC: &str = "codex.api_request"; pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms"; pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event"; diff --git a/codex-rs/otel/src/metrics/process.rs b/codex-rs/otel/src/metrics/process.rs new file mode 100644 index 0000000000..6c7cd18b4e --- /dev/null +++ b/codex-rs/otel/src/metrics/process.rs @@ -0,0 +1,27 @@ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use super::client::MetricsClient; +use super::error::Result; +use super::names::PROCESS_START_METRIC; +use super::tags::ORIGINATOR_TAG; +use super::tags::bounded_originator_tag_value; + +static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false); + +/// Record the process start counter at most once for this process. 
+pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result { + if PROCESS_START_RECORDED + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + return Ok(false); + } + + metrics.counter( + PROCESS_START_METRIC, + /*inc*/ 1, + &[(ORIGINATOR_TAG, bounded_originator_tag_value(originator))], + )?; + Ok(true) +} diff --git a/codex-rs/otel/src/metrics/tags.rs b/codex-rs/otel/src/metrics/tags.rs index 5c166e307e..e1baf285ae 100644 --- a/codex-rs/otel/src/metrics/tags.rs +++ b/codex-rs/otel/src/metrics/tags.rs @@ -1,6 +1,7 @@ use crate::metrics::Result; use crate::metrics::validation::validate_tag_key; use crate::metrics::validation::validate_tag_value; +use codex_utils_string::sanitize_metric_tag_value; pub const APP_VERSION_TAG: &str = "app.version"; pub const AUTH_MODE_TAG: &str = "auth_mode"; @@ -9,6 +10,31 @@ pub const ORIGINATOR_TAG: &str = "originator"; pub const SERVICE_NAME_TAG: &str = "service_name"; pub const SESSION_SOURCE_TAG: &str = "session_source"; +const OTHER_ORIGINATOR_TAG_VALUE: &str = "other"; +const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[ + "codex_desktop", + "codex-app-server", + "codex_mcp_server", + "codex_cli_rs", + "codex-tui", + "codex_vscode", + "none", + "codex_exec", + "codex-cli", + "codex_sdk_ts", + "codex-app-server-sdk", +]; + +/// Return a known low-cardinality originator tag value, or `other`. 
+pub fn bounded_originator_tag_value(originator: &str) -> &'static str { + let sanitized = sanitize_metric_tag_value(originator); + KNOWN_ORIGINATOR_TAG_VALUES + .iter() + .copied() + .find(|known| *known == sanitized.as_str()) + .unwrap_or(OTHER_ORIGINATOR_TAG_VALUE) +} + pub struct SessionMetricTagValues<'a> { pub auth_mode: Option<&'a str>, pub session_source: &'a str, diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index d65ddd3d5b..d594292bc1 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -10,6 +10,7 @@ pub(crate) mod metadata; pub(crate) mod policy; pub(crate) mod recorder; pub(crate) mod session_index; +mod sqlite_metrics; pub mod state_db; pub(crate) mod default_client { @@ -63,6 +64,7 @@ pub use session_index::find_thread_meta_by_name_str; pub use session_index::find_thread_name_by_id; pub use session_index::find_thread_names_by_ids; pub use state_db::StateDbHandle; +pub use state_db::sqlite_telemetry_recorder; #[cfg(test)] mod tests; diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index 8ba63a713e..9279418622 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1255,47 +1255,57 @@ async fn find_thread_path_by_id_str_in_subdir( }; let thread_id = ThreadId::from_string(id_str).ok(); let mut unverified_db_path = None; + let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable"); if let Some(state_db_ctx) = state_db_ctx && let Some(thread_id) = thread_id - && let Some(db_path) = state_db::find_rollout_path_by_id( - Some(state_db_ctx), - thread_id, - archived_only, - "find_path_query", - ) - .await { - if tokio::fs::try_exists(&db_path).await.unwrap_or(false) { - match read_session_meta_line(&db_path).await { - Ok(meta_line) if meta_line.meta.id == thread_id => { - return Ok(Some(db_path)); - } - Ok(meta_line) => { + match state_db_ctx + .find_rollout_path_by_id(thread_id, archived_only) + .await + { + Ok(Some(db_path)) => { + if 
tokio::fs::try_exists(&db_path).await.unwrap_or(false) { + match read_session_meta_line(&db_path).await { + Ok(meta_line) if meta_line.meta.id == thread_id => { + return Ok(Some(db_path)); + } + Ok(meta_line) => { + tracing::error!( + "state db returned rollout path for thread {id_str} but file belongs to thread {}: {}", + meta_line.meta.id, + db_path.display() + ); + tracing::warn!( + "state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path" + ); + codex_state::record_fallback(None, "find_thread_path", "mismatch"); + } + Err(err) => { + tracing::debug!( + "state db returned rollout path for thread {id_str} that could not be verified: {}: {err}", + db_path.display() + ); + unverified_db_path = Some(db_path); + } + } + } else { tracing::error!( - "state db returned rollout path for thread {id_str} but file belongs to thread {}: {}", - meta_line.meta.id, + "state db returned stale rollout path for thread {id_str}: {}", db_path.display() ); tracing::warn!( - "state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path" + "state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path" ); - } - Err(err) => { - tracing::debug!( - "state db returned rollout path for thread {id_str} that could not be verified: {}: {err}", - db_path.display() - ); - unverified_db_path = Some(db_path); + codex_state::record_fallback(None, "find_thread_path", "stale_path"); } } - } else { - tracing::error!( - "state db returned stale rollout path for thread {id_str}: {}", - db_path.display() - ); - tracing::warn!( - "state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path" - ); + Ok(None) => fallback_reason = Some("missing_row"), + Err(err) => { + tracing::warn!( + "state db find_rollout_path_by_id failed during find_path_query: {err}" + ); + fallback_reason = Some("db_error"); + } } } @@ -1323,6 +1333,9 @@ async fn find_thread_path_by_id_str_in_subdir( tracing::warn!( "state db discrepancy during 
find_thread_path_by_id_str_in_subdir: falling_back" ); + if let Some(reason) = fallback_reason { + codex_state::record_fallback(None, "find_thread_path", reason); + } state_db::read_repair_rollout_path( state_db_ctx, thread_id, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 6b549b745c..a612bf0412 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -450,6 +450,7 @@ impl RolloutRecorder { if state_db_ctx.is_none() { // Keep legacy behavior when SQLite is unavailable: return filesystem results // at the requested page size. + codex_state::record_fallback(None, "list_threads", "db_unavailable"); return Ok(page_from_filesystem_scan( fs_page, sort_direction, @@ -558,6 +559,7 @@ impl RolloutRecorder { ) .await; } + codex_state::record_fallback(None, "list_threads", "metadata_filter"); let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); return Ok(fill_missing_thread_item_metadata_from_state_db( state_db_ctx.as_deref(), @@ -569,6 +571,7 @@ impl RolloutRecorder { } if listing_has_metadata_filters { let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); + codex_state::record_fallback(None, "list_threads", "db_error"); return Ok(fill_missing_thread_item_metadata_from_state_db( state_db_ctx.as_deref(), page, @@ -578,6 +581,7 @@ impl RolloutRecorder { // If SQLite listing still fails, return the filesystem page rather than failing the list. 
tracing::error!("Falling back on rollout system"); tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back"); + codex_state::record_fallback(None, "list_threads", "db_error"); Ok(page_from_filesystem_scan( fs_page, sort_direction, @@ -601,6 +605,7 @@ impl RolloutRecorder { ) -> std::io::Result> { let codex_home = config.codex_home(); let cwd_filter = filter_cwd.map(Path::to_path_buf); + let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable"); if state_db_ctx.is_some() { let mut db_cursor = cursor.cloned(); loop { @@ -619,6 +624,7 @@ impl RolloutRecorder { ) .await else { + fallback_reason = Some("db_error"); break; }; if let Some(path) = @@ -628,10 +634,14 @@ impl RolloutRecorder { } db_cursor = db_page.next_anchor.map(Into::into); if db_cursor.is_none() { + fallback_reason = Some("missing_row"); break; } } } + if let Some(reason) = fallback_reason { + codex_state::record_fallback(None, "find_latest_thread_path", reason); + } let mut cursor = cursor.cloned(); loop { diff --git a/codex-rs/rollout/src/sqlite_metrics.rs b/codex-rs/rollout/src/sqlite_metrics.rs new file mode 100644 index 0000000000..8ce5087092 --- /dev/null +++ b/codex-rs/rollout/src/sqlite_metrics.rs @@ -0,0 +1,40 @@ +use std::sync::Arc; +use std::time::Duration; + +use codex_otel::ORIGINATOR_TAG; +use codex_otel::bounded_originator_tag_value; +use codex_state::DbTelemetry; +use codex_state::DbTelemetryHandle; + +struct OtelSqliteTelemetry { + metrics: codex_otel::MetricsClient, + originator: &'static str, +} + +impl DbTelemetry for OtelSqliteTelemetry { + fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) { + let tags = with_originator(tags, self.originator); + let _ = self.metrics.counter(name, inc, &tags); + } + + fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) { + let tags = with_originator(tags, self.originator); + let _ = self.metrics.record_duration(name, duration, &tags); + } +} + +pub(crate) fn 
recorder(metrics: codex_otel::MetricsClient, originator: &str) -> DbTelemetryHandle { + Arc::new(OtelSqliteTelemetry { + metrics, + originator: bounded_originator_tag_value(originator), + }) +} + +fn with_originator<'a>( + tags: &[(&'a str, &'a str)], + originator: &'static str, +) -> Vec<(&'a str, &'a str)> { + let mut tags = tags.to_vec(); + tags.push((ORIGINATOR_TAG, originator)); + tags +} diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index d039e16d68..9f90dc140c 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -4,6 +4,7 @@ use crate::list::Cursor; use crate::list::SortDirection; use crate::list::ThreadSortKey; use crate::metadata; +use crate::sqlite_metrics; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; @@ -115,6 +116,25 @@ async fn try_init_with_roots_inner( sqlite_home.display() ) })?; + let backfill_gate_started = Instant::now(); + let backfill_gate_result = wait_for_backfill_gate( + runtime.as_ref(), + codex_home.as_path(), + default_model_provider_id.as_str(), + backfill_lease_seconds, + ) + .await; + codex_state::record_backfill_gate(None, backfill_gate_started.elapsed(), &backfill_gate_result); + backfill_gate_result?; + Ok(runtime) +} + +async fn wait_for_backfill_gate( + runtime: &codex_state::StateRuntime, + codex_home: &Path, + default_model_provider_id: &str, + backfill_lease_seconds: Option, +) -> anyhow::Result<()> { let wait_started = Instant::now(); let mut reported_wait = false; loop { @@ -125,24 +145,19 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { - return Ok(runtime); + return Ok(()); } if let Some(backfill_lease_seconds) = backfill_lease_seconds { metadata::backfill_sessions_with_lease( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), + runtime, + codex_home, + default_model_provider_id, backfill_lease_seconds, ) .await; } else { - 
metadata::backfill_sessions( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), - ) - .await; + metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await; } let backfill_state = runtime.get_backfill_state().await.map_err(|err| { anyhow::anyhow!( @@ -151,7 +166,7 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { - return Ok(runtime); + return Ok(()); } if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT { return Err(anyhow::anyhow!( @@ -195,17 +210,32 @@ fn emit_startup_warning(message: &str) { pub async fn get_state_db(config: &impl RolloutConfigView) -> Option { let state_path = codex_state::state_db_path(config.sqlite_home()); if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { + codex_state::record_fallback(None, "get_state_db", "db_unavailable"); return None; } - let runtime = codex_state::StateRuntime::init( + let runtime = match codex_state::StateRuntime::init( config.sqlite_home().to_path_buf(), config.model_provider_id().to_string(), ) .await - .ok()?; + { + Ok(runtime) => runtime, + Err(_) => { + codex_state::record_fallback(None, "get_state_db", "db_error"); + return None; + } + }; require_backfill_complete(runtime, config.sqlite_home()).await } +/// Build a SQLite telemetry recorder backed by an OTEL metrics client. 
+pub fn sqlite_telemetry_recorder( + metrics: codex_otel::MetricsClient, + originator: &str, +) -> codex_state::DbTelemetryHandle { + sqlite_metrics::recorder(metrics, originator) +} + async fn require_backfill_complete( runtime: StateDbHandle, codex_home: &Path, @@ -218,6 +248,7 @@ async fn require_backfill_complete( codex_home.display(), state.status.as_str() ); + codex_state::record_fallback(None, "get_state_db", "backfill_incomplete"); None } Err(err) => { @@ -225,6 +256,7 @@ async fn require_backfill_complete( "failed to read backfill state at {}: {err}", codex_home.display() ); + codex_state::record_fallback(None, "get_state_db", "db_error"); None } } diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index ea9a2b089d..3c98a77978 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -10,6 +10,7 @@ mod migrations; mod model; mod paths; mod runtime; +mod telemetry; pub use model::LogEntry; pub use model::LogQuery; @@ -56,6 +57,11 @@ pub use runtime::logs_db_filename; pub use runtime::logs_db_path; pub use runtime::state_db_filename; pub use runtime::state_db_path; +pub use telemetry::DbTelemetry; +pub use telemetry::DbTelemetryHandle; +pub use telemetry::install_process_db_telemetry; +pub use telemetry::record_backfill_gate; +pub use telemetry::record_fallback; /// Environment variable for overriding the SQLite state database home directory. pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; @@ -69,3 +75,9 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error"; pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill"; /// Metrics on backfill duration. Tags: [status] pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms"; +/// SQLite initialization attempts. Tags: [status, phase, db, error] +pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count"; +/// SQLite initialization latency. 
Tags: [status, phase, db, error] +pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms"; +/// Rollout fallback attempts. Tags: [caller, reason] +pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count"; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index c683847492..12d16bdfdc 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -25,6 +25,7 @@ use crate::model::datetime_to_epoch_millis; use crate::model::datetime_to_epoch_seconds; use crate::model::epoch_millis_to_datetime; use crate::paths::file_modified_time_utc; +use crate::telemetry::DbKind; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; @@ -50,6 +51,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::time::Duration; +use std::time::Instant; use tracing::warn; mod agent_jobs; @@ -112,10 +114,30 @@ impl StateRuntime { return Err(err); } }; - let thread_updated_at_millis: Option = + let started = Instant::now(); + let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await; + crate::telemetry::record_init_result( + None, + DbKind::State, + "ensure_backfill_state", + started.elapsed(), + &backfill_state_result, + ); + backfill_state_result?; + let started = Instant::now(); + let thread_updated_at_millis_result: anyhow::Result> = sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads") .fetch_one(pool.as_ref()) - .await?; + .await + .map_err(anyhow::Error::from); + crate::telemetry::record_init_result( + None, + DbKind::State, + "post_init_query", + started.elapsed(), + &thread_updated_at_millis_result, + ); + let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { pool, @@ -153,25 +175,60 @@ async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result anyhow::Result { + open_sqlite(path, migrator, 
DbKind::Logs, "open_logs", "migrate_logs").await +} + +async fn open_sqlite( + path: &Path, + migrator: &Migrator, + db: DbKind, + open_phase: &'static str, + migrate_phase: &'static str, +) -> anyhow::Result { let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); - let pool = SqlitePoolOptions::new() + let started = Instant::now(); + let pool_result = SqlitePoolOptions::new() .max_connections(5) .connect_with(options) - .await?; - migrator.run(&pool).await?; + .await + .map_err(anyhow::Error::from); + crate::telemetry::record_init_result(None, db, open_phase, started.elapsed(), &pool_result); + let pool = pool_result?; + let started = Instant::now(); + let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from); + crate::telemetry::record_init_result( + None, + db, + migrate_phase, + started.elapsed(), + &migrate_result, + ); + migrate_result?; Ok(pool) } +pub(super) async fn ensure_backfill_state_row_in_pool( + pool: &sqlx::SqlitePool, +) -> anyhow::Result<()> { + sqlx::query( + r#" +INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) +VALUES (?, ?, NULL, NULL, ?) +ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(1_i64) + .bind(crate::BackfillStatus::Pending.as_str()) + .bind(Utc::now().timestamp()) + .execute(pool) + .await?; + Ok(()) +} + pub fn state_db_filename() -> String { STATE_DB_FILENAME.to_string() } diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 2bfec0a88f..77ab57fced 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -103,19 +103,7 @@ WHERE id = 1 } async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> { - sqlx::query( - r#" -INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) -VALUES (?, ?, NULL, NULL, ?) 
-ON CONFLICT(id) DO NOTHING - "#, - ) - .bind(1_i64) - .bind(crate::BackfillStatus::Pending.as_str()) - .bind(Utc::now().timestamp()) - .execute(self.pool.as_ref()) - .await?; - Ok(()) + ensure_backfill_state_row_in_pool(self.pool.as_ref()).await } } diff --git a/codex-rs/state/src/telemetry.rs b/codex-rs/state/src/telemetry.rs new file mode 100644 index 0000000000..8db73bb672 --- /dev/null +++ b/codex-rs/state/src/telemetry.rs @@ -0,0 +1,193 @@ +use std::borrow::Cow; +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; + +use crate::DB_FALLBACK_METRIC; +use crate::DB_INIT_DURATION_METRIC; +use crate::DB_INIT_METRIC; + +/// Low-cardinality sink for SQLite startup and fallback telemetry. +/// +/// Implementations should absorb delivery failures locally. Database behavior +/// must not depend on whether telemetry export succeeds. +pub trait DbTelemetry: Send + Sync + 'static { + fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]); + fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]); +} + +pub type DbTelemetryHandle = Arc; + +static PROCESS_DB_TELEMETRY: OnceLock = OnceLock::new(); + +/// Install the process-wide SQLite telemetry sink. +/// +/// Startup owners should call this once after OTEL initialization. Low-level +/// database paths will use the registered sink unless a test passes an +/// explicit sink for that call. 
+pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool {
+    PROCESS_DB_TELEMETRY.set(telemetry).is_ok()
+}
+
+#[derive(Clone, Copy)]
+pub(crate) enum DbKind {
+    State,
+    Logs,
+}
+
+impl DbKind {
+    fn as_str(self) -> &'static str {
+        match self {
+            Self::State => "state",
+            Self::Logs => "logs",
+        }
+    }
+}
+
+pub(crate) fn record_init_result<T>(
+    telemetry: Option<&dyn DbTelemetry>,
+    db: DbKind,
+    phase: &'static str,
+    duration: Duration,
+    result: &anyhow::Result<T>,
+) {
+    let outcome = DbOutcomeTags::from_result(result);
+    let tags = [
+        ("status", outcome.status),
+        ("phase", phase),
+        ("db", db.as_str()),
+        ("error", outcome.error),
+    ];
+    record_counter(telemetry, DB_INIT_METRIC, &tags);
+    record_duration(telemetry, DB_INIT_DURATION_METRIC, duration, &tags);
+}
+
+pub fn record_backfill_gate(
+    telemetry: Option<&dyn DbTelemetry>,
+    duration: Duration,
+    result: &anyhow::Result<()>,
+) {
+    record_init_result(telemetry, DbKind::State, "backfill_gate", duration, result);
+}
+
+pub fn record_fallback(
+    telemetry: Option<&dyn DbTelemetry>,
+    caller: &'static str,
+    reason: &'static str,
+) {
+    record_counter(
+        telemetry,
+        DB_FALLBACK_METRIC,
+        &[("caller", caller), ("reason", reason)],
+    );
+}
+
+fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) {
+    if let Some(telemetry) = resolve_telemetry(telemetry) {
+        telemetry.counter(name, /*inc*/ 1, tags);
+    }
+}
+
+fn record_duration(
+    telemetry: Option<&dyn DbTelemetry>,
+    name: &str,
+    duration: Duration,
+    tags: &[(&str, &str)],
+) {
+    if let Some(telemetry) = resolve_telemetry(telemetry) {
+        telemetry.record_duration(name, duration, tags);
+    }
+}
+
+fn resolve_telemetry(telemetry: Option<&dyn DbTelemetry>) -> Option<&dyn DbTelemetry> {
+    telemetry.or_else(|| PROCESS_DB_TELEMETRY.get().map(AsRef::as_ref))
+}
+
+struct DbOutcomeTags {
+    status: &'static str,
+    error: &'static str,
+}
+
+impl DbOutcomeTags {
+    fn from_result<T>(result: &anyhow::Result<T>) -> Self {
+        match result {
+            Ok(_) => Self {
+                status: "success",
+                error: "none",
+            },
+            Err(err) => Self {
+                status: "failed",
+                error: classify_error(err),
+            },
+        }
+    }
+}
+
+fn classify_error(err: &anyhow::Error) -> &'static str {
+    for cause in err.chain() {
+        if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
+            return classify_sqlx_error(sqlx_err);
+        }
+        if cause
+            .downcast_ref::<sqlx::migrate::MigrateError>()
+            .is_some()
+        {
+            return "migration";
+        }
+        if cause.downcast_ref::<serde_json::Error>().is_some() {
+            return "serde";
+        }
+        if cause.downcast_ref::<std::io::Error>().is_some() {
+            return "io";
+        }
+    }
+    "unknown"
+}
+
+fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
+    match err {
+        sqlx::Error::Database(database_error) => {
+            let code = database_error
+                .code()
+                .unwrap_or(Cow::Borrowed("none"))
+                .to_string();
+            classify_sqlite_code(code.as_str())
+        }
+        sqlx::Error::PoolTimedOut => "pool_timeout",
+        sqlx::Error::Io(_) => "io",
+        sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
+        sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
+        _ => "unknown",
+    }
+}
+
+fn classify_sqlite_code(code: &str) -> &'static str {
+    // SQLite result codes are documented at https://www.sqlite.org/rescode.html.
+    // Extended codes preserve the primary code in the low byte.
+    let primary_code = code.parse::<u32>().ok().map(|code| code & 0xff);
+    match primary_code {
+        Some(5) => "busy",
+        Some(6) => "locked",
+        Some(8) => "readonly",
+        Some(10) => "io",
+        Some(11) => "corrupt",
+        Some(13) => "full",
+        Some(14) => "cantopen",
+        Some(17) => "schema",
+        Some(19) => "constraint",
+        _ => "unknown",
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use pretty_assertions::assert_eq;
+
+    #[test]
+    fn classifies_extended_sqlite_codes() {
+        assert_eq!(classify_sqlite_code("5"), "busy");
+        assert_eq!(classify_sqlite_code("6"), "locked");
+        assert_eq!(classify_sqlite_code("2067"), "constraint");
+    }
+}
diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs
index ba7460ca47..df84dfc45a 100644
--- a/codex-rs/tui/src/lib.rs
+++ b/codex-rs/tui/src/lib.rs
@@ -42,6 +42,7 @@ use codex_config::format_config_error_with_source;
 use codex_exec_server::EnvironmentManager;
 use codex_exec_server::ExecServerRuntimePaths;
 use codex_login::AuthConfig;
+use codex_login::default_client::originator;
 use codex_login::default_client::set_default_client_residency_requirement;
 use codex_login::enforce_login_restrictions;
 use codex_protocol::ThreadId;
@@ -892,6 +893,36 @@ pub async fn run_main(
     )
     .await;
 
+    let otel_originator = originator().value;
+    let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+        crate::legacy_core::otel_init::build_provider(
+            &config,
+            env!("CARGO_PKG_VERSION"),
+            /*service_name_override*/ None,
+            /*default_analytics_enabled*/ true,
+        )
+    })) {
+        Ok(Ok(otel)) => otel,
+        Ok(Err(e)) => {
+            #[allow(clippy::print_stderr)]
+            {
+                eprintln!("Could not create otel exporter: {e}");
+            }
+            None
+        }
+        Err(_) => {
+            #[allow(clippy::print_stderr)]
+            {
+                eprintln!("Could not create otel exporter: panicked during initialization");
+            }
+            None
+        }
+    };
+    crate::legacy_core::otel_init::record_process_start(otel.as_ref(), otel_originator.as_str());
+    crate::legacy_core::otel_init::install_sqlite_telemetry(
+        otel.as_ref(),
+        otel_originator.as_str(),
+    );
     let state_db = match &app_server_target {
         AppServerTarget::Embedded => state_db::init(&config).await,
         AppServerTarget::Remote { .. } => state_db::get_state_db(&config).await,
@@ -1034,31 +1065,6 @@ pub async fn run_main(
         ensure_oss_provider_ready(provider_id, &config).await?;
     }
 
-    let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
-        crate::legacy_core::otel_init::build_provider(
-            &config,
-            env!("CARGO_PKG_VERSION"),
-            /*service_name_override*/ None,
-            /*default_analytics_enabled*/ true,
-        )
-    })) {
-        Ok(Ok(otel)) => otel,
-        Ok(Err(e)) => {
-            #[allow(clippy::print_stderr)]
-            {
-                eprintln!("Could not create otel exporter: {e}");
-            }
-            None
-        }
-        Err(_) => {
-            #[allow(clippy::print_stderr)]
-            {
-                eprintln!("Could not create otel exporter: panicked during initialization");
-            }
-            None
-        }
-    };
-
     let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
     let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());