Compare commits

...

1 Commits

Author SHA1 Message Date
Rasmus Rygaard
450444e316 Inject agent graph store into thread starts 2026-05-01 11:39:38 -07:00
9 changed files with 81 additions and 28 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2374,6 +2374,7 @@ dependencies = [
"bm25",
"chrono",
"clap",
"codex-agent-graph-store",
"codex-analytics",
"codex-api",
"codex-app-server-protocol",

View File

@@ -2588,6 +2588,7 @@ impl CodexMessageProcessor {
.thread_manager
.start_thread_with_options(StartThreadOptions {
thread_store: thread_store_from_config(&config),
agent_graph_store: None,
config,
initial_history: InitialHistory::Forked(rollout_items),
session_source: None,
@@ -2786,6 +2787,7 @@ impl CodexMessageProcessor {
.thread_manager
.start_thread_with_options(StartThreadOptions {
thread_store: thread_store_from_config(&config),
agent_graph_store: None,
config,
initial_history: match session_start_source
.unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup)

View File

@@ -29,6 +29,7 @@ pub use codex_core::NewThread;
pub use codex_core::StartThreadOptions;
pub use codex_core::ThreadManager;
pub use codex_core::ThreadShutdownReport;
pub use codex_core::agent_graph_store_from_config;
pub use codex_core::config::Config;
pub use codex_core::config::Constrained;
pub use codex_core::config::GhostSnapshotConfig;

View File

@@ -27,6 +27,7 @@ chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-analytics = { workspace = true }
codex-api = { workspace = true }
codex-agent-graph-store = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }

View File

@@ -16,6 +16,9 @@ use crate::thread_manager::ResumeThreadFromRolloutOptions;
use crate::thread_manager::ThreadManagerState;
use crate::thread_manager::thread_store_from_config;
use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns;
use codex_agent_graph_store::AgentGraphStore;
use codex_agent_graph_store::LocalAgentGraphStore;
use codex_agent_graph_store::ThreadSpawnEdgeStatus;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
@@ -33,7 +36,6 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::user_input::UserInput;
use codex_rollout::state_db;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::VecDeque;
@@ -139,13 +141,18 @@ pub(crate) struct AgentControl {
/// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
manager: Weak<ThreadManagerState>,
state: Arc<AgentRegistry>,
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
}
impl AgentControl {
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
pub(crate) fn new(manager: Weak<ThreadManagerState>) -> Self {
pub(crate) fn new(
manager: Weak<ThreadManagerState>,
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
) -> Self {
Self {
manager,
agent_graph_store,
..Default::default()
}
}
@@ -466,17 +473,15 @@ impl AgentControl {
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
return Ok(resumed_thread_id);
};
let Some(state_db_ctx) = resumed_thread.state_db() else {
let Some(agent_graph_store) = self.agent_graph_store_for_thread(resumed_thread.as_ref())
else {
return Ok(resumed_thread_id);
};
let mut resume_queue = VecDeque::from([(thread_id, root_depth)]);
while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() {
let child_ids = match state_db_ctx
.list_thread_spawn_children_with_status(
parent_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
)
let child_ids = match agent_graph_store
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open))
.await
{
Ok(child_ids) => child_ids,
@@ -734,9 +739,9 @@ impl AgentControl {
pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
if let Ok(thread) = state.get_thread(agent_id).await
&& let Some(state_db_ctx) = thread.state_db()
&& let Err(err) = state_db_ctx
.set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed)
&& let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread.as_ref())
&& let Err(err) = agent_graph_store
.set_thread_spawn_edge_status(agent_id, ThreadSpawnEdgeStatus::Closed)
.await
{
warn!("failed to persist thread-spawn edge status for {agent_id}: {err}");
@@ -1160,14 +1165,14 @@ impl AgentControl {
let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else {
return;
};
let Some(state_db_ctx) = thread.state_db() else {
let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread) else {
return;
};
if let Err(err) = state_db_ctx
if let Err(err) = agent_graph_store
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
ThreadSpawnEdgeStatus::Open,
)
.await
{
@@ -1175,6 +1180,17 @@ impl AgentControl {
}
}
pub(crate) fn agent_graph_store_for_thread(
&self,
thread: &crate::CodexThread,
) -> Option<Arc<dyn AgentGraphStore>> {
self.agent_graph_store.clone().or_else(|| {
thread.state_db().map(|state_db| {
Arc::new(LocalAgentGraphStore::new(state_db)) as Arc<dyn AgentGraphStore>
})
})
}
async fn live_thread_spawn_descendants(
&self,
root_thread_id: ThreadId,

View File

@@ -120,6 +120,7 @@ pub use thread_manager::NewThread;
pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::agent_graph_store_from_config;
pub use thread_manager::build_models_manager;
pub use thread_manager::thread_store_from_config;
pub use web_search::web_search_action_detail;

View File

@@ -20,6 +20,9 @@ use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_agent_graph_store::AgentGraphStore;
use codex_agent_graph_store::LocalAgentGraphStore;
use codex_agent_graph_store::ThreadSpawnEdgeStatus;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
@@ -51,7 +54,7 @@ use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout::RolloutConfig;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_rollout::state_db;
#[cfg(debug_assertions)]
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::LocalThreadStore;
@@ -213,6 +216,7 @@ pub struct ThreadManager {
pub struct StartThreadOptions {
pub config: Config,
pub thread_store: Arc<dyn ThreadStore>,
pub agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
pub initial_history: InitialHistory,
pub session_source: Option<SessionSource>,
pub dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
@@ -273,6 +277,12 @@ pub fn thread_store_from_config(config: &Config) -> Arc<dyn ThreadStore> {
}
}
pub async fn agent_graph_store_from_config(config: &Config) -> Option<Arc<dyn AgentGraphStore>> {
state_db::get_state_db(config)
.await
.map(|state_db| Arc::new(LocalAgentGraphStore::new(state_db)) as Arc<dyn AgentGraphStore>)
}
impl ThreadManager {
pub fn new(
config: &Config,
@@ -484,13 +494,16 @@ impl ThreadManager {
subtree_thread_ids.push(thread_id);
seen_thread_ids.insert(thread_id);
if let Some(state_db_ctx) = thread.state_db() {
for status in [
DirectionalThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Closed,
] {
for descendant_id in state_db_ctx
.list_thread_spawn_descendants_with_status(thread_id, status)
if let Some(agent_graph_store) = thread
.codex
.session
.services
.agent_control
.agent_graph_store_for_thread(thread.as_ref())
{
for status in [ThreadSpawnEdgeStatus::Open, ThreadSpawnEdgeStatus::Closed] {
for descendant_id in agent_graph_store
.list_thread_spawn_descendants(thread_id, Some(status))
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
@@ -546,9 +559,11 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let agent_graph_store = agent_graph_store_from_config(&config).await;
Box::pin(self.start_thread_with_options(StartThreadOptions {
config,
thread_store,
agent_graph_store,
initial_history: InitialHistory::New,
session_source: None,
dynamic_tools,
@@ -572,7 +587,7 @@ impl ThreadManager {
options.thread_store,
options.initial_history,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
self.agent_control_with_store(options.agent_graph_store),
session_source,
options.dynamic_tools,
options.persist_extended_history,
@@ -619,12 +634,13 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let agent_graph_store = agent_graph_store_from_config(&config).await;
Box::pin(self.state.spawn_thread(
config,
thread_store,
initial_history,
auth_manager,
self.agent_control(),
self.agent_control_with_store(agent_graph_store),
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
@@ -645,12 +661,13 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let agent_graph_store = agent_graph_store_from_config(&config).await;
Box::pin(self.state.spawn_thread(
config,
thread_store,
InitialHistory::New,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
self.agent_control_with_store(agent_graph_store),
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
@@ -674,12 +691,13 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let agent_graph_store = agent_graph_store_from_config(&config).await;
Box::pin(self.state.spawn_thread(
config,
thread_store,
initial_history,
auth_manager,
self.agent_control(),
self.agent_control_with_store(agent_graph_store),
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
@@ -816,12 +834,13 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let agent_graph_store = agent_graph_store_from_config(&config).await;
Box::pin(self.state.spawn_thread(
config,
thread_store,
history,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
self.agent_control_with_store(agent_graph_store),
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
@@ -832,8 +851,16 @@ impl ThreadManager {
.await
}
#[cfg(test)]
pub(crate) fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state))
self.agent_control_with_store(None)
}
fn agent_control_with_store(
&self,
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state), agent_graph_store)
}
#[cfg(test)]

View File

@@ -315,6 +315,7 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d
let thread = manager
.start_thread_with_options(StartThreadOptions {
thread_store: thread_store_from_config(&config),
agent_graph_store: None,
config: config.clone(),
initial_history: InitialHistory::New,
session_source: None,
@@ -351,6 +352,7 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
let thread = manager
.start_thread_with_options(StartThreadOptions {
thread_store,
agent_graph_store: None,
config,
initial_history: InitialHistory::New,
session_source: Some(SessionSource::Internal(
@@ -406,6 +408,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
let source = manager
.start_thread_with_options(StartThreadOptions {
thread_store: Arc::clone(&thread_store),
agent_graph_store: None,
config: config.clone(),
initial_history: InitialHistory::New,
session_source: None,

View File

@@ -238,6 +238,7 @@ impl MemoryStartupContext {
.thread_manager
.start_thread_with_options(StartThreadOptions {
thread_store: thread_store_from_config(&config),
agent_graph_store: None,
config,
initial_history: InitialHistory::New,
session_source: Some(SessionSource::Internal(