This commit is contained in:
Owen Lin
2026-05-07 17:15:57 -07:00
parent 3a9c914b1f
commit 0c63bcda5e
6 changed files with 163 additions and 125 deletions

View File

@@ -121,84 +121,74 @@ async fn try_init_with_roots_inner(
)
})?;
let backfill_gate_started = Instant::now();
let backfill_gate_result = wait_for_startup_backfill(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
backfill_lease_seconds,
)
.await;
codex_state::record_db_init_backfill_gate_metric(
metrics.as_deref(),
backfill_gate_started.elapsed(),
&backfill_gate_result,
);
backfill_gate_result?;
Ok(runtime)
}
async fn wait_for_startup_backfill(
runtime: &codex_state::StateRuntime,
codex_home: &Path,
default_model_provider_id: &str,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<()> {
let wait_started = Instant::now();
let mut reported_wait = false;
loop {
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
let err = anyhow::anyhow!(
return Err(anyhow::anyhow!(
"failed to read backfill state at {}: {err}",
codex_home.display()
);
return finish_backfill_gate(
metrics.as_deref(),
backfill_gate_started,
Err(err),
runtime,
);
));
}
};
if backfill_state.status == codex_state::BackfillStatus::Complete {
return finish_backfill_gate(
metrics.as_deref(),
backfill_gate_started,
Ok(()),
runtime,
);
return Ok(());
}
if let Some(backfill_lease_seconds) = backfill_lease_seconds {
metadata::backfill_sessions_with_lease(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
runtime,
codex_home,
default_model_provider_id,
backfill_lease_seconds,
)
.await;
} else {
metadata::backfill_sessions(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
)
.await;
metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await;
}
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
let err = anyhow::anyhow!(
return Err(anyhow::anyhow!(
"failed to read backfill state at {} after startup backfill: {err}",
codex_home.display()
);
return finish_backfill_gate(
metrics.as_deref(),
backfill_gate_started,
Err(err),
runtime,
);
));
}
};
if backfill_state.status == codex_state::BackfillStatus::Complete {
return finish_backfill_gate(
metrics.as_deref(),
backfill_gate_started,
Ok(()),
runtime,
);
return Ok(());
}
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
return finish_backfill_gate(
metrics.as_deref(),
backfill_gate_started,
Err(anyhow::anyhow!(
"timed out waiting for state db backfill at {} after {:?} (status: {})",
codex_home.display(),
STARTUP_BACKFILL_WAIT_TIMEOUT,
backfill_state.status.as_str()
)),
runtime,
);
return Err(anyhow::anyhow!(
"timed out waiting for state db backfill at {} after {:?} (status: {})",
codex_home.display(),
STARTUP_BACKFILL_WAIT_TIMEOUT,
backfill_state.status.as_str()
));
}
let message = format!(
@@ -217,16 +207,6 @@ async fn try_init_with_roots_inner(
}
}
/// Records the startup backfill gate metric, then turns the gate outcome into the runtime handle.
///
/// The time elapsed since `started` and a borrow of `result` are handed to
/// `codex_state::record_db_init_backfill_gate_metric` before `result` is consumed, so the
/// metric call happens for both `Ok` and `Err` outcomes.
///
/// # Errors
///
/// Returns the error carried by `result` unchanged; `runtime` is dropped in that case.
fn finish_backfill_gate(
    metrics: Option<&dyn codex_state::DbMetricsRecorder>,
    started: Instant,
    result: anyhow::Result<()>,
    runtime: StateDbHandle,
) -> anyhow::Result<StateDbHandle> {
    let elapsed = started.elapsed();
    codex_state::record_db_init_backfill_gate_metric(metrics, elapsed, &result);
    match result {
        Ok(()) => Ok(runtime),
        Err(failure) => Err(failure),
    }
}
fn emit_startup_warning(message: &str) {
warn!("{message}");
if !tracing::dispatcher::has_been_set() {

View File

@@ -227,6 +227,24 @@ impl StateRuntime {
{
crate::telemetry::record_operation(self.metrics(), db, operation, access, future).await
}
/// Reports the outcome of an already-finished database operation to telemetry.
///
/// The caller captures `started` before running the operation; this method measures the
/// elapsed time at the moment it is called and forwards it, together with the tags and a
/// borrow of `result`, to `crate::telemetry::record_operation_result`. The result itself is
/// only borrowed, so the caller can still return it afterwards.
pub(crate) fn record_db_operation_result<T>(
    &self,
    db: DbKind,
    operation: &'static str,
    access: DbAccess,
    started: Instant,
    result: &anyhow::Result<T>,
) {
    let recorder = self.metrics();
    let elapsed = started.elapsed();
    crate::telemetry::record_operation_result(recorder, db, operation, access, elapsed, result);
}
}
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {

View File

@@ -55,19 +55,27 @@ WHERE id = 1
/// Mark rollout metadata backfill as running.
///
/// Calls `ensure_backfill_state_row` first, then updates the `backfill_state` row with
/// `id = 1`, setting `status` to `BackfillStatus::Running.as_str()` and `updated_at` to
/// `Utc::now().timestamp()`.
///
/// # Errors
///
/// Propagates any error from `ensure_backfill_state_row` or from executing the update.
pub async fn mark_backfill_running(&self) -> anyhow::Result<()> {
self.ensure_backfill_state_row().await?;
sqlx::query(
r#"
self.record_db_operation(
DbKind::State,
"mark_backfill_running",
DbAccess::Write,
async {
self.ensure_backfill_state_row().await?;
sqlx::query(
r#"
UPDATE backfill_state
SET status = ?, updated_at = ?
WHERE id = 1
"#,
)
.bind(crate::BackfillStatus::Running.as_str())
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
},
)
.bind(crate::BackfillStatus::Running.as_str())
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
.await
}
/// Persist rollout metadata backfill progress.

View File

@@ -9,7 +9,8 @@ impl StateRuntime {
/// Insert a batch of log entries into the logs table.
pub async fn insert_logs(&self, entries: &[LogEntry]) -> anyhow::Result<()> {
self.record_db_operation(DbKind::Logs, "insert_logs", DbAccess::Transaction, async {
let started = Instant::now();
let result: anyhow::Result<()> = async {
if entries.is_empty() {
return Ok(());
}
@@ -45,8 +46,16 @@ impl StateRuntime {
self.prune_logs_after_insert(entries, &mut tx).await?;
tx.commit().await?;
Ok(())
})
.await
}
.await;
self.record_db_operation_result(
DbKind::Logs,
"insert_logs",
DbAccess::Transaction,
started,
&result,
);
result
}
/// Enforce per-partition retained-log-content caps after a successful batch insert.

View File

@@ -702,7 +702,8 @@ WHERE id = ?
metadata: &crate::ThreadMetadata,
creation_memory_mode: Option<&str>,
) -> anyhow::Result<()> {
self.record_db_operation(DbKind::State, "upsert_thread", DbAccess::Write, async {
let started = Instant::now();
let result: anyhow::Result<()> = async {
let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?;
// Backfill/reconcile callers merge existing git info before upserting, but that
// read/modify/write is not atomic. Preserve non-null SQLite git fields here so
@@ -810,8 +811,16 @@ ON CONFLICT(id) DO UPDATE SET
)
.await?;
Ok(())
})
.await
}
.await;
self.record_db_operation_result(
DbKind::State,
"upsert_thread",
DbAccess::Write,
started,
&result,
);
result
}
/// Persist dynamic tools for a thread if none have been stored yet.
@@ -823,24 +832,21 @@ ON CONFLICT(id) DO UPDATE SET
thread_id: ThreadId,
tools: Option<&[DynamicToolSpec]>,
) -> anyhow::Result<()> {
self.record_db_operation(
DbKind::State,
"persist_dynamic_tools",
DbAccess::Transaction,
async {
let Some(tools) = tools else {
return Ok(());
};
if tools.is_empty() {
return Ok(());
}
let thread_id = thread_id.to_string();
let mut tx = self.pool.begin().await?;
for (idx, tool) in tools.iter().enumerate() {
let position = i64::try_from(idx).unwrap_or(i64::MAX);
let input_schema = serde_json::to_string(&tool.input_schema)?;
sqlx::query(
r#"
let started = Instant::now();
let result: anyhow::Result<()> = async {
let Some(tools) = tools else {
return Ok(());
};
if tools.is_empty() {
return Ok(());
}
let thread_id = thread_id.to_string();
let mut tx = self.pool.begin().await?;
for (idx, tool) in tools.iter().enumerate() {
let position = i64::try_from(idx).unwrap_or(i64::MAX);
let input_schema = serde_json::to_string(&tool.input_schema)?;
sqlx::query(
r#"
INSERT INTO thread_dynamic_tools (
thread_id,
position,
@@ -852,22 +858,29 @@ INSERT INTO thread_dynamic_tools (
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(thread_id, position) DO NOTHING
"#,
)
.bind(thread_id.as_str())
.bind(position)
.bind(tool.namespace.as_deref())
.bind(tool.name.as_str())
.bind(tool.description.as_str())
.bind(input_schema)
.bind(tool.defer_loading)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
},
)
.await
)
.bind(thread_id.as_str())
.bind(position)
.bind(tool.namespace.as_deref())
.bind(tool.name.as_str())
.bind(tool.description.as_str())
.bind(input_schema)
.bind(tool.defer_loading)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
.await;
self.record_db_operation_result(
DbKind::State,
"persist_dynamic_tools",
DbAccess::Transaction,
started,
&result,
);
result
}
/// Apply rollout items incrementally using the underlying database.

View File

@@ -98,18 +98,13 @@ pub(crate) fn record_init_result<T>(
duration: Duration,
result: &anyhow::Result<T>,
) {
let status = if result.is_ok() { "success" } else { "failed" };
let error_tags = result
.as_ref()
.err()
.map(classify_error)
.unwrap_or_else(DbErrorTags::none);
let outcome = DbOutcomeTags::from_result(result);
let tags = [
("status", status),
("status", outcome.status),
("phase", phase),
("db", db.as_str()),
("error_class", error_tags.error_class),
("sqlite_code", error_tags.sqlite_code.as_str()),
("error_class", outcome.error.error_class),
("sqlite_code", outcome.error.sqlite_code.as_str()),
];
record_counter(metrics, DB_INIT_METRIC, &tags);
record_duration(metrics, DB_INIT_DURATION_METRIC, duration, &tags);
@@ -191,7 +186,7 @@ pub(crate) fn classify_sqlite_code(code: &str) -> &'static str {
}
}
fn record_operation_result<T>(
pub(crate) fn record_operation_result<T>(
metrics: Option<&dyn DbMetricsRecorder>,
db: DbKind,
operation: &'static str,
@@ -199,24 +194,39 @@ fn record_operation_result<T>(
duration: Duration,
result: &anyhow::Result<T>,
) {
let status = if result.is_ok() { "success" } else { "failed" };
let error_tags = result
.as_ref()
.err()
.map(classify_error)
.unwrap_or_else(DbErrorTags::none);
let outcome = DbOutcomeTags::from_result(result);
let tags = [
("status", status),
("status", outcome.status),
("db", db.as_str()),
("operation", operation),
("access", access.as_str()),
("error_class", error_tags.error_class),
("sqlite_code", error_tags.sqlite_code.as_str()),
("error_class", outcome.error.error_class),
("sqlite_code", outcome.error.sqlite_code.as_str()),
];
record_counter(metrics, DB_OPERATION_METRIC, &tags);
record_duration(metrics, DB_OPERATION_DURATION_METRIC, duration, &tags);
}
/// Metric tag values derived from the outcome of a database call.
struct DbOutcomeTags {
/// Value for the `status` tag: `"success"` for `Ok`, `"failed"` for `Err`.
status: &'static str,
/// Error classification tags; `DbErrorTags::none()` when the result was `Ok`.
error: DbErrorTags,
}
impl DbOutcomeTags {
    /// Builds the outcome tags for `result` without consuming it.
    ///
    /// An `Ok` value yields status `"success"` with `DbErrorTags::none()`; an `Err` yields
    /// status `"failed"` with the tags produced by `classify_error` for that error.
    fn from_result<T>(result: &anyhow::Result<T>) -> Self {
        let (status, error) = match result.as_ref().err() {
            None => ("success", DbErrorTags::none()),
            Some(failure) => ("failed", classify_error(failure)),
        };
        Self { status, error }
    }
}
fn classify_sqlx_error(err: &sqlx::Error) -> DbErrorTags {
match err {
sqlx::Error::Database(database_error) => {