mirror of
https://github.com/openai/codex.git
synced 2026-05-24 21:14:51 +00:00
## Why Thread goals are moving toward extension-owned runtime behavior, but their persisted state was still stored in the shared state database. This makes the goal store harder to isolate and keeps future storage splits tied to ad hoc runtime plumbing. This PR gives goals their own SQLite database while keeping the existing `StateRuntime` entry point. The goal is to make this the pattern for adding more dedicated runtime databases later. This also reduce load on existing DB and reduce contention ## Limitation Thread preview from goal is not supported anymore. I'm looking into this [EDIT]: solved ## What changed - Added a dedicated `goals_1.sqlite` database with its own `goals_migrations` directory. - Moved `thread_goals` creation into the goals DB migration set. - Dropped the old `thread_goals` table from the main state DB with a normal state migration. There is intentionally no backfill for existing goal rows. - Changed `GoalStore` to be backed only by the goals DB pool. - Removed the old goal-write side effect that filled empty `threads.preview` values from the goal objective. - Added shared runtime DB path metadata so startup, telemetry, `codex doctor`, and repair handling can include future DBs without bespoke path lists. - Updated Bazel compile data so the new goals migration directory is available to `sqlx::migrate!`. ## Verification - `cargo check --tests -p codex-state -p codex-cli -p codex-core -p codex-app-server` - `just fix -p codex-state` - `just fix -p codex-cli` - `just fix -p codex-app-server`
202 lines
5.7 KiB
Rust
202 lines
5.7 KiB
Rust
use std::borrow::Cow;
|
|
use std::sync::Arc;
|
|
use std::sync::OnceLock;
|
|
use std::time::Duration;
|
|
|
|
use crate::DB_FALLBACK_METRIC;
|
|
use crate::DB_INIT_DURATION_METRIC;
|
|
use crate::DB_INIT_METRIC;
|
|
use tracing::debug;
|
|
|
|
/// Low-cardinality sink for SQLite startup and fallback telemetry.
|
|
///
|
|
/// Implementations should absorb delivery failures locally. Database behavior
|
|
/// must not depend on whether telemetry export succeeds.
|
|
pub trait DbTelemetry: Send + Sync + 'static {
|
|
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]);
|
|
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]);
|
|
}
|
|
|
|
pub type DbTelemetryHandle = Arc<dyn DbTelemetry>;
|
|
|
|
static PROCESS_DB_TELEMETRY: OnceLock<DbTelemetryHandle> = OnceLock::new();
|
|
|
|
/// Install the process-wide SQLite telemetry sink.
|
|
///
|
|
/// Startup owners should call this once after OTEL initialization. Low-level
|
|
/// database paths will use the registered sink unless an explicit sink is
|
|
/// provided. Subsequent installs are ignored and keep the first installed sink.
|
|
pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool {
|
|
if PROCESS_DB_TELEMETRY.set(telemetry).is_ok() {
|
|
true
|
|
} else {
|
|
debug!("process SQLite telemetry sink already installed; ignoring duplicate install");
|
|
false
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy)]
|
|
pub(crate) enum DbKind {
|
|
State,
|
|
Logs,
|
|
Goals,
|
|
}
|
|
|
|
impl DbKind {
|
|
fn as_str(self) -> &'static str {
|
|
match self {
|
|
Self::State => "state",
|
|
Self::Logs => "logs",
|
|
Self::Goals => "goals",
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) fn record_init_result<T>(
|
|
telemetry: Option<&dyn DbTelemetry>,
|
|
db: DbKind,
|
|
phase: &'static str,
|
|
duration: Duration,
|
|
result: &anyhow::Result<T>,
|
|
) {
|
|
let outcome = DbOutcomeTags::from_result(result);
|
|
let tags = [
|
|
("status", outcome.status),
|
|
("phase", phase),
|
|
("db", db.as_str()),
|
|
("error", outcome.error),
|
|
];
|
|
record_counter(telemetry, DB_INIT_METRIC, &tags);
|
|
record_duration(telemetry, DB_INIT_DURATION_METRIC, duration, &tags);
|
|
}
|
|
|
|
pub fn record_backfill_gate(
|
|
telemetry: Option<&dyn DbTelemetry>,
|
|
duration: Duration,
|
|
result: &anyhow::Result<()>,
|
|
) {
|
|
record_init_result(telemetry, DbKind::State, "backfill_gate", duration, result);
|
|
}
|
|
|
|
pub fn record_fallback(
|
|
caller: &'static str,
|
|
reason: &'static str,
|
|
telemetry_override: Option<&dyn DbTelemetry>,
|
|
) {
|
|
record_counter(
|
|
telemetry_override,
|
|
DB_FALLBACK_METRIC,
|
|
&[("caller", caller), ("reason", reason)],
|
|
);
|
|
}
|
|
|
|
fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) {
|
|
if let Some(telemetry) = resolve_telemetry(telemetry) {
|
|
telemetry.counter(name, /*inc*/ 1, tags);
|
|
}
|
|
}
|
|
|
|
fn record_duration(
|
|
telemetry: Option<&dyn DbTelemetry>,
|
|
name: &str,
|
|
duration: Duration,
|
|
tags: &[(&str, &str)],
|
|
) {
|
|
if let Some(telemetry) = resolve_telemetry(telemetry) {
|
|
telemetry.record_duration(name, duration, tags);
|
|
}
|
|
}
|
|
|
|
fn resolve_telemetry(telemetry: Option<&dyn DbTelemetry>) -> Option<&dyn DbTelemetry> {
|
|
telemetry.or_else(|| PROCESS_DB_TELEMETRY.get().map(AsRef::as_ref))
|
|
}
|
|
|
|
struct DbOutcomeTags {
|
|
status: &'static str,
|
|
error: &'static str,
|
|
}
|
|
|
|
impl DbOutcomeTags {
|
|
fn from_result<T>(result: &anyhow::Result<T>) -> Self {
|
|
match result {
|
|
Ok(_) => Self {
|
|
status: "success",
|
|
error: "none",
|
|
},
|
|
Err(err) => Self {
|
|
status: "failed",
|
|
error: classify_error(err),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
fn classify_error(err: &anyhow::Error) -> &'static str {
|
|
for cause in err.chain() {
|
|
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
|
|
return classify_sqlx_error(sqlx_err);
|
|
}
|
|
if cause
|
|
.downcast_ref::<sqlx::migrate::MigrateError>()
|
|
.is_some()
|
|
{
|
|
return "migration";
|
|
}
|
|
if cause.downcast_ref::<serde_json::Error>().is_some() {
|
|
return "serde";
|
|
}
|
|
if cause.downcast_ref::<std::io::Error>().is_some() {
|
|
return "io";
|
|
}
|
|
}
|
|
"unknown"
|
|
}
|
|
|
|
fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
|
|
match err {
|
|
sqlx::Error::Database(database_error) => {
|
|
let code = database_error
|
|
.code()
|
|
.unwrap_or(Cow::Borrowed("none"))
|
|
.to_string();
|
|
classify_sqlite_code(code.as_str())
|
|
}
|
|
sqlx::Error::PoolTimedOut => "pool_timeout",
|
|
sqlx::Error::Io(_) => "io",
|
|
sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
|
|
sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
|
|
_ => "unknown",
|
|
}
|
|
}
|
|
|
|
fn classify_sqlite_code(code: &str) -> &'static str {
|
|
// SQLite result codes are documented at https://www.sqlite.org/rescode.html.
|
|
// Extended codes preserve the primary code in the low byte.
|
|
let primary_code = code.parse::<i32>().ok().map(|code| code & 0xff);
|
|
match primary_code {
|
|
Some(5) => "busy",
|
|
Some(6) => "locked",
|
|
Some(8) => "readonly",
|
|
Some(10) => "io",
|
|
Some(11) => "corrupt",
|
|
Some(13) => "full",
|
|
Some(14) => "cantopen",
|
|
Some(17) => "schema",
|
|
Some(19) => "constraint",
|
|
_ => "unknown",
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn classifies_extended_sqlite_codes() {
|
|
assert_eq!(classify_sqlite_code("5"), "busy");
|
|
assert_eq!(classify_sqlite_code("6"), "locked");
|
|
assert_eq!(classify_sqlite_code("2067"), "constraint");
|
|
}
|
|
}
|