feat: simplify DB further (#13771)

This commit is contained in:
jif-oai
2026-03-07 12:48:36 +01:00
committed by GitHub
parent 5ceff6588e
commit cf143bf71e
28 changed files with 147 additions and 297 deletions

View File

@@ -236,7 +236,7 @@ impl AgentControl {
// Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the
// stored nickname/role from sqlite when available; otherwise leave both unset.
let (resumed_agent_nickname, resumed_agent_role) =
if let Some(state_db_ctx) = state_db::get_state_db(&config, None).await {
if let Some(state_db_ctx) = state_db::get_state_db(&config).await {
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
Ok(None) | Err(_) => (None, None),

View File

@@ -451,7 +451,7 @@ impl Codex {
};
match thread_id {
Some(thread_id) => {
let state_db_ctx = state_db::get_state_db(&config, None).await;
let state_db_ctx = state_db::get_state_db(&config).await;
state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn")
.await
}
@@ -1273,7 +1273,7 @@ impl Session {
if config.ephemeral {
Ok::<_, anyhow::Error>((None, None))
} else {
let state_db_ctx = state_db::init(&config, None).await;
let state_db_ctx = state_db::init(&config).await;
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,

View File

@@ -474,7 +474,6 @@ mod phase2 {
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
@@ -857,7 +856,6 @@ mod phase2 {
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");

View File

@@ -7,7 +7,6 @@ use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::RolloutItem;
@@ -96,7 +95,6 @@ pub(crate) fn builder_from_items(
pub(crate) async fn extract_metadata_from_rollout(
rollout_path: &Path,
default_provider: &str,
otel: Option<&SessionTelemetry>,
) -> anyhow::Result<ExtractionOutcome> {
let (items, _thread_id, parse_errors) =
RolloutRecorder::load_rollout_items(rollout_path).await?;
@@ -119,15 +117,6 @@ pub(crate) async fn extract_metadata_from_rollout(
if let Some(updated_at) = file_modified_time_utc(rollout_path).await {
metadata.updated_at = updated_at;
}
if parse_errors > 0
&& let Some(otel) = otel
{
otel.counter(
DB_ERROR_METRIC,
parse_errors as i64,
&[("stage", "extract_metadata_from_rollout")],
);
}
Ok(ExtractionOutcome {
metadata,
memory_mode: items.iter().rev().find_map(|item| match item {
@@ -141,12 +130,11 @@ pub(crate) async fn extract_metadata_from_rollout(
})
}
pub(crate) async fn backfill_sessions(
runtime: &codex_state::StateRuntime,
config: &Config,
otel: Option<&SessionTelemetry>,
) {
let timer = otel.and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok());
pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, config: &Config) {
let metric_client = codex_otel::metrics::global();
let timer = metric_client
.as_ref()
.and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok());
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
@@ -154,9 +142,6 @@ pub(crate) async fn backfill_sessions(
"failed to read backfill state at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "backfill_state_read")]);
}
BackfillState::default()
}
};
@@ -170,13 +155,6 @@ pub(crate) async fn backfill_sessions(
"failed to claim backfill worker at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_state_claim_running")],
);
}
return;
}
};
@@ -194,13 +172,6 @@ pub(crate) async fn backfill_sessions(
"failed to read claimed backfill state at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_state_read_claimed")],
);
}
BackfillState {
status: BackfillStatus::Running,
..Default::default()
@@ -213,13 +184,6 @@ pub(crate) async fn backfill_sessions(
"failed to mark backfill running at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_state_mark_running")],
);
}
} else {
backfill_state.status = BackfillStatus::Running;
}
@@ -262,18 +226,14 @@ pub(crate) async fn backfill_sessions(
for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) {
for rollout in batch {
stats.scanned = stats.scanned.saturating_add(1);
match extract_metadata_from_rollout(
&rollout.path,
config.model_provider_id.as_str(),
otel,
)
.await
match extract_metadata_from_rollout(&rollout.path, config.model_provider_id.as_str())
.await
{
Ok(outcome) => {
if outcome.parse_errors > 0
&& let Some(otel) = otel
&& let Some(ref metric_client) = metric_client
{
otel.counter(
let _ = metric_client.counter(
DB_ERROR_METRIC,
outcome.parse_errors as i64,
&[("stage", "backfill_sessions")],
@@ -317,13 +277,6 @@ pub(crate) async fn backfill_sessions(
)
.await
{
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_dynamic_tools")],
);
}
warn!(
"failed to backfill dynamic tools {}: {err}",
rollout.path.display()
@@ -356,13 +309,6 @@ pub(crate) async fn backfill_sessions(
"failed to checkpoint backfill at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_state_checkpoint")],
);
}
} else {
last_watermark = Some(last_entry.watermark.clone());
}
@@ -376,26 +322,19 @@ pub(crate) async fn backfill_sessions(
"failed to mark backfill complete at {}: {err}",
config.codex_home.display()
);
if let Some(otel) = otel {
otel.counter(
DB_ERROR_METRIC,
1,
&[("stage", "backfill_state_mark_complete")],
);
}
}
info!(
"state db backfill scanned={}, upserted={}, failed={}",
stats.scanned, stats.upserted, stats.failed
);
if let Some(otel) = otel {
otel.counter(
if let Some(metric_client) = metric_client {
let _ = metric_client.counter(
DB_METRIC_BACKFILL,
stats.upserted as i64,
&[("status", "upserted")],
);
otel.counter(
let _ = metric_client.counter(
DB_METRIC_BACKFILL,
stats.failed as i64,
&[("status", "failed")],
@@ -558,7 +497,7 @@ mod tests {
let mut file = File::create(&path).expect("create rollout");
writeln!(file, "{json}").expect("write rollout");
let outcome = extract_metadata_from_rollout(&path, "openai", None)
let outcome = extract_metadata_from_rollout(&path, "openai")
.await
.expect("extract");
@@ -627,7 +566,7 @@ mod tests {
.expect("write rollout line");
}
let outcome = extract_metadata_from_rollout(&path, "openai", None)
let outcome = extract_metadata_from_rollout(&path, "openai")
.await
.expect("extract");
@@ -684,7 +623,7 @@ mod tests {
);
let runtime =
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
let first_watermark =
@@ -702,7 +641,7 @@ mod tests {
let mut config = crate::config::test_config();
config.codex_home = codex_home.clone();
config.model_provider_id = "test-provider".to_string();
backfill_sessions(runtime.as_ref(), &config, None).await;
backfill_sessions(runtime.as_ref(), &config).await;
let first_id = ThreadId::from_string(&first_uuid.to_string()).expect("first thread id");
let second_id = ThreadId::from_string(&second_uuid.to_string()).expect("second thread id");
@@ -754,11 +693,11 @@ mod tests {
);
let runtime =
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
let thread_id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id");
let mut existing = extract_metadata_from_rollout(&rollout_path, "test-provider", None)
let mut existing = extract_metadata_from_rollout(&rollout_path, "test-provider")
.await
.expect("extract")
.metadata;
@@ -773,7 +712,7 @@ mod tests {
let mut config = crate::config::test_config();
config.codex_home = codex_home.clone();
config.model_provider_id = "test-provider".to_string();
backfill_sessions(runtime.as_ref(), &config, None).await;
backfill_sessions(runtime.as_ref(), &config).await;
let persisted = runtime
.get_thread(thread_id)
@@ -804,14 +743,14 @@ mod tests {
);
let runtime =
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
let mut config = crate::config::test_config();
config.codex_home = codex_home.clone();
config.model_provider_id = "test-provider".to_string();
backfill_sessions(runtime.as_ref(), &config, None).await;
backfill_sessions(runtime.as_ref(), &config).await;
let thread_id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id");
let stored = runtime

View File

@@ -256,7 +256,7 @@ impl RolloutRecorder {
.await?
};
let state_db_ctx = state_db::get_state_db(config, None).await;
let state_db_ctx = state_db::get_state_db(config).await;
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
@@ -308,7 +308,7 @@ impl RolloutRecorder {
filter_cwd: Option<&Path>,
) -> std::io::Result<Option<PathBuf>> {
let codex_home = config.codex_home.as_path();
let state_db_ctx = state_db::get_state_db(config, None).await;
let state_db_ctx = state_db::get_state_db(config).await;
if state_db_ctx.is_some() {
let mut db_cursor = cursor.cloned();
loop {
@@ -1061,7 +1061,7 @@ async fn resume_candidate_matches_cwd(
return cwd_matches(latest_turn_context_cwd, cwd);
}
metadata::extract_metadata_from_rollout(rollout_path, default_provider, None)
metadata::extract_metadata_from_rollout(rollout_path, default_provider)
.await
.is_ok_and(|outcome| cwd_matches(outcome.metadata.cwd.as_path(), cwd))
}
@@ -1254,13 +1254,10 @@ mod tests {
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let state_db = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");
let state_db =
StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone())
.await
.expect("state db should initialize");
state_db
.mark_backfill_complete(None)
.await
@@ -1345,13 +1342,10 @@ mod tests {
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let state_db = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");
let state_db =
StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone())
.await
.expect("state db should initialize");
let thread_id = ThreadId::new();
let rollout_path = home.path().join("rollout.jsonl");
let builder = ThreadMetadataBuilder::new(
@@ -1457,7 +1451,6 @@ mod tests {
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");
@@ -1527,7 +1520,6 @@ mod tests {
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");

View File

@@ -57,10 +57,9 @@ async fn insert_state_db_thread(
rollout_path: &Path,
archived: bool,
) {
let runtime =
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
let runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string())
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(None)
.await
@@ -248,10 +247,9 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
let fs_rollout_path = home.join(format!("sessions/2025/01/03/rollout-{ts}-{uuid}.jsonl"));
// Create an empty state DB so lookup takes the DB-first path and then falls back to files.
let _runtime =
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
let _runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string())
.await
.expect("state db should initialize");
_runtime
.mark_backfill_complete(None)
.await
@@ -279,10 +277,9 @@ async fn assert_state_db_rollout_path(
thread_id: ThreadId,
expected_path: Option<&Path>,
) {
let runtime =
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
let runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string())
.await
.expect("state db should initialize");
let path = runtime
.find_rollout_path_by_id(thread_id, Some(false))
.await

View File

@@ -7,7 +7,6 @@ use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
@@ -26,14 +25,10 @@ pub type StateDbHandle = Arc<codex_state::StateRuntime>;
/// Initialize the state runtime for thread state persistence and backfill checks. To only be used
/// inside `core`. The initialization should not be done anywhere else.
pub(crate) async fn init(
config: &Config,
otel: Option<&SessionTelemetry>,
) -> Option<StateDbHandle> {
pub(crate) async fn init(config: &Config) -> Option<StateDbHandle> {
let runtime = match codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
{
@@ -43,9 +38,6 @@ pub(crate) async fn init(
"failed to initialize state runtime at {}: {err}",
config.sqlite_home.display()
);
if let Some(otel) = otel {
otel.counter("codex.db.init", 1, &[("status", "init_error")]);
}
return None;
}
};
@@ -62,20 +54,15 @@ pub(crate) async fn init(
if backfill_state.status != codex_state::BackfillStatus::Complete {
let runtime_for_backfill = runtime.clone();
let config = config.clone();
let otel = otel.cloned();
tokio::spawn(async move {
metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config, otel.as_ref())
.await;
metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config).await;
});
}
Some(runtime)
}
/// Get the DB if the feature is enabled and the DB exists.
pub async fn get_state_db(
config: &Config,
otel: Option<&SessionTelemetry>,
) -> Option<StateDbHandle> {
pub async fn get_state_db(config: &Config) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(config.sqlite_home.as_path());
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
return None;
@@ -83,7 +70,6 @@ pub async fn get_state_db(
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
.ok()?;
@@ -98,13 +84,10 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
return None;
}
let runtime = codex_state::StateRuntime::init(
codex_home.to_path_buf(),
default_provider.to_string(),
None,
)
.await
.ok()?;
let runtime =
codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string())
.await
.ok()?;
require_backfill_complete(runtime, codex_home).await
}
@@ -373,7 +356,7 @@ pub async fn reconcile_rollout(
return;
}
let outcome =
match metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await {
match metadata::extract_metadata_from_rollout(rollout_path, default_provider).await {
Ok(outcome) => outcome,
Err(err) => {
warn!(
@@ -529,13 +512,7 @@ pub async fn apply_rollout_items(
builder.rollout_path = rollout_path.to_path_buf();
builder.cwd = normalize_cwd_for_state_db(&builder.cwd);
if let Err(err) = ctx
.apply_rollout_items(
&builder,
items,
None,
new_thread_memory_mode,
updated_at_override,
)
.apply_rollout_items(&builder, items, new_thread_memory_mode, updated_at_override)
.await
{
warn!(

View File

@@ -129,7 +129,7 @@ async fn record_stage1_output_usage_for_completed_item(
return;
}
if let Some(db) = state_db::get_state_db(turn_context.config.as_ref(), None).await {
if let Some(db) = state_db::get_state_db(turn_context.config.as_ref()).await {
let _ = db.record_stage1_output_usage(&thread_ids).await;
}
}