diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index c1366766e1..f655f65b91 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -104,6 +104,7 @@ pub use crate::transport::AppServerTransport; pub use crate::transport::app_server_control_socket_path; const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT"; +const OTEL_SERVICE_NAME: &str = "codex-app-server"; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum LogFormat { @@ -503,6 +504,20 @@ pub async fn run_main_with_transport_options( } }; + let otel = codex_core::otel_init::build_provider( + &config, + env!("CARGO_PKG_VERSION"), + Some(OTEL_SERVICE_NAME), + default_analytics_enabled, + ) + .map_err(|e| { + std::io::Error::new( + ErrorKind::InvalidData, + format!("error loading otel config: {e}"), + ) + })?; + codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME); let state_db_result = rollout_state_db::try_init(&config).await; let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string); let state_db = state_db_result.ok(); @@ -582,19 +597,6 @@ pub async fn run_main_with_transport_options( let feedback = CodexFeedback::new(); - let otel = codex_core::otel_init::build_provider( - &config, - env!("CARGO_PKG_VERSION"), - Some("codex-app-server"), - default_analytics_enabled, - ) - .map_err(|e| { - std::io::Error::new( - ErrorKind::InvalidData, - format!("error loading otel config: {e}"), - ) - })?; - // Install a simple subscriber so `tracing` output is visible. Users can // control the log level with `RUST_LOG` and switch to JSON logs with // `LOG_FORMAT=json`. diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 8418d10b43..4897684564 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -65,7 +65,6 @@ use codex_arg0::Arg0DispatchPaths; use codex_chatgpt::workspace_settings; use codex_core::ThreadManager; use codex_core::config::Config; -use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_login::AuthManager; @@ -298,7 +297,7 @@ impl MessageProcessor { // The thread store is intentionally process-scoped. Config reloads can // affect per-thread behavior, but they must not move newly started, // resumed, or forked threads to a different persistence backend/root. - let thread_store = thread_store_from_config(config.as_ref(), state_db.clone()); + let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone()); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 9de844c6cd..f2e3444f88 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -277,7 +277,6 @@ use codex_core::exec::ExecCapturePolicy; use codex_core::exec::ExecExpiration; use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; -use codex_core::find_thread_path_by_id_str; use codex_core::path_utils; #[cfg(test)] use codex_core::read_head_for_summary; diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index e2acfb1732..57d2575590 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -107,7 +107,7 @@ impl ThreadGoalRequestProcessor { "ephemeral thread does not support goals: {thread_id}" )) })?, - None => find_thread_path_by_id_str( + None => codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), @@ -271,7 +271,7 @@ impl ThreadGoalRequestProcessor { "ephemeral thread does not support goals: {thread_id}" )) })?, - None => find_thread_path_by_id_str( + None => codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), @@ -335,7 +335,7 @@ impl ThreadGoalRequestProcessor { return Ok(state_db); } } else { - find_thread_path_by_id_str( + codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), self.state_db.as_deref(), diff --git a/codex-rs/core/src/otel_init.rs b/codex-rs/core/src/otel_init.rs index 0cd1f06994..8b5aac5f78 100644 --- a/codex-rs/core/src/otel_init.rs +++ b/codex-rs/core/src/otel_init.rs @@ -99,3 +99,18 @@ pub fn build_provider( pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool { meta.target().starts_with("codex_otel") } + +pub fn record_process_start(otel: Option<&OtelProvider>, originator: &str) { + let Some(metrics) = otel.and_then(OtelProvider::metrics) else { + return; + }; + let _ = codex_otel::record_process_start_once(metrics, originator); +} + +pub fn install_sqlite_telemetry(otel: Option<&OtelProvider>, originator: &str) { + let Some(metrics) = otel.and_then(OtelProvider::metrics) else { + return; + }; + let telemetry = codex_rollout::sqlite_telemetry_recorder(metrics.clone(), originator); + let _ = codex_state::install_process_db_telemetry(telemetry); +} diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index e68c96d00b..3d49c40cc0 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -478,6 +478,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result None } }; + codex_core::otel_init::record_process_start(otel.as_ref(), "codex_exec"); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), "codex_exec"); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index aa560bbe6a..45daf99a01 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -74,19 +74,6 @@ pub async fn run_main( std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}")) })?; set_default_client_residency_requirement(config.enforce_residency.value()); - let state_db = codex_core::init_state_db(&config).await; - let environment_manager = Arc::new( - EnvironmentManager::from_codex_home( - config.codex_home.clone(), - ExecServerRuntimePaths::from_optional_paths( - arg0_paths.codex_self_exe.clone(), - arg0_paths.codex_linux_sandbox_exe.clone(), - )?, - ) - .await - .map_err(std::io::Error::other)?, - ); - let otel = codex_core::otel_init::build_provider( &config, env!("CARGO_PKG_VERSION"), @@ -99,6 +86,20 @@ pub async fn run_main( format!("error loading otel config: {e}"), ) })?; + codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME); + codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME); + let state_db = codex_core::init_state_db(&config).await; + let environment_manager = Arc::new( + EnvironmentManager::from_codex_home( + config.codex_home.clone(), + ExecServerRuntimePaths::from_optional_paths( + arg0_paths.codex_self_exe.clone(), + arg0_paths.codex_linux_sandbox_exe.clone(), + )?, + ) + .await + .map_err(std::io::Error::other)?, + ); let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 7a06bcc02c..9e536d930c 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -5,7 +5,6 @@ use codex_arg0::Arg0DispatchPaths; use codex_core::StateDbHandle; use codex_core::ThreadManager; use codex_core::config::Config; -use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_extension_api::empty_extension_registry; use codex_login::AuthManager; @@ -71,7 +70,7 @@ impl MessageProcessor { environment_manager, empty_extension_registry(), /*analytics_events_client*/ None, - thread_store_from_config(config.as_ref(), state_db.clone()), + codex_core::thread_store_from_config(config.as_ref(), state_db.clone()), state_db.clone(), installation_id, /*attestation_provider*/ None, diff --git a/codex-rs/otel/src/metrics/mod.rs b/codex-rs/otel/src/metrics/mod.rs index e75840bf37..07b7554224 100644 --- a/codex-rs/otel/src/metrics/mod.rs +++ b/codex-rs/otel/src/metrics/mod.rs @@ -2,6 +2,7 @@ mod client; mod config; mod error; pub(crate) mod names; +mod process; pub(crate) mod runtime_metrics; pub(crate) mod tags; pub(crate) mod timer; @@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig; pub use crate::metrics::config::MetricsExporter; pub use crate::metrics::error::MetricsError; pub use crate::metrics::error::Result; +pub use crate::metrics::process::record_process_start_once; pub use names::*; use std::sync::OnceLock; +pub use tags::ORIGINATOR_TAG; pub use tags::SessionMetricTagValues; +pub use tags::bounded_originator_tag_value; static GLOBAL_METRICS: OnceLock = OnceLock::new(); static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock = OnceLock::new(); diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index b2f2d639ce..67ac6e19ed 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -1,6 +1,7 @@ pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call"; pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms"; pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec"; +pub const PROCESS_START_METRIC: &str = "codex.process.start"; pub const API_CALL_COUNT_METRIC: &str = "codex.api_request"; pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms"; pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event"; diff --git a/codex-rs/otel/src/metrics/process.rs b/codex-rs/otel/src/metrics/process.rs new file mode 100644 index 0000000000..6c7cd18b4e --- /dev/null +++ b/codex-rs/otel/src/metrics/process.rs @@ -0,0 +1,27 @@ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use super::client::MetricsClient; +use super::error::Result; +use super::names::PROCESS_START_METRIC; +use super::tags::ORIGINATOR_TAG; +use super::tags::bounded_originator_tag_value; + +static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false); + +/// Record the process start counter at most once for this process. +pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result { + if PROCESS_START_RECORDED + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + return Ok(false); + } + + metrics.counter( + PROCESS_START_METRIC, + /*inc*/ 1, + &[(ORIGINATOR_TAG, bounded_originator_tag_value(originator))], + )?; + Ok(true) +} diff --git a/codex-rs/otel/src/metrics/tags.rs b/codex-rs/otel/src/metrics/tags.rs index 5c166e307e..e1baf285ae 100644 --- a/codex-rs/otel/src/metrics/tags.rs +++ b/codex-rs/otel/src/metrics/tags.rs @@ -1,6 +1,7 @@ use crate::metrics::Result; use crate::metrics::validation::validate_tag_key; use crate::metrics::validation::validate_tag_value; +use codex_utils_string::sanitize_metric_tag_value; pub const APP_VERSION_TAG: &str = "app.version"; pub const AUTH_MODE_TAG: &str = "auth_mode"; @@ -9,6 +10,31 @@ pub const ORIGINATOR_TAG: &str = "originator"; pub const SERVICE_NAME_TAG: &str = "service_name"; pub const SESSION_SOURCE_TAG: &str = "session_source"; +const OTHER_ORIGINATOR_TAG_VALUE: &str = "other"; +const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[ + "codex_desktop", + "codex-app-server", + "codex_mcp_server", + "codex_cli_rs", + "codex-tui", + "codex_vscode", + "none", + "codex_exec", + "codex-cli", + "codex_sdk_ts", + "codex-app-server-sdk", +]; + +/// Return a known low-cardinality originator tag value, or `other`. +pub fn bounded_originator_tag_value(originator: &str) -> &'static str { + let sanitized = sanitize_metric_tag_value(originator); + KNOWN_ORIGINATOR_TAG_VALUES + .iter() + .copied() + .find(|known| *known == sanitized.as_str()) + .unwrap_or(OTHER_ORIGINATOR_TAG_VALUE) +} + pub struct SessionMetricTagValues<'a> { pub auth_mode: Option<&'a str>, pub session_source: &'a str, diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index d65ddd3d5b..d594292bc1 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -10,6 +10,7 @@ pub(crate) mod metadata; pub(crate) mod policy; pub(crate) mod recorder; pub(crate) mod session_index; +mod sqlite_metrics; pub mod state_db; pub(crate) mod default_client { @@ -63,6 +64,7 @@ pub use session_index::find_thread_meta_by_name_str; pub use session_index::find_thread_name_by_id; pub use session_index::find_thread_names_by_ids; pub use state_db::StateDbHandle; +pub use state_db::sqlite_telemetry_recorder; #[cfg(test)] mod tests; diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index d5b353e32b..d4f0677898 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1289,47 +1289,65 @@ async fn find_thread_path_by_id_str_in_subdir( }; let thread_id = ThreadId::from_string(id_str).ok(); let mut unverified_db_path = None; + let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable"); if let Some(state_db_ctx) = state_db_ctx && let Some(thread_id) = thread_id - && let Some(db_path) = state_db::find_rollout_path_by_id( - Some(state_db_ctx), - thread_id, - archived_only, - "find_path_query", - ) - .await { - if tokio::fs::try_exists(&db_path).await.unwrap_or(false) { - match read_session_meta_line(&db_path).await { - Ok(meta_line) if meta_line.meta.id == thread_id => { - return Ok(Some(db_path)); - } - Ok(meta_line) => { + match state_db_ctx + .find_rollout_path_by_id(thread_id, archived_only) + .await + { + Ok(Some(db_path)) => { + if tokio::fs::try_exists(&db_path).await.unwrap_or(false) { + match read_session_meta_line(&db_path).await { + Ok(meta_line) if meta_line.meta.id == thread_id => { + return Ok(Some(db_path)); + } + Ok(meta_line) => { + tracing::error!( + "state db returned rollout path for thread {id_str} but file belongs to thread {}: {}", + meta_line.meta.id, + db_path.display() + ); + tracing::warn!( + "state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path" + ); + codex_state::record_fallback( + "find_thread_path", + "mismatch", + /*telemetry_override*/ None, + ); + } + Err(err) => { + tracing::debug!( + "state db returned rollout path for thread {id_str} that could not be verified: {}: {err}", + db_path.display() + ); + unverified_db_path = Some(db_path); + } + } + } else { tracing::error!( - "state db returned rollout path for thread {id_str} but file belongs to thread {}: {}", - meta_line.meta.id, + "state db returned stale rollout path for thread {id_str}: {}", db_path.display() ); tracing::warn!( - "state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path" + "state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path" ); - } - Err(err) => { - tracing::debug!( - "state db returned rollout path for thread {id_str} that could not be verified: {}: {err}", - db_path.display() + codex_state::record_fallback( + "find_thread_path", + "stale_path", + /*telemetry_override*/ None, ); - unverified_db_path = Some(db_path); } } - } else { - tracing::error!( - "state db returned stale rollout path for thread {id_str}: {}", - db_path.display() - ); - tracing::warn!( - "state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path" - ); + Ok(None) => fallback_reason = Some("missing_row"), + Err(err) => { + tracing::warn!( + "state db find_rollout_path_by_id failed during find_path_query: {err}" + ); + fallback_reason = Some("db_error"); + } } } @@ -1357,6 +1375,13 @@ async fn find_thread_path_by_id_str_in_subdir( tracing::warn!( "state db discrepancy during find_thread_path_by_id_str_in_subdir: falling_back" ); + if let Some(reason) = fallback_reason { + codex_state::record_fallback( + "find_thread_path", + reason, + /*telemetry_override*/ None, + ); + } state_db::read_repair_rollout_path( state_db_ctx, thread_id, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 369c9f20a8..b4e0ab1e09 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -450,6 +450,11 @@ impl RolloutRecorder { if state_db_ctx.is_none() { // Keep legacy behavior when SQLite is unavailable: return filesystem results // at the requested page size. + codex_state::record_fallback( + "list_threads", + "db_unavailable", + /*telemetry_override*/ None, + ); return Ok(page_from_filesystem_scan( fs_page, sort_direction, @@ -558,6 +563,11 @@ impl RolloutRecorder { ) .await; } + codex_state::record_fallback( + "list_threads", + "metadata_filter", + /*telemetry_override*/ None, + ); let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); return Ok(fill_missing_thread_item_metadata_from_state_db( state_db_ctx.as_deref(), @@ -569,6 +579,11 @@ impl RolloutRecorder { } if listing_has_metadata_filters { let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); + codex_state::record_fallback( + "list_threads", + "db_error", + /*telemetry_override*/ None, + ); return Ok(fill_missing_thread_item_metadata_from_state_db( state_db_ctx.as_deref(), page, @@ -578,6 +593,7 @@ impl RolloutRecorder { // If SQLite listing still fails, return the filesystem page rather than failing the list. tracing::error!("Falling back on rollout system"); tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back"); + codex_state::record_fallback("list_threads", "db_error", /*telemetry_override*/ None); Ok(page_from_filesystem_scan( fs_page, sort_direction, @@ -601,6 +617,7 @@ impl RolloutRecorder { ) -> std::io::Result> { let codex_home = config.codex_home(); let cwd_filter = filter_cwd.map(Path::to_path_buf); + let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable"); if state_db_ctx.is_some() { let mut db_cursor = cursor.cloned(); loop { @@ -619,6 +636,7 @@ impl RolloutRecorder { ) .await else { + fallback_reason = Some("db_error"); break; }; if let Some(path) = @@ -628,10 +646,18 @@ impl RolloutRecorder { } db_cursor = db_page.next_anchor.map(Into::into); if db_cursor.is_none() { + fallback_reason = Some("missing_row"); break; } } } + if let Some(reason) = fallback_reason { + codex_state::record_fallback( + "find_latest_thread_path", + reason, + /*telemetry_override*/ None, + ); + } let mut cursor = cursor.cloned(); loop { diff --git a/codex-rs/rollout/src/sqlite_metrics.rs b/codex-rs/rollout/src/sqlite_metrics.rs new file mode 100644 index 0000000000..7601e79729 --- /dev/null +++ b/codex-rs/rollout/src/sqlite_metrics.rs @@ -0,0 +1,40 @@ +use std::sync::Arc; +use std::time::Duration; + +use codex_otel::ORIGINATOR_TAG; +use codex_otel::bounded_originator_tag_value; +use codex_state::DbTelemetry; +use codex_state::DbTelemetryHandle; + +struct OtelDbTelemetry { + metrics: codex_otel::MetricsClient, + originator: &'static str, +} + +impl DbTelemetry for OtelDbTelemetry { + fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) { + let tags = with_originator(tags, self.originator); + let _ = self.metrics.counter(name, inc, &tags); + } + + fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) { + let tags = with_originator(tags, self.originator); + let _ = self.metrics.record_duration(name, duration, &tags); + } +} + +pub(crate) fn recorder(metrics: codex_otel::MetricsClient, originator: &str) -> DbTelemetryHandle { + Arc::new(OtelDbTelemetry { + metrics, + originator: bounded_originator_tag_value(originator), + }) +} + +fn with_originator<'a>( + tags: &[(&'a str, &'a str)], + originator: &'static str, +) -> Vec<(&'a str, &'a str)> { + let mut tags = tags.to_vec(); + tags.push((ORIGINATOR_TAG, originator)); + tags +} diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index d039e16d68..3bd8dd8e01 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -4,6 +4,7 @@ use crate::list::Cursor; use crate::list::SortDirection; use crate::list::ThreadSortKey; use crate::metadata; +use crate::sqlite_metrics; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; @@ -115,6 +116,29 @@ async fn try_init_with_roots_inner( sqlite_home.display() ) })?; + let backfill_gate_started = Instant::now(); + let backfill_gate_result = wait_for_backfill_gate( + runtime.as_ref(), + codex_home.as_path(), + default_model_provider_id.as_str(), + backfill_lease_seconds, + ) + .await; + codex_state::record_backfill_gate( + /*telemetry*/ None, + backfill_gate_started.elapsed(), + &backfill_gate_result, + ); + backfill_gate_result?; + Ok(runtime) +} + +async fn wait_for_backfill_gate( + runtime: &codex_state::StateRuntime, + codex_home: &Path, + default_model_provider_id: &str, + backfill_lease_seconds: Option, +) -> anyhow::Result<()> { let wait_started = Instant::now(); let mut reported_wait = false; loop { @@ -125,24 +149,19 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { - return Ok(runtime); + return Ok(()); } if let Some(backfill_lease_seconds) = backfill_lease_seconds { metadata::backfill_sessions_with_lease( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), + runtime, + codex_home, + default_model_provider_id, backfill_lease_seconds, ) .await; } else { - metadata::backfill_sessions( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), - ) - .await; + metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await; } let backfill_state = runtime.get_backfill_state().await.map_err(|err| { anyhow::anyhow!( @@ -151,7 +170,7 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { - return Ok(runtime); + return Ok(()); } if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT { return Err(anyhow::anyhow!( @@ -195,17 +214,40 @@ fn emit_startup_warning(message: &str) { pub async fn get_state_db(config: &impl RolloutConfigView) -> Option { let state_path = codex_state::state_db_path(config.sqlite_home()); if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { + codex_state::record_fallback( + "get_state_db", + "db_unavailable", + /*telemetry_override*/ None, + ); return None; } - let runtime = codex_state::StateRuntime::init( + let runtime = match codex_state::StateRuntime::init( config.sqlite_home().to_path_buf(), config.model_provider_id().to_string(), ) .await - .ok()?; + { + Ok(runtime) => runtime, + Err(_) => { + codex_state::record_fallback( + "get_state_db", + "db_error", + /*telemetry_override*/ None, + ); + return None; + } + }; require_backfill_complete(runtime, config.sqlite_home()).await } +/// Build a SQLite telemetry recorder backed by an OTEL metrics client. +pub fn sqlite_telemetry_recorder( + metrics: codex_otel::MetricsClient, + originator: &str, +) -> codex_state::DbTelemetryHandle { + sqlite_metrics::recorder(metrics, originator) +} + async fn require_backfill_complete( runtime: StateDbHandle, codex_home: &Path, @@ -218,6 +260,11 @@ async fn require_backfill_complete( codex_home.display(), state.status.as_str() ); + codex_state::record_fallback( + "get_state_db", + "backfill_incomplete", + /*telemetry_override*/ None, + ); None } Err(err) => { @@ -225,6 +272,11 @@ async fn require_backfill_complete( "failed to read backfill state at {}: {err}", codex_home.display() ); + codex_state::record_fallback( + "get_state_db", + "db_error", + /*telemetry_override*/ None, + ); None } } diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index ea9a2b089d..3c98a77978 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -10,6 +10,7 @@ mod migrations; mod model; mod paths; mod runtime; +mod telemetry; pub use model::LogEntry; pub use model::LogQuery; @@ -56,6 +57,11 @@ pub use runtime::logs_db_filename; pub use runtime::logs_db_path; pub use runtime::state_db_filename; pub use runtime::state_db_path; +pub use telemetry::DbTelemetry; +pub use telemetry::DbTelemetryHandle; +pub use telemetry::install_process_db_telemetry; +pub use telemetry::record_backfill_gate; +pub use telemetry::record_fallback; /// Environment variable for overriding the SQLite state database home directory. pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; @@ -69,3 +75,9 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error"; pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill"; /// Metrics on backfill duration. Tags: [status] pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms"; +/// SQLite initialization attempts. Tags: [status, phase, db, error] +pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count"; +/// SQLite initialization latency. Tags: [status, phase, db, error] +pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms"; +/// Rollout fallback attempts. Tags: [caller, reason] +pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count"; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index c683847492..a5748da9eb 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -25,6 +25,8 @@ use crate::model::datetime_to_epoch_millis; use crate::model::datetime_to_epoch_seconds; use crate::model::epoch_millis_to_datetime; use crate::paths::file_modified_time_utc; +use crate::telemetry::DbKind; +use crate::telemetry::DbTelemetry; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; @@ -50,6 +52,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::time::Duration; +use std::time::Instant; use tracing::warn; mod agent_jobs; @@ -93,29 +96,72 @@ impl StateRuntime { /// keeping logs in a dedicated file to reduce lock contention with the /// rest of the state store. pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result> { + Self::init_inner( + codex_home, + default_provider, + /*telemetry_override*/ None, + ) + .await + } + + #[cfg(test)] + pub(crate) async fn init_with_telemetry_for_tests( + codex_home: PathBuf, + default_provider: String, + telemetry_override: &dyn DbTelemetry, + ) -> anyhow::Result> { + Self::init_inner(codex_home, default_provider, Some(telemetry_override)).await + } + + async fn init_inner( + codex_home: PathBuf, + default_provider: String, + telemetry_override: Option<&dyn DbTelemetry>, + ) -> anyhow::Result> { tokio::fs::create_dir_all(&codex_home).await?; let state_migrator = runtime_state_migrator(); let logs_migrator = runtime_logs_migrator(); let state_path = state_db_path(codex_home.as_path()); let logs_path = logs_db_path(codex_home.as_path()); - let pool = match open_state_sqlite(&state_path, &state_migrator).await { + let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); return Err(err); } }; - let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await { + let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, telemetry_override).await + { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open logs db at {}: {err}", logs_path.display()); return Err(err); } }; - let thread_updated_at_millis: Option = + let started = Instant::now(); + let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await; + crate::telemetry::record_init_result( + telemetry_override, + DbKind::State, + "ensure_backfill_state", + started.elapsed(), + &backfill_state_result, + ); + backfill_state_result?; + let started = Instant::now(); + let thread_updated_at_millis_result: anyhow::Result> = sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads") .fetch_one(pool.as_ref()) - .await?; + .await + .map_err(anyhow::Error::from); + crate::telemetry::record_init_result( + telemetry_override, + DbKind::State, + "post_init_query", + started.elapsed(), + &thread_updated_at_millis_result, + ); + let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { pool, @@ -149,27 +195,93 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { .log_statements(LevelFilter::Off) } -async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result { +async fn open_state_sqlite( + path: &Path, + migrator: &Migrator, + telemetry_override: Option<&dyn DbTelemetry>, +) -> anyhow::Result { // New state DBs should use incremental auto-vacuum, but retrofitting an // existing DB requires a full VACUUM. Do not attempt that during process // startup: it is maintenance work that can contend with foreground writers. + open_sqlite( + path, + migrator, + DbKind::State, + "open_state", + "migrate_state", + telemetry_override, + ) + .await +} + +async fn open_logs_sqlite( + path: &Path, + migrator: &Migrator, + telemetry_override: Option<&dyn DbTelemetry>, +) -> anyhow::Result { + open_sqlite( + path, + migrator, + DbKind::Logs, + "open_logs", + "migrate_logs", + telemetry_override, + ) + .await +} + +async fn open_sqlite( + path: &Path, + migrator: &Migrator, + db: DbKind, + open_phase: &'static str, + migrate_phase: &'static str, + telemetry_override: Option<&dyn DbTelemetry>, +) -> anyhow::Result { let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); - let pool = SqlitePoolOptions::new() + let started = Instant::now(); + let pool_result = SqlitePoolOptions::new() .max_connections(5) .connect_with(options) - .await?; - migrator.run(&pool).await?; + .await + .map_err(anyhow::Error::from); + crate::telemetry::record_init_result( + telemetry_override, + db, + open_phase, + started.elapsed(), + &pool_result, + ); + let pool = pool_result?; + let started = Instant::now(); + let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from); + crate::telemetry::record_init_result( + telemetry_override, + db, + migrate_phase, + started.elapsed(), + &migrate_result, + ); + migrate_result?; Ok(pool) } -async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result { - let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); - let pool = SqlitePoolOptions::new() - .max_connections(5) - .connect_with(options) - .await?; - migrator.run(&pool).await?; - Ok(pool) +pub(super) async fn ensure_backfill_state_row_in_pool( + pool: &sqlx::SqlitePool, +) -> anyhow::Result<()> { + sqlx::query( + r#" +INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) +VALUES (?, ?, NULL, NULL, ?) +ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(1_i64) + .bind(crate::BackfillStatus::Pending.as_str()) + .bind(Utc::now().timestamp()) + .execute(pool) + .await?; + Ok(()) } pub fn state_db_filename() -> String { @@ -190,15 +302,73 @@ pub fn logs_db_path(codex_home: &Path) -> PathBuf { #[cfg(test)] mod tests { + use super::StateRuntime; use super::open_state_sqlite; use super::runtime_state_migrator; use super::state_db_path; use super::test_support::unique_temp_dir; + use crate::DB_INIT_METRIC; + use crate::DbTelemetry; use crate::migrations::STATE_MIGRATOR; + use pretty_assertions::assert_eq; use sqlx::SqlitePool; use sqlx::migrate::MigrateError; use sqlx::sqlite::SqliteConnectOptions; + use std::collections::BTreeMap; + use std::collections::BTreeSet; use std::path::Path; + use std::sync::Mutex; + + #[derive(Default)] + struct TestTelemetry { + counters: Mutex>, + } + + #[derive(Debug, Eq, PartialEq)] + struct MetricEvent { + name: String, + tags: BTreeMap, + } + + impl TestTelemetry { + fn counters(&self) -> Vec { + self.counters + .lock() + .expect("telemetry lock") + .iter() + .map(|event| MetricEvent { + name: event.name.clone(), + tags: event.tags.clone(), + }) + .collect() + } + } + + impl DbTelemetry for TestTelemetry { + fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) { + self.counters + .lock() + .expect("telemetry lock") + .push(MetricEvent { + name: name.to_string(), + tags: tags_to_map(tags), + }); + } + + fn record_duration( + &self, + _name: &str, + _duration: std::time::Duration, + _tags: &[(&str, &str)], + ) { + } + } + + fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap { + tags.iter() + .map(|(key, value)| ((*key).to_string(), (*value).to_string())) + .collect() + } async fn open_db_pool(path: &Path) -> SqlitePool { SqlitePool::connect_with( @@ -250,11 +420,53 @@ mod tests { strict_pool.close().await; let tolerant_migrator = runtime_state_migrator(); - let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator) - .await - .expect("runtime migrator should tolerate newer applied migrations"); + let tolerant_pool = open_state_sqlite( + state_path.as_path(), + &tolerant_migrator, + /*telemetry_override*/ None, + ) + .await + .expect("runtime migrator should tolerate newer applied migrations"); tolerant_pool.close().await; let _ = tokio::fs::remove_dir_all(codex_home).await; } + + #[tokio::test] + async fn init_records_successful_sqlite_init_phases_to_explicit_telemetry() { + let codex_home = unique_temp_dir(); + let telemetry = TestTelemetry::default(); + + let runtime = StateRuntime::init_with_telemetry_for_tests( + codex_home.clone(), + "test-provider".to_string(), + &telemetry, + ) + .await + .expect("state runtime should initialize"); + + let phases = telemetry + .counters() + .into_iter() + .filter(|event| event.name == DB_INIT_METRIC) + .filter(|event| event.tags.get("status").map(String::as_str) == Some("success")) + .filter_map(|event| event.tags.get("phase").cloned()) + .collect::>(); + let expected = [ + "open_state", + "migrate_state", + "open_logs", + "migrate_logs", + "ensure_backfill_state", + "post_init_query", + ] + .into_iter() + .map(str::to_string) + .collect::>(); + assert_eq!(phases, expected); + + runtime.pool.close().await; + runtime.logs_pool.close().await; + let _ = tokio::fs::remove_dir_all(codex_home).await; + } } diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 2bfec0a88f..77ab57fced 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -103,19 +103,7 @@ WHERE id = 1 } async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> { - sqlx::query( - r#" -INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) -VALUES (?, ?, NULL, NULL, ?) -ON CONFLICT(id) DO NOTHING - "#, - ) - .bind(1_i64) - .bind(crate::BackfillStatus::Pending.as_str()) - .bind(Utc::now().timestamp()) - .execute(self.pool.as_ref()) - .await?; - Ok(()) + ensure_backfill_state_row_in_pool(self.pool.as_ref()).await } } diff --git a/codex-rs/state/src/telemetry.rs b/codex-rs/state/src/telemetry.rs new file mode 100644 index 0000000000..a4a26db0ce --- /dev/null +++ b/codex-rs/state/src/telemetry.rs @@ -0,0 +1,199 @@ +use std::borrow::Cow; +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; + +use crate::DB_FALLBACK_METRIC; +use crate::DB_INIT_DURATION_METRIC; +use crate::DB_INIT_METRIC; +use tracing::debug; + +/// Low-cardinality sink for SQLite startup and fallback telemetry. +/// +/// Implementations should absorb delivery failures locally. Database behavior +/// must not depend on whether telemetry export succeeds. +pub trait DbTelemetry: Send + Sync + 'static { + fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]); + fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]); +} + +pub type DbTelemetryHandle = Arc; + +static PROCESS_DB_TELEMETRY: OnceLock = OnceLock::new(); + +/// Install the process-wide SQLite telemetry sink. +/// +/// Startup owners should call this once after OTEL initialization. Low-level +/// database paths will use the registered sink unless an explicit sink is +/// provided. Subsequent installs are ignored and keep the first installed sink. +pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool { + if PROCESS_DB_TELEMETRY.set(telemetry).is_ok() { + true + } else { + debug!("process SQLite telemetry sink already installed; ignoring duplicate install"); + false + } +} + +#[derive(Clone, Copy)] +pub(crate) enum DbKind { + State, + Logs, +} + +impl DbKind { + fn as_str(self) -> &'static str { + match self { + Self::State => "state", + Self::Logs => "logs", + } + } +} + +pub(crate) fn record_init_result( + telemetry: Option<&dyn DbTelemetry>, + db: DbKind, + phase: &'static str, + duration: Duration, + result: &anyhow::Result, +) { + let outcome = DbOutcomeTags::from_result(result); + let tags = [ + ("status", outcome.status), + ("phase", phase), + ("db", db.as_str()), + ("error", outcome.error), + ]; + record_counter(telemetry, DB_INIT_METRIC, &tags); + record_duration(telemetry, DB_INIT_DURATION_METRIC, duration, &tags); +} + +pub fn record_backfill_gate( + telemetry: Option<&dyn DbTelemetry>, + duration: Duration, + result: &anyhow::Result<()>, +) { + record_init_result(telemetry, DbKind::State, "backfill_gate", duration, result); +} + +pub fn record_fallback( + caller: &'static str, + reason: &'static str, + telemetry_override: Option<&dyn DbTelemetry>, +) { + record_counter( + telemetry_override, + DB_FALLBACK_METRIC, + &[("caller", caller), ("reason", reason)], + ); +} + +fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) { + if let Some(telemetry) = resolve_telemetry(telemetry) { + telemetry.counter(name, /*inc*/ 1, tags); + } +} + +fn record_duration( + telemetry: Option<&dyn DbTelemetry>, + name: &str, + duration: Duration, + tags: &[(&str, &str)], +) { + if let Some(telemetry) = resolve_telemetry(telemetry) { + telemetry.record_duration(name, duration, tags); + } +} + +fn resolve_telemetry(telemetry: Option<&dyn DbTelemetry>) -> Option<&dyn DbTelemetry> { + telemetry.or_else(|| PROCESS_DB_TELEMETRY.get().map(AsRef::as_ref)) +} + +struct DbOutcomeTags { + status: &'static str, + error: &'static str, +} + +impl DbOutcomeTags { + fn from_result(result: &anyhow::Result) -> Self { + match result { + Ok(_) => Self { + status: "success", + error: "none", + }, + Err(err) => Self { + status: "failed", + error: classify_error(err), + }, + } + } +} + +fn classify_error(err: &anyhow::Error) -> &'static str { + for cause in err.chain() { + if let Some(sqlx_err) = cause.downcast_ref::() { + return classify_sqlx_error(sqlx_err); + } + if cause + .downcast_ref::() + .is_some() + { + return "migration"; + } + if cause.downcast_ref::().is_some() { + return "serde"; + } + if cause.downcast_ref::().is_some() { + return "io"; + } + } + "unknown" +} + +fn classify_sqlx_error(err: &sqlx::Error) -> &'static str { + match err { + sqlx::Error::Database(database_error) => { + let code = database_error + .code() + .unwrap_or(Cow::Borrowed("none")) + .to_string(); + classify_sqlite_code(code.as_str()) + } + sqlx::Error::PoolTimedOut => "pool_timeout", + sqlx::Error::Io(_) => "io", + sqlx::Error::ColumnDecode { source, .. } if source.is::() => "serde", + sqlx::Error::Decode(source) if source.is::() => "serde", + _ => "unknown", + } +} + +fn classify_sqlite_code(code: &str) -> &'static str { + // SQLite result codes are documented at https://www.sqlite.org/rescode.html. + // Extended codes preserve the primary code in the low byte. + let primary_code = code.parse::().ok().map(|code| code & 0xff); + match primary_code { + Some(5) => "busy", + Some(6) => "locked", + Some(8) => "readonly", + Some(10) => "io", + Some(11) => "corrupt", + Some(13) => "full", + Some(14) => "cantopen", + Some(17) => "schema", + Some(19) => "constraint", + _ => "unknown", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn classifies_extended_sqlite_codes() { + assert_eq!(classify_sqlite_code("5"), "busy"); + assert_eq!(classify_sqlite_code("6"), "locked"); + assert_eq!(classify_sqlite_code("2067"), "constraint"); + } +} diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index ba7460ca47..df84dfc45a 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -42,6 +42,7 @@ use codex_config::format_config_error_with_source; use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecServerRuntimePaths; use codex_login::AuthConfig; +use codex_login::default_client::originator; use codex_login::default_client::set_default_client_residency_requirement; use codex_login::enforce_login_restrictions; use codex_protocol::ThreadId; @@ -892,6 +893,36 @@ pub async fn run_main( ) .await; + let otel_originator = originator().value; + let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + crate::legacy_core::otel_init::build_provider( + &config, + env!("CARGO_PKG_VERSION"), + /*service_name_override*/ None, + /*default_analytics_enabled*/ true, + ) + })) { + Ok(Ok(otel)) => otel, + Ok(Err(e)) => { + #[allow(clippy::print_stderr)] + { + eprintln!("Could not create otel exporter: {e}"); + } + None + } + Err(_) => { + #[allow(clippy::print_stderr)] + { + eprintln!("Could not create otel exporter: panicked during initialization"); + } + None + } + }; + crate::legacy_core::otel_init::record_process_start(otel.as_ref(), otel_originator.as_str()); + crate::legacy_core::otel_init::install_sqlite_telemetry( + otel.as_ref(), + otel_originator.as_str(), + ); let state_db = match &app_server_target { AppServerTarget::Embedded => state_db::init(&config).await, AppServerTarget::Remote { .. } => state_db::get_state_db(&config).await, @@ -1034,31 +1065,6 @@ pub async fn run_main( ensure_oss_provider_ready(provider_id, &config).await?; } - let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - crate::legacy_core::otel_init::build_provider( - &config, - env!("CARGO_PKG_VERSION"), - /*service_name_override*/ None, - /*default_analytics_enabled*/ true, - ) - })) { - Ok(Ok(otel)) => otel, - Ok(Err(e)) => { - #[allow(clippy::print_stderr)] - { - eprintln!("Could not create otel exporter: {e}"); - } - None - } - Err(_) => { - #[allow(clippy::print_stderr)] - { - eprintln!("Could not create otel exporter: panicked during initialization"); - } - None - } - }; - let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());