From ba57aab13aea0731cbb13789f38db0fc60ee25ac Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 19 May 2026 11:11:41 +0200 Subject: [PATCH] feat: dedicated goal DB (#23300) ## Why Thread goals are moving toward extension-owned runtime behavior, but their persisted state was still stored in the shared state database. This makes the goal store harder to isolate and keeps future storage splits tied to ad hoc runtime plumbing. This PR gives goals their own SQLite database while keeping the existing `StateRuntime` entry point. The goal is to make this the pattern for adding more dedicated runtime databases later. This also reduce load on existing DB and reduce contention ## Limitation Thread preview from goal is not supported anymore. I'm looking into this [EDIT]: solved ## What changed - Added a dedicated `goals_1.sqlite` database with its own `goals_migrations` directory. - Moved `thread_goals` creation into the goals DB migration set. - Dropped the old `thread_goals` table from the main state DB with a normal state migration. There is intentionally no backfill for existing goal rows. - Changed `GoalStore` to be backed only by the goals DB pool. - Removed the old goal-write side effect that filled empty `threads.preview` values from the goal objective. - Added shared runtime DB path metadata so startup, telemetry, `codex doctor`, and repair handling can include future DBs without bespoke path lists. - Updated Bazel compile data so the new goals migration directory is available to `sqlx::migrate!`. ## Verification - `cargo check --tests -p codex-state -p codex-cli -p codex-core -p codex-app-server` - `just fix -p codex-state` - `just fix -p codex-cli` - `just fix -p codex-app-server` --- codex-rs/app-server/src/lib.rs | 5 +- .../thread_goal_processor.rs | 8 ++ .../tests/suite/v2/thread_resume.rs | 12 +- codex-rs/cli/src/doctor.rs | 10 +- codex-rs/cli/src/doctor/output.rs | 12 +- codex-rs/cli/src/doctor/output/detail.rs | 3 + codex-rs/cli/src/state_db_recovery.rs | 10 +- codex-rs/core/src/goals.rs | 30 ++++ codex-rs/core/src/session/tests.rs | 61 ++++++++ codex-rs/state/BUILD.bazel | 2 +- .../goals_migrations/0001_thread_goals.sql | 18 +++ .../migrations/0034_drop_thread_goals.sql | 1 + codex-rs/state/src/lib.rs | 5 + codex-rs/state/src/migrations.rs | 5 + codex-rs/state/src/runtime.rs | 133 +++++++++++++----- codex-rs/state/src/runtime/goals.rs | 65 --------- codex-rs/state/src/runtime/threads.rs | 64 +++++++++ codex-rs/state/src/telemetry.rs | 2 + 18 files changed, 330 insertions(+), 116 deletions(-) create mode 100644 codex-rs/state/goals_migrations/0001_thread_goals.sql create mode 100644 codex-rs/state/migrations/0034_drop_thread_goals.sql diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 142df216b5..b62b2bdc40 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -519,10 +519,9 @@ pub async fn run_main_with_transport_options( let state_db = match rollout_state_db::try_init(&config).await { Ok(state_db) => Some(state_db), Err(err) => { - let state_db_path = codex_state::state_db_path(config.sqlite_home.as_path()); return Err(std::io::Error::other(format!( - "failed to initialize sqlite state db at {}: {err}", - state_db_path.display() + "failed to initialize sqlite state runtime under {}: {err}", + config.sqlite_home.display() ))); } }; diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index 0133114b84..cf1e58345b 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -148,6 +148,7 @@ impl ThreadGoalRequestProcessor { thread.prepare_external_goal_mutation().await; } + let should_set_thread_preview = objective.is_some(); let (goal, previous_status) = (if let Some(objective) = objective { let existing_goal = state_db .thread_goals() @@ -221,6 +222,13 @@ impl ThreadGoalRequestProcessor { .map(|goal| (goal, previous_status)) }) .map_err(|err| invalid_request(err.to_string()))?; + if should_set_thread_preview + && let Err(err) = state_db + .set_thread_preview_if_empty(thread_id, goal.objective.as_str()) + .await + { + warn!("failed to set empty thread preview from goal objective for {thread_id}: {err}"); + } let external_goal_set = ExternalGoalSet { goal: goal.clone(), previous_status, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index c9a5ae1485..8bcad54efe 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -994,7 +994,7 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> codex_home.path(), "2025-01-05T12-00-00", "2025-01-05T12:00:00Z", - "materialized thread", + "", Some("mock_provider"), /*git_info*/ None, )?; @@ -1028,6 +1028,11 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> let state_db = StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; let thread_id = ThreadId::from_string(&thread_id)?; + let thread_metadata = state_db + .get_thread(thread_id) + .await? + .expect("thread metadata should exist"); + assert_eq!(thread_metadata.preview.as_deref(), Some("keep polishing")); let persisted_goal = state_db .thread_goals() .get_thread_goal(thread_id) @@ -1066,8 +1071,13 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> .get_thread_goal(thread_id) .await? .expect("goal should still exist"); + let thread_metadata = state_db + .get_thread(thread_id) + .await? + .expect("thread metadata should still exist"); assert_eq!(persisted_goal.goal_id, updated_goal.goal_id); + assert_eq!(thread_metadata.preview.as_deref(), Some("keep polishing")); assert_eq!(edit.goal.objective, "keep polishing with clearer wording"); assert_eq!(edit.goal.status, ThreadGoalStatus::BudgetLimited); assert_eq!(edit.goal.token_budget, Some(40)); diff --git a/codex-rs/cli/src/doctor.rs b/codex-rs/cli/src/doctor.rs index f2b1bca6d7..aec57c1e82 100644 --- a/codex-rs/cli/src/doctor.rs +++ b/codex-rs/cli/src/doctor.rs @@ -1943,13 +1943,11 @@ async fn state_check(config: &Config) -> DoctorCheck { path_readiness(&mut details, "CODEX_HOME", &config.codex_home); path_readiness(&mut details, "log dir", &config.log_dir); path_readiness(&mut details, "sqlite home", &config.sqlite_home); - let state_db = codex_state::state_db_path(&config.sqlite_home); - let log_db = codex_state::logs_db_path(&config.sqlite_home); - path_readiness(&mut details, "state DB", &state_db); - path_readiness(&mut details, "log DB", &log_db); let mut integrity_failures = Vec::new(); - sqlite_integrity_detail(&mut details, &mut integrity_failures, "state DB", &state_db).await; - sqlite_integrity_detail(&mut details, &mut integrity_failures, "log DB", &log_db).await; + for db in codex_state::runtime_db_paths(&config.sqlite_home) { + path_readiness(&mut details, db.label, &db.path); + sqlite_integrity_detail(&mut details, &mut integrity_failures, db.label, &db.path).await; + } rollout_stats_details(&mut details, &config.codex_home); standalone_release_cache_details(&mut details); diff --git a/codex-rs/cli/src/doctor/output.rs b/codex-rs/cli/src/doctor/output.rs index da5fd20ce8..2e58b1136a 100644 --- a/codex-rs/cli/src/doctor/output.rs +++ b/codex-rs/cli/src/doctor/output.rs @@ -669,10 +669,14 @@ fn terminal_summary(check: &DoctorCheck) -> String { } fn state_summary(check: &DoctorCheck) -> String { - let state_ok = - detail::detail_value(check, "state DB integrity").is_some_and(|value| value == "ok"); - let log_ok = detail::detail_value(check, "log DB integrity").is_some_and(|value| value == "ok"); - if state_ok && log_ok { + let databases_ok = [ + "state DB integrity", + "log DB integrity", + "goals DB integrity", + ] + .into_iter() + .all(|label| detail::detail_value(check, label).is_some_and(|value| value == "ok")); + if check.status == CheckStatus::Ok && databases_ok { "databases healthy".to_string() } else { check.summary.clone() diff --git a/codex-rs/cli/src/doctor/output/detail.rs b/codex-rs/cli/src/doctor/output/detail.rs index 4b5f49991b..0105077620 100644 --- a/codex-rs/cli/src/doctor/output/detail.rs +++ b/codex-rs/cli/src/doctor/output/detail.rs @@ -294,6 +294,7 @@ fn state_details(parsed: &[ParsedDetail]) -> Vec { push_row_if_present(&mut out, parsed, "sqlite home", "sqlite home"); push_database_row(&mut out, parsed, "state DB"); push_database_row(&mut out, parsed, "log DB"); + push_database_row(&mut out, parsed, "goals DB"); for (source, label) in [ ("active rollout files", "active rollouts"), @@ -317,8 +318,10 @@ fn state_details(parsed: &[ParsedDetail]) -> Vec { "sqlite home", "state DB", "log DB", + "goals DB", "state DB integrity", "log DB integrity", + "goals DB integrity", "active rollout files", "archived rollout files", ], diff --git a/codex-rs/cli/src/state_db_recovery.rs b/codex-rs/cli/src/state_db_recovery.rs index 8db134540a..7aeffaca3a 100644 --- a/codex-rs/cli/src/state_db_recovery.rs +++ b/codex-rs/cli/src/state_db_recovery.rs @@ -48,10 +48,9 @@ pub(crate) async fn repair_files( Err(err) => return Err(err), } - let logs_db_path = codex_state::logs_db_path(sqlite_home); - for path in sqlite_paths(state_db_path) + for path in codex_state::runtime_db_paths(sqlite_home) .into_iter() - .chain(sqlite_paths(logs_db_path.as_path())) + .flat_map(|db| sqlite_paths(db.path.as_path())) { if tokio::fs::try_exists(path.as_path()).await? { backups.push(backup_path(path.as_path(), &repair_suffix).await?); @@ -137,19 +136,22 @@ mod tests { let temp_dir = TempDir::new()?; let state_path = codex_state::state_db_path(temp_dir.path()); let logs_path = codex_state::logs_db_path(temp_dir.path()); + let goals_path = codex_state::goals_db_path(temp_dir.path()); let state_sidecars = sqlite_paths(state_path.as_path()); tokio::fs::write(state_path.as_path(), b"state").await?; tokio::fs::write(state_sidecars[1].as_path(), b"state-wal").await?; tokio::fs::write(logs_path.as_path(), b"logs").await?; + tokio::fs::write(goals_path.as_path(), b"goals").await?; let startup_error = LocalStateDbStartupError::new(state_path.clone(), "corrupt".to_string()); let backups = repair_files(&startup_error).await?; - assert_eq!(backups.len(), 3); + assert_eq!(backups.len(), 4); assert!(!tokio::fs::try_exists(state_path.as_path()).await?); assert!(!tokio::fs::try_exists(state_sidecars[1].as_path()).await?); assert!(!tokio::fs::try_exists(logs_path.as_path()).await?); + assert!(!tokio::fs::try_exists(goals_path.as_path()).await?); for backup in backups { assert!(tokio::fs::try_exists(backup.as_path()).await?); } diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index 13f8cc95c8..4b3e237d53 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -23,6 +23,7 @@ use codex_otel::GOAL_DURATION_SECONDS_METRIC; use codex_otel::GOAL_RESUMED_METRIC; use codex_otel::GOAL_TOKEN_COUNT_METRIC; use codex_otel::GOAL_USAGE_LIMITED_METRIC; +use codex_protocol::ThreadId; use codex_protocol::config_types::ModeKind; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -530,6 +531,14 @@ impl Session { })? }; + if objective.is_some() { + set_thread_preview_from_goal_objective( + &state_db, + self.conversation_id, + goal.objective.as_str(), + ) + .await; + } let goal_status = goal.status; let goal_id = goal.goal_id.clone(); let previous_status_for_goal = if replacing_goal { @@ -611,6 +620,12 @@ impl Session { ) })?; + set_thread_preview_from_goal_objective( + &state_db, + self.conversation_id, + goal.objective.as_str(), + ) + .await; let goal_id = goal.goal_id.clone(); self.emit_goal_created_metric(); let goal = protocol_goal_from_state(goal); @@ -1508,6 +1523,21 @@ impl Session { } } +async fn set_thread_preview_from_goal_objective( + state_db: &StateDbHandle, + thread_id: ThreadId, + objective: &str, +) { + if let Err(err) = state_db + .set_thread_preview_if_empty(thread_id, objective) + .await + { + tracing::warn!( + "failed to set empty thread preview from goal objective for {thread_id}: {err}" + ); + } +} + fn should_ignore_goal_for_mode(mode: ModeKind) -> bool { mode == ModeKind::Plan } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 9d10f7b90b..c3528be8fc 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -54,6 +54,7 @@ use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; use tracing::Span; +use crate::goals::CreateGoalRequest; use crate::goals::ExternalGoalPreviousStatus; use crate::goals::ExternalGoalSet; use crate::goals::GoalRuntimeEvent; @@ -8415,6 +8416,66 @@ async fn goal_test_state_db(sess: &Session) -> anyhow::Result anyhow::Result<()> { + let (sess, tc, _rx, _codex_home) = make_goal_session_and_context_with_rx().await; + let state_db = goal_test_state_db(sess.as_ref()).await?; + + let page = state_db + .list_threads( + /*page_size*/ 10, + codex_state::ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor: None, + sort_key: codex_state::SortKey::UpdatedAt, + sort_direction: codex_state::SortDirection::Desc, + search_term: None, + }, + ) + .await?; + assert!(page.items.is_empty()); + + sess.create_thread_goal( + tc.as_ref(), + CreateGoalRequest { + objective: "Keep improving the benchmark".to_string(), + token_budget: None, + }, + ) + .await?; + + let page = state_db + .list_threads( + /*page_size*/ 10, + codex_state::ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor: None, + sort_key: codex_state::SortKey::UpdatedAt, + sort_direction: codex_state::SortDirection::Desc, + search_term: None, + }, + ) + .await?; + let ids = page + .items + .iter() + .map(|thread| thread.id) + .collect::>(); + assert_eq!(vec![sess.conversation_id], ids); + assert_eq!( + Some("Keep improving the benchmark"), + page.items[0].preview.as_deref() + ); + + Ok(()) +} + #[tokio::test] async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyhow::Result<()> { let (sess, tc, rx, _codex_home) = make_goal_session_and_context_with_rx().await; diff --git a/codex-rs/state/BUILD.bazel b/codex-rs/state/BUILD.bazel index e10ac17c90..b3f0fecab7 100644 --- a/codex-rs/state/BUILD.bazel +++ b/codex-rs/state/BUILD.bazel @@ -3,5 +3,5 @@ load("//:defs.bzl", "codex_rust_crate") codex_rust_crate( name = "state", crate_name = "codex_state", - compile_data = glob(["logs_migrations/**", "migrations/**"]), + compile_data = glob(["goals_migrations/**", "logs_migrations/**", "migrations/**"]), ) diff --git a/codex-rs/state/goals_migrations/0001_thread_goals.sql b/codex-rs/state/goals_migrations/0001_thread_goals.sql new file mode 100644 index 0000000000..943226ee8a --- /dev/null +++ b/codex-rs/state/goals_migrations/0001_thread_goals.sql @@ -0,0 +1,18 @@ +CREATE TABLE thread_goals ( + thread_id TEXT PRIMARY KEY NOT NULL, + goal_id TEXT NOT NULL, + objective TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ( + 'active', + 'paused', + 'blocked', + 'usage_limited', + 'budget_limited', + 'complete' + )), + token_budget INTEGER, + tokens_used INTEGER NOT NULL DEFAULT 0, + time_used_seconds INTEGER NOT NULL DEFAULT 0, + created_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL +); diff --git a/codex-rs/state/migrations/0034_drop_thread_goals.sql b/codex-rs/state/migrations/0034_drop_thread_goals.sql new file mode 100644 index 0000000000..8954bd6d5b --- /dev/null +++ b/codex-rs/state/migrations/0034_drop_thread_goals.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS thread_goals; diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index aeb8b2d13b..8746ff3a0d 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -50,12 +50,16 @@ pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; pub use runtime::GoalStore; pub use runtime::RemoteControlEnrollmentRecord; +pub use runtime::RuntimeDbPath; pub use runtime::ThreadFilterOptions; pub use runtime::ThreadGoalAccountingMode; pub use runtime::ThreadGoalAccountingOutcome; pub use runtime::ThreadGoalUpdate; +pub use runtime::goals_db_filename; +pub use runtime::goals_db_path; pub use runtime::logs_db_filename; pub use runtime::logs_db_path; +pub use runtime::runtime_db_paths; pub use runtime::sqlite_integrity_check; pub use runtime::state_db_filename; pub use runtime::state_db_path; @@ -69,6 +73,7 @@ pub use telemetry::record_fallback; pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; pub const LOGS_DB_FILENAME: &str = "logs_2.sqlite"; +pub const GOALS_DB_FILENAME: &str = "goals_1.sqlite"; pub const STATE_DB_FILENAME: &str = "state_5.sqlite"; /// Errors encountered during DB operations. Tags: [stage] diff --git a/codex-rs/state/src/migrations.rs b/codex-rs/state/src/migrations.rs index 883129a943..526e958b5a 100644 --- a/codex-rs/state/src/migrations.rs +++ b/codex-rs/state/src/migrations.rs @@ -4,6 +4,7 @@ use sqlx::migrate::Migrator; pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations"); pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations"); +pub(crate) static GOALS_MIGRATOR: Migrator = sqlx::migrate!("./goals_migrations"); /// Allow an older Codex binary to open a database that has already been /// migrated by a newer binary running in parallel. @@ -27,3 +28,7 @@ pub(crate) fn runtime_state_migrator() -> Migrator { pub(crate) fn runtime_logs_migrator() -> Migrator { runtime_migrator(&LOGS_MIGRATOR) } + +pub(crate) fn runtime_goals_migrator() -> Migrator { + runtime_migrator(&GOALS_MIGRATOR) +} diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index e95cc7cc8a..ac03f227da 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -5,6 +5,7 @@ use crate::AgentJobItemCreateParams; use crate::AgentJobItemStatus; use crate::AgentJobProgress; use crate::AgentJobStatus; +use crate::GOALS_DB_FILENAME; use crate::LOGS_DB_FILENAME; use crate::LogEntry; use crate::LogQuery; @@ -15,6 +16,7 @@ use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; use crate::ThreadsPage; use crate::apply_rollout_item; +use crate::migrations::runtime_goals_migrator; use crate::migrations::runtime_logs_migrator; use crate::migrations::runtime_state_migrator; use crate::model::AgentJobRow; @@ -80,6 +82,53 @@ pub use threads::ThreadFilterOptions; const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024; const LOG_PARTITION_ROW_LIMIT: i64 = 1_000; +#[derive(Clone, Copy)] +struct RuntimeDbSpec { + label: &'static str, + filename: &'static str, + kind: DbKind, + open_phase: &'static str, + migrate_phase: &'static str, +} + +impl RuntimeDbSpec { + fn path(self, codex_home: &Path) -> PathBuf { + codex_home.join(self.filename) + } +} + +const STATE_DB: RuntimeDbSpec = RuntimeDbSpec { + label: "state DB", + filename: STATE_DB_FILENAME, + kind: DbKind::State, + open_phase: "open_state", + migrate_phase: "migrate_state", +}; + +const LOGS_DB: RuntimeDbSpec = RuntimeDbSpec { + label: "log DB", + filename: LOGS_DB_FILENAME, + kind: DbKind::Logs, + open_phase: "open_logs", + migrate_phase: "migrate_logs", +}; + +const GOALS_DB: RuntimeDbSpec = RuntimeDbSpec { + label: "goals DB", + filename: GOALS_DB_FILENAME, + kind: DbKind::Goals, + open_phase: "open_goals", + migrate_phase: "migrate_goals", +}; + +const RUNTIME_DBS: [RuntimeDbSpec; 3] = [STATE_DB, LOGS_DB, GOALS_DB]; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RuntimeDbPath { + pub label: &'static str, + pub path: PathBuf, +} + #[derive(Clone)] pub struct StateRuntime { codex_home: PathBuf, @@ -122,8 +171,10 @@ impl StateRuntime { tokio::fs::create_dir_all(&codex_home).await?; let state_migrator = runtime_state_migrator(); let logs_migrator = runtime_logs_migrator(); - let state_path = state_db_path(codex_home.as_path()); - let logs_path = logs_db_path(codex_home.as_path()); + let goals_migrator = runtime_goals_migrator(); + let state_path = STATE_DB.path(codex_home.as_path()); + let logs_path = LOGS_DB.path(codex_home.as_path()); + let goals_path = GOALS_DB.path(codex_home.as_path()); let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await { Ok(db) => Arc::new(db), Err(err) => { @@ -139,6 +190,14 @@ impl StateRuntime { return Err(err); } }; + let goals_pool = + match open_goals_sqlite(&goals_path, &goals_migrator, telemetry_override).await { + Ok(db) => Arc::new(db), + Err(err) => { + warn!("failed to open goals db at {}: {err}", goals_path.display()); + return Err(err); + } + }; let started = Instant::now(); let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await; crate::telemetry::record_init_result( @@ -165,7 +224,7 @@ impl StateRuntime { let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { - thread_goals: GoalStore::new(Arc::clone(&pool)), + thread_goals: GoalStore::new(Arc::clone(&goals_pool)), pool, logs_pool, codex_home, @@ -209,15 +268,7 @@ async fn open_state_sqlite( // New state DBs should use incremental auto-vacuum, but retrofitting an // existing DB requires a full VACUUM. Do not attempt that during process // startup: it is maintenance work that can contend with foreground writers. - open_sqlite( - path, - migrator, - DbKind::State, - "open_state", - "migrate_state", - telemetry_override, - ) - .await + open_sqlite(path, migrator, STATE_DB, telemetry_override).await } async fn open_logs_sqlite( @@ -225,23 +276,21 @@ async fn open_logs_sqlite( migrator: &Migrator, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { - open_sqlite( - path, - migrator, - DbKind::Logs, - "open_logs", - "migrate_logs", - telemetry_override, - ) - .await + open_sqlite(path, migrator, LOGS_DB, telemetry_override).await +} + +async fn open_goals_sqlite( + path: &Path, + migrator: &Migrator, + telemetry_override: Option<&dyn DbTelemetry>, +) -> anyhow::Result { + open_sqlite(path, migrator, GOALS_DB, telemetry_override).await } async fn open_sqlite( path: &Path, migrator: &Migrator, - db: DbKind, - open_phase: &'static str, - migrate_phase: &'static str, + spec: RuntimeDbSpec, telemetry_override: Option<&dyn DbTelemetry>, ) -> anyhow::Result { let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); @@ -253,8 +302,8 @@ async fn open_sqlite( .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, - db, - open_phase, + spec.kind, + spec.open_phase, started.elapsed(), &pool_result, ); @@ -263,8 +312,8 @@ async fn open_sqlite( let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, - db, - migrate_phase, + spec.kind, + spec.migrate_phase, started.elapsed(), &migrate_result, ); @@ -291,19 +340,37 @@ ON CONFLICT(id) DO NOTHING } pub fn state_db_filename() -> String { - STATE_DB_FILENAME.to_string() + STATE_DB.filename.to_string() } pub fn state_db_path(codex_home: &Path) -> PathBuf { - codex_home.join(state_db_filename()) + STATE_DB.path(codex_home) } pub fn logs_db_filename() -> String { - LOGS_DB_FILENAME.to_string() + LOGS_DB.filename.to_string() } pub fn logs_db_path(codex_home: &Path) -> PathBuf { - codex_home.join(logs_db_filename()) + LOGS_DB.path(codex_home) +} + +pub fn goals_db_filename() -> String { + GOALS_DB.filename.to_string() +} + +pub fn goals_db_path(codex_home: &Path) -> PathBuf { + GOALS_DB.path(codex_home) +} + +pub fn runtime_db_paths(codex_home: &Path) -> Vec { + RUNTIME_DBS + .iter() + .map(|spec| RuntimeDbPath { + label: spec.label, + path: spec.path(codex_home), + }) + .collect() } /// Run SQLite's built-in integrity check against an existing database file. @@ -510,6 +577,8 @@ mod tests { "migrate_state", "open_logs", "migrate_logs", + "open_goals", + "migrate_goals", "ensure_backfill_state", "post_init_query", ] diff --git a/codex-rs/state/src/runtime/goals.rs b/codex-rs/state/src/runtime/goals.rs index 9de561ef2d..bf5a7bd825 100644 --- a/codex-rs/state/src/runtime/goals.rs +++ b/codex-rs/state/src/runtime/goals.rs @@ -115,8 +115,6 @@ RETURNING .fetch_one(self.pool.as_ref()) .await?; - self.set_thread_preview_if_empty(thread_id, objective) - .await?; thread_goal_from_row(&row) } @@ -166,10 +164,6 @@ RETURNING .fetch_optional(self.pool.as_ref()) .await?; - if row.is_some() { - self.set_thread_preview_if_empty(thread_id, objective) - .await?; - } row.map(|row| thread_goal_from_row(&row)).transpose() } @@ -478,29 +472,6 @@ RETURNING let updated = thread_goal_from_row(&row)?; Ok(ThreadGoalAccountingOutcome::Updated(updated)) } - - async fn set_thread_preview_if_empty( - &self, - thread_id: ThreadId, - preview: &str, - ) -> anyhow::Result<()> { - let preview = preview.trim(); - if preview.is_empty() { - return Ok(()); - } - sqlx::query( - r#" -UPDATE threads -SET preview = ? -WHERE id = ? AND preview = '' - "#, - ) - .bind(preview) - .bind(thread_id.to_string()) - .execute(self.pool.as_ref()) - .await?; - Ok(()) - } } fn thread_goal_from_row(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { @@ -643,42 +614,6 @@ mod tests { ); } - #[tokio::test] - async fn replace_thread_goal_sets_preview_when_empty() { - let runtime = test_runtime().await; - let thread_id = test_thread_id(); - let mut metadata = test_thread_metadata( - runtime.codex_home(), - thread_id, - runtime.codex_home().join("workspace"), - ); - metadata.preview = None; - metadata.first_user_message = None; - runtime - .upsert_thread(&metadata) - .await - .expect("test thread should be upserted"); - - runtime - .thread_goals() - .replace_thread_goal( - thread_id, - "optimize the benchmark", - crate::ThreadGoalStatus::Active, - /*token_budget*/ None, - ) - .await - .expect("goal replacement should succeed"); - - let metadata = runtime - .get_thread(thread_id) - .await - .expect("thread metadata should load") - .expect("thread should exist"); - assert_eq!(metadata.preview.as_deref(), Some("optimize the benchmark")); - assert_eq!(metadata.first_user_message, None); - } - #[tokio::test] async fn replace_thread_goal_applies_budget_limit_immediately() { let runtime = test_runtime().await; diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index c8c48db721..c7030d7e44 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -51,6 +51,29 @@ WHERE threads.id = ? Ok(row.and_then(|row| row.try_get("memory_mode").ok())) } + pub async fn set_thread_preview_if_empty( + &self, + thread_id: ThreadId, + preview: &str, + ) -> anyhow::Result { + let preview = preview.trim(); + if preview.is_empty() { + return Ok(false); + } + let result = sqlx::query( + r#" +UPDATE threads +SET preview = ? +WHERE id = ? AND preview = '' + "#, + ) + .bind(preview) + .bind(thread_id.to_string()) + .execute(self.pool.as_ref()) + .await?; + Ok(result.rows_affected() > 0) + } + /// Get dynamic tools for a thread, if present. pub async fn get_dynamic_tools( &self, @@ -1569,6 +1592,47 @@ mod tests { assert_eq!(persisted.preview.as_deref(), Some("migrated goal preview")); } + #[tokio::test] + async fn set_thread_preview_if_empty_only_fills_blank_preview() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000460").expect("valid thread id"); + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + metadata.first_user_message = None; + metadata.preview = None; + + runtime + .upsert_thread(&metadata) + .await + .expect("initial upsert should succeed"); + + let empty_updated = runtime + .set_thread_preview_if_empty(thread_id, " ") + .await + .expect("empty preview update should succeed"); + assert!(!empty_updated); + let goal_updated = runtime + .set_thread_preview_if_empty(thread_id, " goal preview ") + .await + .expect("goal preview update should succeed"); + assert!(goal_updated); + let overwrite_updated = runtime + .set_thread_preview_if_empty(thread_id, "new preview") + .await + .expect("overwrite preview update should succeed"); + assert!(!overwrite_updated); + + let persisted = runtime + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist"); + assert_eq!(persisted.preview.as_deref(), Some("goal preview")); + } + #[tokio::test] async fn update_thread_git_info_preserves_newer_non_git_metadata() { let codex_home = unique_temp_dir(); diff --git a/codex-rs/state/src/telemetry.rs b/codex-rs/state/src/telemetry.rs index a4a26db0ce..da2b7de7a8 100644 --- a/codex-rs/state/src/telemetry.rs +++ b/codex-rs/state/src/telemetry.rs @@ -39,6 +39,7 @@ pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool { pub(crate) enum DbKind { State, Logs, + Goals, } impl DbKind { @@ -46,6 +47,7 @@ impl DbKind { match self { Self::State => "state", Self::Logs => "logs", + Self::Goals => "goals", } } }