diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index ea3cf118c2..8ba76556d7 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -121,84 +121,74 @@ async fn try_init_with_roots_inner( ) })?; let backfill_gate_started = Instant::now(); + let backfill_gate_result = wait_for_startup_backfill( + runtime.as_ref(), + codex_home.as_path(), + default_model_provider_id.as_str(), + backfill_lease_seconds, + ) + .await; + codex_state::record_db_init_backfill_gate_metric( + metrics.as_deref(), + backfill_gate_started.elapsed(), + &backfill_gate_result, + ); + backfill_gate_result?; + Ok(runtime) +} + +async fn wait_for_startup_backfill( + runtime: &codex_state::StateRuntime, + codex_home: &Path, + default_model_provider_id: &str, + backfill_lease_seconds: Option, +) -> anyhow::Result<()> { let wait_started = Instant::now(); let mut reported_wait = false; loop { let backfill_state = match runtime.get_backfill_state().await { Ok(state) => state, Err(err) => { - let err = anyhow::anyhow!( + return Err(anyhow::anyhow!( "failed to read backfill state at {}: {err}", codex_home.display() - ); - return finish_backfill_gate( - metrics.as_deref(), - backfill_gate_started, - Err(err), - runtime, - ); + )); } }; if backfill_state.status == codex_state::BackfillStatus::Complete { - return finish_backfill_gate( - metrics.as_deref(), - backfill_gate_started, - Ok(()), - runtime, - ); + return Ok(()); } if let Some(backfill_lease_seconds) = backfill_lease_seconds { metadata::backfill_sessions_with_lease( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), + runtime, + codex_home, + default_model_provider_id, backfill_lease_seconds, ) .await; } else { - metadata::backfill_sessions( - runtime.as_ref(), - codex_home.as_path(), - default_model_provider_id.as_str(), - ) - .await; + metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await; } let backfill_state = match runtime.get_backfill_state().await { Ok(state) => state, Err(err) => { - let err = anyhow::anyhow!( + return Err(anyhow::anyhow!( "failed to read backfill state at {} after startup backfill: {err}", codex_home.display() - ); - return finish_backfill_gate( - metrics.as_deref(), - backfill_gate_started, - Err(err), - runtime, - ); + )); } }; if backfill_state.status == codex_state::BackfillStatus::Complete { - return finish_backfill_gate( - metrics.as_deref(), - backfill_gate_started, - Ok(()), - runtime, - ); + return Ok(()); } if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT { - return finish_backfill_gate( - metrics.as_deref(), - backfill_gate_started, - Err(anyhow::anyhow!( - "timed out waiting for state db backfill at {} after {:?} (status: {})", - codex_home.display(), - STARTUP_BACKFILL_WAIT_TIMEOUT, - backfill_state.status.as_str() - )), - runtime, - ); + return Err(anyhow::anyhow!( + "timed out waiting for state db backfill at {} after {:?} (status: {})", + codex_home.display(), + STARTUP_BACKFILL_WAIT_TIMEOUT, + backfill_state.status.as_str() + )); } let message = format!( @@ -217,16 +207,6 @@ async fn try_init_with_roots_inner( } } -fn finish_backfill_gate( - metrics: Option<&dyn codex_state::DbMetricsRecorder>, - started: Instant, - result: anyhow::Result<()>, - runtime: StateDbHandle, -) -> anyhow::Result { - codex_state::record_db_init_backfill_gate_metric(metrics, started.elapsed(), &result); - result.map(|()| runtime) -} - fn emit_startup_warning(message: &str) { warn!("{message}"); if !tracing::dispatcher::has_been_set() { diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 26fc98ba46..62ba16d723 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -227,6 +227,24 @@ impl StateRuntime { { crate::telemetry::record_operation(self.metrics(), db, operation, access, future).await } + + pub(crate) fn record_db_operation_result( + &self, + db: DbKind, + operation: &'static str, + access: DbAccess, + started: Instant, + result: &anyhow::Result, + ) { + crate::telemetry::record_operation_result( + self.metrics(), + db, + operation, + access, + started.elapsed(), + result, + ); + } } fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 621596825b..8e91cbb88c 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -55,19 +55,27 @@ WHERE id = 1 /// Mark rollout metadata backfill as running. pub async fn mark_backfill_running(&self) -> anyhow::Result<()> { - self.ensure_backfill_state_row().await?; - sqlx::query( - r#" + self.record_db_operation( + DbKind::State, + "mark_backfill_running", + DbAccess::Write, + async { + self.ensure_backfill_state_row().await?; + sqlx::query( + r#" UPDATE backfill_state SET status = ?, updated_at = ? WHERE id = 1 "#, + ) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + }, ) - .bind(crate::BackfillStatus::Running.as_str()) - .bind(Utc::now().timestamp()) - .execute(self.pool.as_ref()) - .await?; - Ok(()) + .await } /// Persist rollout metadata backfill progress. diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index 1ff538a019..cf79182d2f 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -9,7 +9,8 @@ impl StateRuntime { /// Insert a batch of log entries into the logs table. pub async fn insert_logs(&self, entries: &[LogEntry]) -> anyhow::Result<()> { - self.record_db_operation(DbKind::Logs, "insert_logs", DbAccess::Transaction, async { + let started = Instant::now(); + let result: anyhow::Result<()> = async { if entries.is_empty() { return Ok(()); } @@ -45,8 +46,16 @@ impl StateRuntime { self.prune_logs_after_insert(entries, &mut tx).await?; tx.commit().await?; Ok(()) - }) - .await + } + .await; + self.record_db_operation_result( + DbKind::Logs, + "insert_logs", + DbAccess::Transaction, + started, + &result, + ); + result } /// Enforce per-partition retained-log-content caps after a successful batch insert. diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 41874def94..0b60633de9 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -702,7 +702,8 @@ WHERE id = ? metadata: &crate::ThreadMetadata, creation_memory_mode: Option<&str>, ) -> anyhow::Result<()> { - self.record_db_operation(DbKind::State, "upsert_thread", DbAccess::Write, async { + let started = Instant::now(); + let result: anyhow::Result<()> = async { let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?; // Backfill/reconcile callers merge existing git info before upserting, but that // read/modify/write is not atomic. Preserve non-null SQLite git fields here so @@ -810,8 +811,16 @@ ON CONFLICT(id) DO UPDATE SET ) .await?; Ok(()) - }) - .await + } + .await; + self.record_db_operation_result( + DbKind::State, + "upsert_thread", + DbAccess::Write, + started, + &result, + ); + result } /// Persist dynamic tools for a thread if none have been stored yet. @@ -823,24 +832,21 @@ ON CONFLICT(id) DO UPDATE SET thread_id: ThreadId, tools: Option<&[DynamicToolSpec]>, ) -> anyhow::Result<()> { - self.record_db_operation( - DbKind::State, - "persist_dynamic_tools", - DbAccess::Transaction, - async { - let Some(tools) = tools else { - return Ok(()); - }; - if tools.is_empty() { - return Ok(()); - } - let thread_id = thread_id.to_string(); - let mut tx = self.pool.begin().await?; - for (idx, tool) in tools.iter().enumerate() { - let position = i64::try_from(idx).unwrap_or(i64::MAX); - let input_schema = serde_json::to_string(&tool.input_schema)?; - sqlx::query( - r#" + let started = Instant::now(); + let result: anyhow::Result<()> = async { + let Some(tools) = tools else { + return Ok(()); + }; + if tools.is_empty() { + return Ok(()); + } + let thread_id = thread_id.to_string(); + let mut tx = self.pool.begin().await?; + for (idx, tool) in tools.iter().enumerate() { + let position = i64::try_from(idx).unwrap_or(i64::MAX); + let input_schema = serde_json::to_string(&tool.input_schema)?; + sqlx::query( + r#" INSERT INTO thread_dynamic_tools ( thread_id, position, @@ -852,22 +858,29 @@ INSERT INTO thread_dynamic_tools ( ) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(thread_id, position) DO NOTHING "#, - ) - .bind(thread_id.as_str()) - .bind(position) - .bind(tool.namespace.as_deref()) - .bind(tool.name.as_str()) - .bind(tool.description.as_str()) - .bind(input_schema) - .bind(tool.defer_loading) - .execute(&mut *tx) - .await?; - } - tx.commit().await?; - Ok(()) - }, - ) - .await + ) + .bind(thread_id.as_str()) + .bind(position) + .bind(tool.namespace.as_deref()) + .bind(tool.name.as_str()) + .bind(tool.description.as_str()) + .bind(input_schema) + .bind(tool.defer_loading) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + .await; + self.record_db_operation_result( + DbKind::State, + "persist_dynamic_tools", + DbAccess::Transaction, + started, + &result, + ); + result } /// Apply rollout items incrementally using the underlying database. diff --git a/codex-rs/state/src/telemetry.rs b/codex-rs/state/src/telemetry.rs index edd516fe5b..ed598aa738 100644 --- a/codex-rs/state/src/telemetry.rs +++ b/codex-rs/state/src/telemetry.rs @@ -98,18 +98,13 @@ pub(crate) fn record_init_result( duration: Duration, result: &anyhow::Result, ) { - let status = if result.is_ok() { "success" } else { "failed" }; - let error_tags = result - .as_ref() - .err() - .map(classify_error) - .unwrap_or_else(DbErrorTags::none); + let outcome = DbOutcomeTags::from_result(result); let tags = [ - ("status", status), + ("status", outcome.status), ("phase", phase), ("db", db.as_str()), - ("error_class", error_tags.error_class), - ("sqlite_code", error_tags.sqlite_code.as_str()), + ("error_class", outcome.error.error_class), + ("sqlite_code", outcome.error.sqlite_code.as_str()), ]; record_counter(metrics, DB_INIT_METRIC, &tags); record_duration(metrics, DB_INIT_DURATION_METRIC, duration, &tags); @@ -191,7 +186,7 @@ pub(crate) fn classify_sqlite_code(code: &str) -> &'static str { } } -fn record_operation_result( +pub(crate) fn record_operation_result( metrics: Option<&dyn DbMetricsRecorder>, db: DbKind, operation: &'static str, @@ -199,24 +194,39 @@ fn record_operation_result( duration: Duration, result: &anyhow::Result, ) { - let status = if result.is_ok() { "success" } else { "failed" }; - let error_tags = result - .as_ref() - .err() - .map(classify_error) - .unwrap_or_else(DbErrorTags::none); + let outcome = DbOutcomeTags::from_result(result); let tags = [ - ("status", status), + ("status", outcome.status), ("db", db.as_str()), ("operation", operation), ("access", access.as_str()), - ("error_class", error_tags.error_class), - ("sqlite_code", error_tags.sqlite_code.as_str()), + ("error_class", outcome.error.error_class), + ("sqlite_code", outcome.error.sqlite_code.as_str()), ]; record_counter(metrics, DB_OPERATION_METRIC, &tags); record_duration(metrics, DB_OPERATION_DURATION_METRIC, duration, &tags); } +struct DbOutcomeTags { + status: &'static str, + error: DbErrorTags, +} + +impl DbOutcomeTags { + fn from_result(result: &anyhow::Result) -> Self { + match result { + Ok(_) => Self { + status: "success", + error: DbErrorTags::none(), + }, + Err(err) => Self { + status: "failed", + error: classify_error(err), + }, + } + } +} + fn classify_sqlx_error(err: &sqlx::Error) -> DbErrorTags { match err { sqlx::Error::Database(database_error) => {