mirror of
https://github.com/openai/codex.git
synced 2026-05-28 06:55:01 +00:00
Inject agent graph store into thread starts
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2374,6 +2374,7 @@ dependencies = [
|
||||
"bm25",
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-agent-graph-store",
|
||||
"codex-analytics",
|
||||
"codex-api",
|
||||
"codex-app-server-protocol",
|
||||
|
||||
@@ -2588,6 +2588,7 @@ impl CodexMessageProcessor {
|
||||
.thread_manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store: thread_store_from_config(&config),
|
||||
agent_graph_store: None,
|
||||
config,
|
||||
initial_history: InitialHistory::Forked(rollout_items),
|
||||
session_source: None,
|
||||
@@ -2786,6 +2787,7 @@ impl CodexMessageProcessor {
|
||||
.thread_manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store: thread_store_from_config(&config),
|
||||
agent_graph_store: None,
|
||||
config,
|
||||
initial_history: match session_start_source
|
||||
.unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup)
|
||||
|
||||
@@ -29,6 +29,7 @@ pub use codex_core::NewThread;
|
||||
pub use codex_core::StartThreadOptions;
|
||||
pub use codex_core::ThreadManager;
|
||||
pub use codex_core::ThreadShutdownReport;
|
||||
pub use codex_core::agent_graph_store_from_config;
|
||||
pub use codex_core::config::Config;
|
||||
pub use codex_core::config::Constrained;
|
||||
pub use codex_core::config::GhostSnapshotConfig;
|
||||
|
||||
@@ -27,6 +27,7 @@ chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
codex-analytics = { workspace = true }
|
||||
codex-api = { workspace = true }
|
||||
codex-agent-graph-store = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-apply-patch = { workspace = true }
|
||||
codex-async-utils = { workspace = true }
|
||||
|
||||
@@ -16,6 +16,9 @@ use crate::thread_manager::ResumeThreadFromRolloutOptions;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use crate::thread_manager::thread_store_from_config;
|
||||
use crate::thread_rollout_truncation::truncate_rollout_to_last_n_fork_turns;
|
||||
use codex_agent_graph_store::AgentGraphStore;
|
||||
use codex_agent_graph_store::LocalAgentGraphStore;
|
||||
use codex_agent_graph_store::ThreadSpawnEdgeStatus;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -33,7 +36,6 @@ use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_rollout::state_db;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
@@ -139,13 +141,18 @@ pub(crate) struct AgentControl {
|
||||
/// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
|
||||
manager: Weak<ThreadManagerState>,
|
||||
state: Arc<AgentRegistry>,
|
||||
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
|
||||
}
|
||||
|
||||
impl AgentControl {
|
||||
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
|
||||
pub(crate) fn new(manager: Weak<ThreadManagerState>) -> Self {
|
||||
pub(crate) fn new(
|
||||
manager: Weak<ThreadManagerState>,
|
||||
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
manager,
|
||||
agent_graph_store,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -466,17 +473,15 @@ impl AgentControl {
|
||||
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
|
||||
return Ok(resumed_thread_id);
|
||||
};
|
||||
let Some(state_db_ctx) = resumed_thread.state_db() else {
|
||||
let Some(agent_graph_store) = self.agent_graph_store_for_thread(resumed_thread.as_ref())
|
||||
else {
|
||||
return Ok(resumed_thread_id);
|
||||
};
|
||||
|
||||
let mut resume_queue = VecDeque::from([(thread_id, root_depth)]);
|
||||
while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() {
|
||||
let child_ids = match state_db_ctx
|
||||
.list_thread_spawn_children_with_status(
|
||||
parent_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
let child_ids = match agent_graph_store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
{
|
||||
Ok(child_ids) => child_ids,
|
||||
@@ -734,9 +739,9 @@ impl AgentControl {
|
||||
pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
if let Ok(thread) = state.get_thread(agent_id).await
|
||||
&& let Some(state_db_ctx) = thread.state_db()
|
||||
&& let Err(err) = state_db_ctx
|
||||
.set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed)
|
||||
&& let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread.as_ref())
|
||||
&& let Err(err) = agent_graph_store
|
||||
.set_thread_spawn_edge_status(agent_id, ThreadSpawnEdgeStatus::Closed)
|
||||
.await
|
||||
{
|
||||
warn!("failed to persist thread-spawn edge status for {agent_id}: {err}");
|
||||
@@ -1160,14 +1165,14 @@ impl AgentControl {
|
||||
let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else {
|
||||
return;
|
||||
};
|
||||
let Some(state_db_ctx) = thread.state_db() else {
|
||||
let Some(agent_graph_store) = self.agent_graph_store_for_thread(thread) else {
|
||||
return;
|
||||
};
|
||||
if let Err(err) = state_db_ctx
|
||||
if let Err(err) = agent_graph_store
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -1175,6 +1180,17 @@ impl AgentControl {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn agent_graph_store_for_thread(
|
||||
&self,
|
||||
thread: &crate::CodexThread,
|
||||
) -> Option<Arc<dyn AgentGraphStore>> {
|
||||
self.agent_graph_store.clone().or_else(|| {
|
||||
thread.state_db().map(|state_db| {
|
||||
Arc::new(LocalAgentGraphStore::new(state_db)) as Arc<dyn AgentGraphStore>
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn live_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
|
||||
@@ -120,6 +120,7 @@ pub use thread_manager::NewThread;
|
||||
pub use thread_manager::StartThreadOptions;
|
||||
pub use thread_manager::ThreadManager;
|
||||
pub use thread_manager::ThreadShutdownReport;
|
||||
pub use thread_manager::agent_graph_store_from_config;
|
||||
pub use thread_manager::build_models_manager;
|
||||
pub use thread_manager::thread_store_from_config;
|
||||
pub use web_search::web_search_action_detail;
|
||||
|
||||
@@ -20,6 +20,9 @@ use crate::skills_watcher::SkillsWatcher;
|
||||
use crate::skills_watcher::SkillsWatcherEvent;
|
||||
use crate::tasks::InterruptedTurnHistoryMarker;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_agent_graph_store::AgentGraphStore;
|
||||
use codex_agent_graph_store::LocalAgentGraphStore;
|
||||
use codex_agent_graph_store::ThreadSpawnEdgeStatus;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -51,7 +54,7 @@ use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_rollout::RolloutConfig;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_rollout::state_db;
|
||||
#[cfg(debug_assertions)]
|
||||
use codex_thread_store::InMemoryThreadStore;
|
||||
use codex_thread_store::LocalThreadStore;
|
||||
@@ -213,6 +216,7 @@ pub struct ThreadManager {
|
||||
pub struct StartThreadOptions {
|
||||
pub config: Config,
|
||||
pub thread_store: Arc<dyn ThreadStore>,
|
||||
pub agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
|
||||
pub initial_history: InitialHistory,
|
||||
pub session_source: Option<SessionSource>,
|
||||
pub dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
@@ -273,6 +277,12 @@ pub fn thread_store_from_config(config: &Config) -> Arc<dyn ThreadStore> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn agent_graph_store_from_config(config: &Config) -> Option<Arc<dyn AgentGraphStore>> {
|
||||
state_db::get_state_db(config)
|
||||
.await
|
||||
.map(|state_db| Arc::new(LocalAgentGraphStore::new(state_db)) as Arc<dyn AgentGraphStore>)
|
||||
}
|
||||
|
||||
impl ThreadManager {
|
||||
pub fn new(
|
||||
config: &Config,
|
||||
@@ -484,13 +494,16 @@ impl ThreadManager {
|
||||
subtree_thread_ids.push(thread_id);
|
||||
seen_thread_ids.insert(thread_id);
|
||||
|
||||
if let Some(state_db_ctx) = thread.state_db() {
|
||||
for status in [
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
] {
|
||||
for descendant_id in state_db_ctx
|
||||
.list_thread_spawn_descendants_with_status(thread_id, status)
|
||||
if let Some(agent_graph_store) = thread
|
||||
.codex
|
||||
.session
|
||||
.services
|
||||
.agent_control
|
||||
.agent_graph_store_for_thread(thread.as_ref())
|
||||
{
|
||||
for status in [ThreadSpawnEdgeStatus::Open, ThreadSpawnEdgeStatus::Closed] {
|
||||
for descendant_id in agent_graph_store
|
||||
.list_thread_spawn_descendants(thread_id, Some(status))
|
||||
.await
|
||||
.map_err(|err| {
|
||||
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
|
||||
@@ -546,9 +559,11 @@ impl ThreadManager {
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
);
|
||||
let agent_graph_store = agent_graph_store_from_config(&config).await;
|
||||
Box::pin(self.start_thread_with_options(StartThreadOptions {
|
||||
config,
|
||||
thread_store,
|
||||
agent_graph_store,
|
||||
initial_history: InitialHistory::New,
|
||||
session_source: None,
|
||||
dynamic_tools,
|
||||
@@ -572,7 +587,7 @@ impl ThreadManager {
|
||||
options.thread_store,
|
||||
options.initial_history,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
self.agent_control_with_store(options.agent_graph_store),
|
||||
session_source,
|
||||
options.dynamic_tools,
|
||||
options.persist_extended_history,
|
||||
@@ -619,12 +634,13 @@ impl ThreadManager {
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
);
|
||||
let agent_graph_store = agent_graph_store_from_config(&config).await;
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
thread_store,
|
||||
initial_history,
|
||||
auth_manager,
|
||||
self.agent_control(),
|
||||
self.agent_control_with_store(agent_graph_store),
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
/*metrics_service_name*/ None,
|
||||
@@ -645,12 +661,13 @@ impl ThreadManager {
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
);
|
||||
let agent_graph_store = agent_graph_store_from_config(&config).await;
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
thread_store,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
self.agent_control_with_store(agent_graph_store),
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*metrics_service_name*/ None,
|
||||
@@ -674,12 +691,13 @@ impl ThreadManager {
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
);
|
||||
let agent_graph_store = agent_graph_store_from_config(&config).await;
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
thread_store,
|
||||
initial_history,
|
||||
auth_manager,
|
||||
self.agent_control(),
|
||||
self.agent_control_with_store(agent_graph_store),
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*metrics_service_name*/ None,
|
||||
@@ -816,12 +834,13 @@ impl ThreadManager {
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
);
|
||||
let agent_graph_store = agent_graph_store_from_config(&config).await;
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
thread_store,
|
||||
history,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
self.agent_control_with_store(agent_graph_store),
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
/*metrics_service_name*/ None,
|
||||
@@ -832,8 +851,16 @@ impl ThreadManager {
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn agent_control(&self) -> AgentControl {
|
||||
AgentControl::new(Arc::downgrade(&self.state))
|
||||
self.agent_control_with_store(None)
|
||||
}
|
||||
|
||||
fn agent_control_with_store(
|
||||
&self,
|
||||
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
|
||||
) -> AgentControl {
|
||||
AgentControl::new(Arc::downgrade(&self.state), agent_graph_store)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -315,6 +315,7 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d
|
||||
let thread = manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store: thread_store_from_config(&config),
|
||||
agent_graph_store: None,
|
||||
config: config.clone(),
|
||||
initial_history: InitialHistory::New,
|
||||
session_source: None,
|
||||
@@ -351,6 +352,7 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
|
||||
let thread = manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store,
|
||||
agent_graph_store: None,
|
||||
config,
|
||||
initial_history: InitialHistory::New,
|
||||
session_source: Some(SessionSource::Internal(
|
||||
@@ -406,6 +408,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
|
||||
let source = manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store: Arc::clone(&thread_store),
|
||||
agent_graph_store: None,
|
||||
config: config.clone(),
|
||||
initial_history: InitialHistory::New,
|
||||
session_source: None,
|
||||
|
||||
@@ -238,6 +238,7 @@ impl MemoryStartupContext {
|
||||
.thread_manager
|
||||
.start_thread_with_options(StartThreadOptions {
|
||||
thread_store: thread_store_from_config(&config),
|
||||
agent_graph_store: None,
|
||||
config,
|
||||
initial_history: InitialHistory::New,
|
||||
session_source: Some(SessionSource::Internal(
|
||||
|
||||
Reference in New Issue
Block a user