Inject state DB, agent graph store (#20689)

## Why

We want the agent graph store to be passed down the stack as a real
dependency, the same way we already treat the thread store.

This will let us inject the agent graph store as a real dependency and
support implementations other than the local SQLite-backed one. Right
now most code instantiates a state DB and an agent graph store
just-in-time. Ideally, we would not depend on the state DB directly but
only read through the higher-level interfaces.

This change makes the dependency boundaries explicit and moves state DB
initialization to process bootstrap instead of hiding it inside local
store implementations.

## What changed

- `ThreadManager` now requires a `StateDbHandle` and an
`AgentGraphStore` at construction time instead of treating them as
optional internals.
- The local store constructors no longer lazily initialize SQLite.
Callers now initialize the state DB once per process and use that shared
handle to build:
  - `LocalThreadStore`
  - `LocalAgentGraphStore`
- App bootstraps (`app-server`, `mcp-server`, `prompt_debug`, and the
thread-manager sample) now initialize the state DB up front and inject
the resulting handle down the stack.
- `app-server` now consistently uses its process-scoped state DB handle
instead of reopening SQLite or trying to recover it from loaded threads.
- Device-key storage now reuses the shared state DB handle instead of
maintaining its own lazy opener.
- The thread archive / descendant traversal paths now use the injected
`AgentGraphStore` instead of reaching through local
thread-store-specific state.

## Verification

- `cargo check -p codex-core -p codex-thread-store -p codex-app-server
-p codex-mcp-server -p codex-thread-manager-sample --tests`
- `cargo test -p codex-thread-store`
- `cargo test -p codex-core
thread_manager_accepts_separate_agent_graph_store_and_thread_store --
--nocapture`
- `cargo test -p codex-app-server
thread_archive_archives_spawned_descendants -- --nocapture`
This commit is contained in:
Rasmus Rygaard
2026-05-05 14:45:29 -07:00
committed by GitHub
parent 36460387ec
commit 7e310bc7f3
53 changed files with 768 additions and 745 deletions

View File

@@ -1,6 +1,5 @@
use super::*;
use crate::config::test_config;
use crate::init_state_db;
use crate::rollout::RolloutRecorder;
use crate::session::session::SessionSettingsUpdate;
use crate::session::tests::make_session_and_context;
@@ -47,6 +46,21 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
}
async fn state_backed_stores(
config: &Config,
) -> (
StateDbHandle,
Arc<dyn ThreadStore>,
Arc<dyn AgentGraphStore>,
) {
let state_db = init_state_db_from_config(config)
.await
.expect("thread manager test requires state db");
let thread_store = thread_store_from_config(config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
(state_db, thread_store, agent_graph_store)
}
fn contextual_user_interrupted_marker() -> ResponseItem {
interrupted_turn_history_marker(InterruptedTurnHistoryMarker::ContextualUser)
.expect("contextual-user interrupted marker should be enabled")
@@ -261,7 +275,8 @@ async fn shutdown_all_threads_bounded_submits_shutdown_to_every_thread() {
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
)
.await;
let thread_1 = manager
.start_thread(config.clone())
.await
@@ -310,7 +325,8 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d
config.model_provider.clone(),
config.codex_home.to_path_buf(),
environment_manager,
);
)
.await;
let thread = manager
.start_thread_with_options(StartThreadOptions {
@@ -345,7 +361,8 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
)
.await;
let thread = manager
.start_thread_with_options(StartThreadOptions {
config,
@@ -384,14 +401,16 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
state_db,
thread_store,
agent_graph_store,
);
let selected_cwd =
AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path");
@@ -494,14 +513,16 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -548,14 +569,16 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -612,14 +635,16 @@ async fn new_uses_active_provider_for_model_refresh() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
state_db,
thread_store,
agent_graph_store,
);
let _ = manager.list_models(RefreshStrategy::Online).await;
@@ -824,15 +849,16 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -928,15 +954,16 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -1021,15 +1048,16 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -1159,15 +1187,16 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
state_db,
thread_store,
agent_graph_store,
);
let source = manager
@@ -1236,6 +1265,11 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
.await
.is_none()
);
let goal = state_db
.get_thread_goal(resumed.thread_id)
.await?
.expect("goal should still exist after resume");
assert_eq!(codex_state::ThreadGoalStatus::Paused, goal.status);
resumed.thread.shutdown_and_wait().await?;
Ok(())