feat: dedicated goal DB

This commit is contained in:
jif-oai
2026-05-18 14:47:55 +02:00
parent 7ee7fe239f
commit fd1bdda453
11 changed files with 138 additions and 111 deletions

View File

@@ -517,10 +517,9 @@ pub async fn run_main_with_transport_options(
let state_db = match rollout_state_db::try_init(&config).await {
Ok(state_db) => Some(state_db),
Err(err) => {
let state_db_path = codex_state::state_db_path(config.sqlite_home.as_path());
return Err(std::io::Error::other(format!(
"failed to initialize sqlite state db at {}: {err}",
state_db_path.display()
"failed to initialize sqlite state runtime under {}: {err}",
config.sqlite_home.display()
)));
}
};

View File

@@ -1943,13 +1943,11 @@ async fn state_check(config: &Config) -> DoctorCheck {
path_readiness(&mut details, "CODEX_HOME", &config.codex_home);
path_readiness(&mut details, "log dir", &config.log_dir);
path_readiness(&mut details, "sqlite home", &config.sqlite_home);
let state_db = codex_state::state_db_path(&config.sqlite_home);
let log_db = codex_state::logs_db_path(&config.sqlite_home);
path_readiness(&mut details, "state DB", &state_db);
path_readiness(&mut details, "log DB", &log_db);
let mut integrity_failures = Vec::new();
sqlite_integrity_detail(&mut details, &mut integrity_failures, "state DB", &state_db).await;
sqlite_integrity_detail(&mut details, &mut integrity_failures, "log DB", &log_db).await;
for db in codex_state::runtime_db_paths(&config.sqlite_home) {
path_readiness(&mut details, db.label, &db.path);
sqlite_integrity_detail(&mut details, &mut integrity_failures, db.label, &db.path).await;
}
rollout_stats_details(&mut details, &config.codex_home);
standalone_release_cache_details(&mut details);

View File

@@ -48,10 +48,9 @@ pub(crate) async fn repair_files(
Err(err) => return Err(err),
}
let logs_db_path = codex_state::logs_db_path(sqlite_home);
for path in sqlite_paths(state_db_path)
for path in codex_state::runtime_db_paths(sqlite_home)
.into_iter()
.chain(sqlite_paths(logs_db_path.as_path()))
.flat_map(|db| sqlite_paths(db.path.as_path()))
{
if tokio::fs::try_exists(path.as_path()).await? {
backups.push(backup_path(path.as_path(), &repair_suffix).await?);
@@ -137,19 +136,22 @@ mod tests {
let temp_dir = TempDir::new()?;
let state_path = codex_state::state_db_path(temp_dir.path());
let logs_path = codex_state::logs_db_path(temp_dir.path());
let goals_path = codex_state::goals_db_path(temp_dir.path());
let state_sidecars = sqlite_paths(state_path.as_path());
tokio::fs::write(state_path.as_path(), b"state").await?;
tokio::fs::write(state_sidecars[1].as_path(), b"state-wal").await?;
tokio::fs::write(logs_path.as_path(), b"logs").await?;
tokio::fs::write(goals_path.as_path(), b"goals").await?;
let startup_error =
LocalStateDbStartupError::new(state_path.clone(), "corrupt".to_string());
let backups = repair_files(&startup_error).await?;
assert_eq!(backups.len(), 3);
assert_eq!(backups.len(), 4);
assert!(!tokio::fs::try_exists(state_path.as_path()).await?);
assert!(!tokio::fs::try_exists(state_sidecars[1].as_path()).await?);
assert!(!tokio::fs::try_exists(logs_path.as_path()).await?);
assert!(!tokio::fs::try_exists(goals_path.as_path()).await?);
for backup in backups {
assert!(tokio::fs::try_exists(backup.as_path()).await?);
}

View File

@@ -3,5 +3,5 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "state",
crate_name = "codex_state",
compile_data = glob(["logs_migrations/**", "migrations/**"]),
compile_data = glob(["goals_migrations/**", "logs_migrations/**", "migrations/**"]),
)

View File

@@ -0,0 +1,11 @@
-- Goal tracking table. `thread_id` is the primary key, so each thread holds at
-- most one goal row.
CREATE TABLE thread_goals (
thread_id TEXT PRIMARY KEY NOT NULL,
-- Identifier of the goal itself, stored separately from the owning thread id.
goal_id TEXT NOT NULL,
-- Text of the goal's objective.
objective TEXT NOT NULL,
-- Lifecycle state; the CHECK constraint rejects any value outside these four.
status TEXT NOT NULL CHECK(status IN ('active', 'paused', 'budget_limited', 'complete')),
-- Nullable. NOTE(review): presumably NULL means "no token limit" -- confirm against the goal store.
token_budget INTEGER,
-- Usage counters; both default to 0 when a row is inserted without explicit values.
tokens_used INTEGER NOT NULL DEFAULT 0,
time_used_seconds INTEGER NOT NULL DEFAULT 0,
-- NOTE(review): the `_ms` suffix suggests Unix epoch milliseconds -- confirm against the writer.
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
);

View File

@@ -0,0 +1 @@
-- Remove the `thread_goals` table. `IF EXISTS` makes this a no-op on databases
-- that do not contain the table.
DROP TABLE IF EXISTS thread_goals;

View File

@@ -50,12 +50,16 @@ pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use runtime::GoalStore;
pub use runtime::RemoteControlEnrollmentRecord;
pub use runtime::RuntimeDbPath;
pub use runtime::ThreadFilterOptions;
pub use runtime::ThreadGoalAccountingMode;
pub use runtime::ThreadGoalAccountingOutcome;
pub use runtime::ThreadGoalUpdate;
pub use runtime::goals_db_filename;
pub use runtime::goals_db_path;
pub use runtime::logs_db_filename;
pub use runtime::logs_db_path;
pub use runtime::runtime_db_paths;
pub use runtime::sqlite_integrity_check;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;
@@ -69,6 +73,7 @@ pub use telemetry::record_fallback;
/// Name of the `CODEX_SQLITE_HOME` environment variable.
/// NOTE(review): presumably selects the directory holding the SQLite files — confirm at the read site.
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
/// File name of the logs database.
pub const LOGS_DB_FILENAME: &str = "logs_2.sqlite";
/// File name of the goals database.
pub const GOALS_DB_FILENAME: &str = "goals_1.sqlite";
/// File name of the state database.
pub const STATE_DB_FILENAME: &str = "state_5.sqlite";
/// Errors encountered during DB operations. Tags: [stage]

View File

@@ -4,6 +4,7 @@ use sqlx::migrate::Migrator;
/// Migrations for the state database, loaded from `./migrations` by `sqlx::migrate!`.
pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations");
/// Migrations for the logs database, loaded from `./logs_migrations` by `sqlx::migrate!`.
pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations");
/// Migrations for the goals database, loaded from `./goals_migrations` by `sqlx::migrate!`.
pub(crate) static GOALS_MIGRATOR: Migrator = sqlx::migrate!("./goals_migrations");
/// Allow an older Codex binary to open a database that has already been
/// migrated by a newer binary running in parallel.
@@ -27,3 +28,7 @@ pub(crate) fn runtime_state_migrator() -> Migrator {
/// Returns the migrator to use for the logs database at runtime, obtained by
/// passing [`LOGS_MIGRATOR`] through `runtime_migrator`.
pub(crate) fn runtime_logs_migrator() -> Migrator {
    let embedded = &LOGS_MIGRATOR;
    runtime_migrator(embedded)
}
/// Returns the migrator to use for the goals database at runtime, obtained by
/// passing [`GOALS_MIGRATOR`] through `runtime_migrator`.
pub(crate) fn runtime_goals_migrator() -> Migrator {
    let embedded = &GOALS_MIGRATOR;
    runtime_migrator(embedded)
}

View File

@@ -5,6 +5,7 @@ use crate::AgentJobItemCreateParams;
use crate::AgentJobItemStatus;
use crate::AgentJobProgress;
use crate::AgentJobStatus;
use crate::GOALS_DB_FILENAME;
use crate::LOGS_DB_FILENAME;
use crate::LogEntry;
use crate::LogQuery;
@@ -15,6 +16,7 @@ use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::runtime_goals_migrator;
use crate::migrations::runtime_logs_migrator;
use crate::migrations::runtime_state_migrator;
use crate::model::AgentJobRow;
@@ -80,6 +82,53 @@ pub use threads::ThreadFilterOptions;
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
/// Static description of one SQLite database opened by the runtime.
#[derive(Clone, Copy)]
struct RuntimeDbSpec {
    /// Database kind reported to telemetry when init results are recorded.
    kind: DbKind,
    /// Human-readable label (for example `"state DB"`).
    label: &'static str,
    /// File name of the database, joined onto the home directory by [`RuntimeDbSpec::path`].
    filename: &'static str,
    /// Telemetry phase name recorded when the pool is opened.
    open_phase: &'static str,
    /// Telemetry phase name recorded when migrations are run.
    migrate_phase: &'static str,
}

impl RuntimeDbSpec {
    /// Returns the location of this database's file inside `codex_home`.
    fn path(self, codex_home: &Path) -> PathBuf {
        let mut db_path = codex_home.to_path_buf();
        db_path.push(self.filename);
        db_path
    }
}
/// Spec for the state database.
const STATE_DB: RuntimeDbSpec = RuntimeDbSpec {
    kind: DbKind::State,
    filename: STATE_DB_FILENAME,
    label: "state DB",
    open_phase: "open_state",
    migrate_phase: "migrate_state",
};

/// Spec for the logs database.
const LOGS_DB: RuntimeDbSpec = RuntimeDbSpec {
    kind: DbKind::Logs,
    filename: LOGS_DB_FILENAME,
    label: "log DB",
    open_phase: "open_logs",
    migrate_phase: "migrate_logs",
};

/// Spec for the goals database.
const GOALS_DB: RuntimeDbSpec = RuntimeDbSpec {
    kind: DbKind::Goals,
    filename: GOALS_DB_FILENAME,
    label: "goals DB",
    open_phase: "open_goals",
    migrate_phase: "migrate_goals",
};

/// Every runtime database spec, in the order state, logs, goals.
const RUNTIME_DBS: [RuntimeDbSpec; 3] = [STATE_DB, LOGS_DB, GOALS_DB];
/// Label and file path of one runtime database, as produced by `runtime_db_paths`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RuntimeDbPath {
/// Human-readable name of the database (for example `"state DB"`).
pub label: &'static str,
/// Path of the database file.
pub path: PathBuf,
}
#[derive(Clone)]
pub struct StateRuntime {
codex_home: PathBuf,
@@ -122,8 +171,10 @@ impl StateRuntime {
tokio::fs::create_dir_all(&codex_home).await?;
let state_migrator = runtime_state_migrator();
let logs_migrator = runtime_logs_migrator();
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let goals_migrator = runtime_goals_migrator();
let state_path = STATE_DB.path(codex_home.as_path());
let logs_path = LOGS_DB.path(codex_home.as_path());
let goals_path = GOALS_DB.path(codex_home.as_path());
let pool = match open_state_sqlite(&state_path, &state_migrator, telemetry_override).await {
Ok(db) => Arc::new(db),
Err(err) => {
@@ -139,6 +190,14 @@ impl StateRuntime {
return Err(err);
}
};
let goals_pool =
match open_goals_sqlite(&goals_path, &goals_migrator, telemetry_override).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open goals db at {}: {err}", goals_path.display());
return Err(err);
}
};
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
@@ -165,7 +224,7 @@ impl StateRuntime {
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
thread_goals: GoalStore::new(Arc::clone(&pool)),
thread_goals: GoalStore::new(Arc::clone(&goals_pool)),
pool,
logs_pool,
codex_home,
@@ -209,15 +268,7 @@ async fn open_state_sqlite(
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
open_sqlite(
path,
migrator,
DbKind::State,
"open_state",
"migrate_state",
telemetry_override,
)
.await
open_sqlite(path, migrator, STATE_DB, telemetry_override).await
}
async fn open_logs_sqlite(
@@ -225,23 +276,21 @@ async fn open_logs_sqlite(
migrator: &Migrator,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
open_sqlite(
path,
migrator,
DbKind::Logs,
"open_logs",
"migrate_logs",
telemetry_override,
)
.await
open_sqlite(path, migrator, LOGS_DB, telemetry_override).await
}
/// Opens the goals database at `path` and applies `migrator`.
///
/// Delegates to `open_sqlite` with the [`GOALS_DB`] spec, which supplies the
/// database kind and phase names used for telemetry.
async fn open_goals_sqlite(
    path: &Path,
    migrator: &Migrator,
    telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
    let spec = GOALS_DB;
    open_sqlite(path, migrator, spec, telemetry_override).await
}
async fn open_sqlite(
path: &Path,
migrator: &Migrator,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
spec: RuntimeDbSpec,
telemetry_override: Option<&dyn DbTelemetry>,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
@@ -253,8 +302,8 @@ async fn open_sqlite(
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
db,
open_phase,
spec.kind,
spec.open_phase,
started.elapsed(),
&pool_result,
);
@@ -263,8 +312,8 @@ async fn open_sqlite(
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
telemetry_override,
db,
migrate_phase,
spec.kind,
spec.migrate_phase,
started.elapsed(),
&migrate_result,
);
@@ -291,19 +340,37 @@ ON CONFLICT(id) DO NOTHING
}
pub fn state_db_filename() -> String {
STATE_DB_FILENAME.to_string()
STATE_DB.filename.to_string()
}
pub fn state_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(state_db_filename())
STATE_DB.path(codex_home)
}
pub fn logs_db_filename() -> String {
LOGS_DB_FILENAME.to_string()
LOGS_DB.filename.to_string()
}
pub fn logs_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(logs_db_filename())
LOGS_DB.path(codex_home)
}
/// Returns the goals database file name as an owned `String`.
pub fn goals_db_filename() -> String {
    String::from(GOALS_DB.filename)
}
/// Returns the path of the goals database file inside `codex_home`.
pub fn goals_db_path(codex_home: &Path) -> PathBuf {
    codex_home.join(GOALS_DB.filename)
}
/// Lists the label and file path of every runtime database under `codex_home`.
///
/// Entries appear in the same order as `RUNTIME_DBS`.
pub fn runtime_db_paths(codex_home: &Path) -> Vec<RuntimeDbPath> {
    let mut db_paths = Vec::with_capacity(RUNTIME_DBS.len());
    for spec in &RUNTIME_DBS {
        db_paths.push(RuntimeDbPath {
            label: spec.label,
            path: spec.path(codex_home),
        });
    }
    db_paths
}
/// Run SQLite's built-in integrity check against an existing database file.
@@ -510,6 +577,8 @@ mod tests {
"migrate_state",
"open_logs",
"migrate_logs",
"open_goals",
"migrate_goals",
"ensure_backfill_state",
"post_init_query",
]

View File

@@ -115,8 +115,6 @@ RETURNING
.fetch_one(self.pool.as_ref())
.await?;
self.set_thread_preview_if_empty(thread_id, objective)
.await?;
thread_goal_from_row(&row)
}
@@ -166,10 +164,6 @@ RETURNING
.fetch_optional(self.pool.as_ref())
.await?;
if row.is_some() {
self.set_thread_preview_if_empty(thread_id, objective)
.await?;
}
row.map(|row| thread_goal_from_row(&row)).transpose()
}
@@ -452,29 +446,6 @@ RETURNING
let updated = thread_goal_from_row(&row)?;
Ok(ThreadGoalAccountingOutcome::Updated(updated))
}
/// Sets the `preview` column of the thread row to the trimmed `preview` text,
/// but only when the stored preview is currently the empty string.
///
/// Does nothing when `preview` is empty after trimming.
async fn set_thread_preview_if_empty(
    &self,
    thread_id: ThreadId,
    preview: &str,
) -> anyhow::Result<()> {
    let trimmed = preview.trim();
    if !trimmed.is_empty() {
        // The `preview = ''` predicate keeps an existing non-empty preview untouched.
        let sql = r#"
UPDATE threads
SET preview = ?
WHERE id = ? AND preview = ''
"#;
        let id = thread_id.to_string();
        sqlx::query(sql)
            .bind(trimmed)
            .bind(id)
            .execute(self.pool.as_ref())
            .await?;
    }
    Ok(())
}
}
fn thread_goal_from_row(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result<crate::ThreadGoal> {
@@ -617,42 +588,6 @@ mod tests {
);
}
// Checks that replacing a thread's goal fills the thread preview with the goal
// objective when the thread was stored without a preview, and that
// `first_user_message` stays unset.
#[tokio::test]
async fn replace_thread_goal_sets_preview_when_empty() {
let runtime = test_runtime().await;
let thread_id = test_thread_id();
let mut metadata = test_thread_metadata(
runtime.codex_home(),
thread_id,
runtime.codex_home().join("workspace"),
);
// Start from a thread that has neither a preview nor a first user message.
metadata.preview = None;
metadata.first_user_message = None;
runtime
.upsert_thread(&metadata)
.await
.expect("test thread should be upserted");
runtime
.thread_goals()
.replace_thread_goal(
thread_id,
"optimize the benchmark",
crate::ThreadGoalStatus::Active,
/*token_budget*/ None,
)
.await
.expect("goal replacement should succeed");
// Reload the thread to observe what the goal replacement wrote.
let metadata = runtime
.get_thread(thread_id)
.await
.expect("thread metadata should load")
.expect("thread should exist");
assert_eq!(metadata.preview.as_deref(), Some("optimize the benchmark"));
assert_eq!(metadata.first_user_message, None);
}
#[tokio::test]
async fn replace_thread_goal_applies_budget_limit_immediately() {
let runtime = test_runtime().await;

View File

@@ -39,6 +39,7 @@ pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool {
/// Identifies which runtime database a telemetry record refers to.
pub(crate) enum DbKind {
/// The state database.
State,
/// The logs database.
Logs,
/// The goals database.
Goals,
}
impl DbKind {
@@ -46,6 +47,7 @@ impl DbKind {
match self {
Self::State => "state",
Self::Logs => "logs",
Self::Goals => "goals",
}
}
}