mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
318 lines
10 KiB
Rust
318 lines
10 KiB
Rust
use crate::config::Config;
|
|
use crate::features::Feature;
|
|
use crate::rollout::list::Cursor;
|
|
use crate::rollout::list::ThreadSortKey;
|
|
use crate::rollout::metadata;
|
|
use chrono::DateTime;
|
|
use chrono::NaiveDateTime;
|
|
use chrono::Timelike;
|
|
use chrono::Utc;
|
|
use codex_otel::OtelManager;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::protocol::RolloutItem;
|
|
use codex_protocol::protocol::SessionSource;
|
|
pub use codex_state::LogEntry;
|
|
use codex_state::STATE_DB_FILENAME;
|
|
use codex_state::ThreadMetadataBuilder;
|
|
use serde_json::Value;
|
|
use std::path::Path;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tracing::warn;
|
|
use uuid::Uuid;
|
|
|
|
/// Core-facing handle to the optional SQLite-backed state runtime.
|
|
/// The runtime is wrapped in an `Arc`, so cloning a handle shares the same runtime instance.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
|
|
|
|
/// Initialize the state runtime when the `sqlite` feature flag is enabled. To only be used
|
|
/// inside `core`. The initialization should not be done anywhere else.
|
|
pub(crate) async fn init_if_enabled(
|
|
config: &Config,
|
|
otel: Option<&OtelManager>,
|
|
) -> Option<StateDbHandle> {
|
|
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
|
if !config.features.enabled(Feature::Sqlite) {
|
|
// We delete the file on best effort basis to maintain retro-compatibility in the future.
|
|
let wal_path = state_path.with_extension("sqlite-wal");
|
|
let shm_path = state_path.with_extension("sqlite-shm");
|
|
for path in [state_path.as_path(), wal_path.as_path(), shm_path.as_path()] {
|
|
tokio::fs::remove_file(path).await.ok();
|
|
}
|
|
return None;
|
|
}
|
|
let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false);
|
|
let runtime = match codex_state::StateRuntime::init(
|
|
config.codex_home.clone(),
|
|
config.model_provider_id.clone(),
|
|
otel.cloned(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(runtime) => runtime,
|
|
Err(err) => {
|
|
warn!(
|
|
"failed to initialize state runtime at {}: {err}",
|
|
config.codex_home.display()
|
|
);
|
|
if let Some(otel) = otel {
|
|
otel.counter("codex.db.init", 1, &[("status", "init_error")]);
|
|
}
|
|
return None;
|
|
}
|
|
};
|
|
if !existed {
|
|
let runtime_for_backfill = Arc::clone(&runtime);
|
|
let config_for_backfill = config.clone();
|
|
let otel_for_backfill = otel.cloned();
|
|
tokio::task::spawn(async move {
|
|
metadata::backfill_sessions(
|
|
runtime_for_backfill.as_ref(),
|
|
&config_for_backfill,
|
|
otel_for_backfill.as_ref(),
|
|
)
|
|
.await;
|
|
});
|
|
}
|
|
Some(runtime)
|
|
}
|
|
|
|
/// Get the DB if the feature is enabled and the DB exists.
|
|
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
|
|
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
|
if !config.features.enabled(Feature::Sqlite)
|
|
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
|
|
{
|
|
return None;
|
|
}
|
|
codex_state::StateRuntime::init(
|
|
config.codex_home.clone(),
|
|
config.model_provider_id.clone(),
|
|
otel.cloned(),
|
|
)
|
|
.await
|
|
.ok()
|
|
}
|
|
|
|
/// Open the state runtime when the SQLite file exists, without feature gating.
|
|
///
|
|
/// This is used for parity checks during the SQLite migration phase.
|
|
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
|
|
let db_path = codex_home.join(STATE_DB_FILENAME);
|
|
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
|
return None;
|
|
}
|
|
let runtime = codex_state::StateRuntime::init(
|
|
codex_home.to_path_buf(),
|
|
default_provider.to_string(),
|
|
None,
|
|
)
|
|
.await
|
|
.ok()?;
|
|
Some(runtime)
|
|
}
|
|
|
|
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<codex_state::Anchor> {
|
|
let cursor = cursor?;
|
|
let value = serde_json::to_value(cursor).ok()?;
|
|
let cursor_str = value.as_str()?;
|
|
let (ts_str, id_str) = cursor_str.split_once('|')?;
|
|
if id_str.contains('|') {
|
|
return None;
|
|
}
|
|
let id = Uuid::parse_str(id_str).ok()?;
|
|
let ts = if let Ok(naive) = NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H-%M-%S") {
|
|
DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc)
|
|
} else if let Ok(dt) = DateTime::parse_from_rfc3339(ts_str) {
|
|
dt.with_timezone(&Utc)
|
|
} else {
|
|
return None;
|
|
}
|
|
.with_nanosecond(0)?;
|
|
Some(codex_state::Anchor { ts, id })
|
|
}
|
|
|
|
/// List thread ids from SQLite for parity checks without rollout scanning.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub async fn list_thread_ids_db(
|
|
context: Option<&codex_state::StateRuntime>,
|
|
codex_home: &Path,
|
|
page_size: usize,
|
|
cursor: Option<&Cursor>,
|
|
sort_key: ThreadSortKey,
|
|
allowed_sources: &[SessionSource],
|
|
model_providers: Option<&[String]>,
|
|
archived_only: bool,
|
|
stage: &str,
|
|
) -> Option<Vec<ThreadId>> {
|
|
let ctx = context?;
|
|
if ctx.codex_home() != codex_home {
|
|
warn!(
|
|
"state db codex_home mismatch: expected {}, got {}",
|
|
ctx.codex_home().display(),
|
|
codex_home.display()
|
|
);
|
|
}
|
|
|
|
let anchor = cursor_to_anchor(cursor);
|
|
let allowed_sources: Vec<String> = allowed_sources
|
|
.iter()
|
|
.map(|value| match serde_json::to_value(value) {
|
|
Ok(Value::String(s)) => s,
|
|
Ok(other) => other.to_string(),
|
|
Err(_) => String::new(),
|
|
})
|
|
.collect();
|
|
let model_providers = model_providers.map(<[String]>::to_vec);
|
|
match ctx
|
|
.list_thread_ids(
|
|
page_size,
|
|
anchor.as_ref(),
|
|
match sort_key {
|
|
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
|
|
ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
|
|
},
|
|
allowed_sources.as_slice(),
|
|
model_providers.as_deref(),
|
|
archived_only,
|
|
)
|
|
.await
|
|
{
|
|
Ok(ids) => Some(ids),
|
|
Err(err) => {
|
|
warn!("state db list_thread_ids failed during {stage}: {err}");
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Look up the rollout path for a thread id using SQLite.
|
|
pub async fn find_rollout_path_by_id(
|
|
context: Option<&codex_state::StateRuntime>,
|
|
thread_id: ThreadId,
|
|
archived_only: Option<bool>,
|
|
stage: &str,
|
|
) -> Option<PathBuf> {
|
|
let ctx = context?;
|
|
ctx.find_rollout_path_by_id(thread_id, archived_only)
|
|
.await
|
|
.unwrap_or_else(|err| {
|
|
warn!("state db find_rollout_path_by_id failed during {stage}: {err}");
|
|
None
|
|
})
|
|
}
|
|
|
|
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
|
|
pub async fn reconcile_rollout(
|
|
context: Option<&codex_state::StateRuntime>,
|
|
rollout_path: &Path,
|
|
default_provider: &str,
|
|
builder: Option<&ThreadMetadataBuilder>,
|
|
items: &[RolloutItem],
|
|
) {
|
|
let Some(ctx) = context else {
|
|
return;
|
|
};
|
|
if builder.is_some() || !items.is_empty() {
|
|
apply_rollout_items(
|
|
Some(ctx),
|
|
rollout_path,
|
|
default_provider,
|
|
builder,
|
|
items,
|
|
"reconcile_rollout",
|
|
)
|
|
.await;
|
|
return;
|
|
}
|
|
let outcome =
|
|
match metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await {
|
|
Ok(outcome) => outcome,
|
|
Err(err) => {
|
|
warn!(
|
|
"state db reconcile_rollout extraction failed {}: {err}",
|
|
rollout_path.display()
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
if let Err(err) = ctx.upsert_thread(&outcome.metadata).await {
|
|
warn!(
|
|
"state db reconcile_rollout upsert failed {}: {err}",
|
|
rollout_path.display()
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Apply rollout items incrementally to SQLite.
|
|
pub async fn apply_rollout_items(
|
|
context: Option<&codex_state::StateRuntime>,
|
|
rollout_path: &Path,
|
|
_default_provider: &str,
|
|
builder: Option<&ThreadMetadataBuilder>,
|
|
items: &[RolloutItem],
|
|
stage: &str,
|
|
) {
|
|
let Some(ctx) = context else {
|
|
return;
|
|
};
|
|
let mut builder = match builder {
|
|
Some(builder) => builder.clone(),
|
|
None => match metadata::builder_from_items(items, rollout_path) {
|
|
Some(builder) => builder,
|
|
None => {
|
|
warn!(
|
|
"state db apply_rollout_items missing builder during {stage}: {}",
|
|
rollout_path.display()
|
|
);
|
|
record_discrepancy(stage, "missing_builder");
|
|
return;
|
|
}
|
|
},
|
|
};
|
|
builder.rollout_path = rollout_path.to_path_buf();
|
|
if let Err(err) = ctx.apply_rollout_items(&builder, items, None).await {
|
|
warn!(
|
|
"state db apply_rollout_items failed during {stage} for {}: {err}",
|
|
rollout_path.display()
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Record a state discrepancy metric with a stage and reason tag.
|
|
pub fn record_discrepancy(stage: &str, reason: &str) {
|
|
// We access the global metric because the call sites might not have access to the broader
|
|
// OtelManager.
|
|
if let Some(metric) = codex_otel::metrics::global() {
|
|
let _ = metric.counter(
|
|
"codex.db.discrepancy",
|
|
1,
|
|
&[("stage", stage), ("reason", reason)],
|
|
);
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rollout::list::parse_cursor;
    use pretty_assertions::assert_eq;

    /// A cursor token whose timestamp uses the dash-separated `%Y-%m-%dT%H-%M-%S` form must
    /// convert into an anchor carrying the same id and the equivalent UTC timestamp.
    #[test]
    fn cursor_to_anchor_normalizes_timestamp_format() {
        let ts_str = "2026-01-27T12-34-56";
        let id = Uuid::new_v4();
        let token = format!("{ts_str}|{id}");

        let cursor = parse_cursor(token.as_str()).expect("cursor should parse");
        let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse");

        let expected_ts = NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H-%M-%S")
            .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
            .expect("ts should parse")
            .with_nanosecond(0)
            .expect("nanosecond");

        assert_eq!(anchor.id, id);
        assert_eq!(anchor.ts, expected_ts);
    }
}
|