From 450444e31648bc06f290fa61f9a648214e2c6b60 Mon Sep 17 00:00:00 2001 From: Rasmus Rygaard Date: Fri, 1 May 2026 11:39:38 -0700 Subject: [PATCH] Inject agent graph store into thread starts --- codex-rs/Cargo.lock | 1 + .../app-server/src/codex_message_processor.rs | 2 + codex-rs/core-api/src/lib.rs | 1 + codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/agent/control.rs | 44 ++++++++++----- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/thread_manager.rs | 55 ++++++++++++++----- codex-rs/core/src/thread_manager_tests.rs | 3 + codex-rs/memories/write/src/runtime.rs | 1 + 9 files changed, 81 insertions(+), 28 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 404ada1eea..f92940e084 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2374,6 +2374,7 @@ dependencies = [ "bm25", "chrono", "clap", + "codex-agent-graph-store", "codex-analytics", "codex-api", "codex-app-server-protocol", diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 89d07ca987..a3eddf3142 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2588,6 +2588,7 @@ impl CodexMessageProcessor { .thread_manager .start_thread_with_options(StartThreadOptions { thread_store: thread_store_from_config(&config), + agent_graph_store: None, config, initial_history: InitialHistory::Forked(rollout_items), session_source: None, @@ -2786,6 +2787,7 @@ impl CodexMessageProcessor { .thread_manager .start_thread_with_options(StartThreadOptions { thread_store: thread_store_from_config(&config), + agent_graph_store: None, config, initial_history: match session_start_source .unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup) diff --git a/codex-rs/core-api/src/lib.rs b/codex-rs/core-api/src/lib.rs index c4eee541c6..8226d871b9 100644 --- a/codex-rs/core-api/src/lib.rs +++ b/codex-rs/core-api/src/lib.rs @@ -29,6 +29,7 @@ pub use codex_core::NewThread; pub use codex_core::StartThreadOptions; pub use codex_core::ThreadManager; pub use codex_core::ThreadShutdownReport; +pub use codex_core::agent_graph_store_from_config; pub use codex_core::config::Config; pub use codex_core::config::Constrained; pub use codex_core::config::GhostSnapshotConfig; diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 44c6aacac5..1956016141 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -27,6 +27,7 @@ chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } codex-analytics = { workspace = true } codex-api = { workspace = true } +codex-agent-graph-store = { workspace = true } codex-app-server-protocol = { workspace = true } codex-apply-patch = { workspace = true } codex-async-utils = { workspace = true } diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 09a9d4c148..f062f3179c 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -16,6 +16,9 @@ use crate::thread_manager::ResumeThreadFromRolloutOptions; use crate::thread_manager::ThreadManagerState; use crate::thread_manager::thread_store_from_config; use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns; +use codex_agent_graph_store::AgentGraphStore; +use codex_agent_graph_store::LocalAgentGraphStore; +use codex_agent_graph_store::ThreadSpawnEdgeStatus; use codex_features::Feature; use codex_protocol::AgentPath; use codex_protocol::ThreadId; @@ -33,7 +36,6 @@ use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; use codex_rollout::state_db; -use codex_state::DirectionalThreadSpawnEdgeStatus; use serde::Serialize; use std::collections::HashMap; use std::collections::VecDeque; @@ -139,13 +141,18 @@ pub(crate) struct AgentControl { /// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`. manager: Weak, state: Arc, + agent_graph_store: Option>, } impl AgentControl { /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. - pub(crate) fn new(manager: Weak) -> Self { + pub(crate) fn new( + manager: Weak, + agent_graph_store: Option>, + ) -> Self { Self { manager, + agent_graph_store, ..Default::default() } } @@ -466,17 +473,15 @@ impl AgentControl { let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else { return Ok(resumed_thread_id); }; - let Some(state_db_ctx) = resumed_thread.state_db() else { + let Some(agent_graph_store) = self.agent_graph_store_for_thread(resumed_thread.as_ref()) + else { return Ok(resumed_thread_id); }; let mut resume_queue = VecDeque::from([(thread_id, root_depth)]); while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() { - let child_ids = match state_db_ctx - .list_thread_spawn_children_with_status( - parent_thread_id, - DirectionalThreadSpawnEdgeStatus::Open, - ) + let child_ids = match agent_graph_store + .list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open)) .await { Ok(child_ids) => child_ids, @@ -734,9 +739,9 @@ impl AgentControl { pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; if let Ok(thread) = state.get_thread(agent_id).await - && let Some(state_db_ctx) = thread.state_db() - && let Err(err) = state_db_ctx - .set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed) + && let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread.as_ref()) + && let Err(err) = agent_graph_store + .set_thread_spawn_edge_status(agent_id, ThreadSpawnEdgeStatus::Closed) .await { warn!("failed to persist thread-spawn edge status for {agent_id}: {err}"); @@ -1160,14 +1165,14 @@ impl AgentControl { let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else { return; }; - let Some(state_db_ctx) = thread.state_db() else { + let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread) else { return; }; - if let Err(err) = state_db_ctx + if let Err(err) = agent_graph_store .upsert_thread_spawn_edge( parent_thread_id, child_thread_id, - DirectionalThreadSpawnEdgeStatus::Open, + ThreadSpawnEdgeStatus::Open, ) .await { @@ -1175,6 +1180,17 @@ impl AgentControl { } } + pub(crate) fn agent_graph_store_for_thread( + &self, + thread: &crate::CodexThread, + ) -> Option> { + self.agent_graph_store.clone().or_else(|| { + thread.state_db().map(|state_db| { + Arc::new(LocalAgentGraphStore::new(state_db)) as Arc + }) + }) + } + async fn live_thread_spawn_descendants( &self, root_thread_id: ThreadId, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 414a587a23..2e97c64d83 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -120,6 +120,7 @@ pub use thread_manager::NewThread; pub use thread_manager::StartThreadOptions; pub use thread_manager::ThreadManager; pub use thread_manager::ThreadShutdownReport; +pub use thread_manager::agent_graph_store_from_config; pub use thread_manager::build_models_manager; pub use thread_manager::thread_store_from_config; pub use web_search::web_search_action_detail; diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 3587dc937e..6376733a90 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -20,6 +20,9 @@ use crate::skills_watcher::SkillsWatcher; use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; +use codex_agent_graph_store::AgentGraphStore; +use codex_agent_graph_store::LocalAgentGraphStore; +use codex_agent_graph_store::ThreadSpawnEdgeStatus; use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; @@ -51,7 +54,7 @@ use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::RolloutConfig; -use codex_state::DirectionalThreadSpawnEdgeStatus; +use codex_rollout::state_db; #[cfg(debug_assertions)] use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; @@ -213,6 +216,7 @@ pub struct ThreadManager { pub struct StartThreadOptions { pub config: Config, pub thread_store: Arc, + pub agent_graph_store: Option>, pub initial_history: InitialHistory, pub session_source: Option, pub dynamic_tools: Vec, @@ -273,6 +277,12 @@ pub fn thread_store_from_config(config: &Config) -> Arc { } } +pub async fn agent_graph_store_from_config(config: &Config) -> Option> { + state_db::get_state_db(config) + .await + .map(|state_db| Arc::new(LocalAgentGraphStore::new(state_db)) as Arc) +} + impl ThreadManager { pub fn new( config: &Config, @@ -484,13 +494,16 @@ impl ThreadManager { subtree_thread_ids.push(thread_id); seen_thread_ids.insert(thread_id); - if let Some(state_db_ctx) = thread.state_db() { - for status in [ - DirectionalThreadSpawnEdgeStatus::Open, - DirectionalThreadSpawnEdgeStatus::Closed, - ] { - for descendant_id in state_db_ctx - .list_thread_spawn_descendants_with_status(thread_id, status) + if let Some(agent_graph_store) = thread + .codex + .session + .services + .agent_control + .agent_graph_store_for_thread(thread.as_ref()) + { + for status in [ThreadSpawnEdgeStatus::Open, ThreadSpawnEdgeStatus::Closed] { + for descendant_id in agent_graph_store + .list_thread_spawn_descendants(thread_id, Some(status)) .await .map_err(|err| { CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}")) @@ -546,9 +559,11 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_graph_store = agent_graph_store_from_config(&config).await; Box::pin(self.start_thread_with_options(StartThreadOptions { config, thread_store, + agent_graph_store, initial_history: InitialHistory::New, session_source: None, dynamic_tools, @@ -572,7 +587,7 @@ impl ThreadManager { options.thread_store, options.initial_history, Arc::clone(&self.state.auth_manager), - self.agent_control(), + self.agent_control_with_store(options.agent_graph_store), session_source, options.dynamic_tools, options.persist_extended_history, @@ -619,12 +634,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_graph_store = agent_graph_store_from_config(&config).await; Box::pin(self.state.spawn_thread( config, thread_store, initial_history, auth_manager, - self.agent_control(), + self.agent_control_with_store(agent_graph_store), Vec::new(), persist_extended_history, /*metrics_service_name*/ None, @@ -645,12 +661,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_graph_store = agent_graph_store_from_config(&config).await; Box::pin(self.state.spawn_thread( config, thread_store, InitialHistory::New, Arc::clone(&self.state.auth_manager), - self.agent_control(), + self.agent_control_with_store(agent_graph_store), Vec::new(), /*persist_extended_history*/ false, /*metrics_service_name*/ None, @@ -674,12 +691,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_graph_store = agent_graph_store_from_config(&config).await; Box::pin(self.state.spawn_thread( config, thread_store, initial_history, auth_manager, - self.agent_control(), + self.agent_control_with_store(agent_graph_store), Vec::new(), /*persist_extended_history*/ false, /*metrics_service_name*/ None, @@ -816,12 +834,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_graph_store = agent_graph_store_from_config(&config).await; Box::pin(self.state.spawn_thread( config, thread_store, history, Arc::clone(&self.state.auth_manager), - self.agent_control(), + self.agent_control_with_store(agent_graph_store), Vec::new(), persist_extended_history, /*metrics_service_name*/ None, @@ -832,8 +851,16 @@ impl ThreadManager { .await } + #[cfg(test)] pub(crate) fn agent_control(&self) -> AgentControl { - AgentControl::new(Arc::downgrade(&self.state)) + self.agent_control_with_store(None) + } + + fn agent_control_with_store( + &self, + agent_graph_store: Option>, + ) -> AgentControl { + AgentControl::new(Arc::downgrade(&self.state), agent_graph_store) } #[cfg(test)] diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index c79321b94b..e73d881ed9 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -315,6 +315,7 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d let thread = manager .start_thread_with_options(StartThreadOptions { thread_store: thread_store_from_config(&config), + agent_graph_store: None, config: config.clone(), initial_history: InitialHistory::New, session_source: None, @@ -351,6 +352,7 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() { let thread = manager .start_thread_with_options(StartThreadOptions { thread_store, + agent_graph_store: None, config, initial_history: InitialHistory::New, session_source: Some(SessionSource::Internal( @@ -406,6 +408,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { let source = manager .start_thread_with_options(StartThreadOptions { thread_store: Arc::clone(&thread_store), + agent_graph_store: None, config: config.clone(), initial_history: InitialHistory::New, session_source: None, diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 255fb718f2..2953f5b739 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -238,6 +238,7 @@ impl MemoryStartupContext { .thread_manager .start_thread_with_options(StartThreadOptions { thread_store: thread_store_from_config(&config), + agent_graph_store: None, config, initial_history: InitialHistory::New, session_source: Some(SessionSource::Internal(