From 5ea36a7efb466864d496c08177692eabf2acac5d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 11 May 2026 23:06:14 +0100 Subject: [PATCH] bind the spawn agent capability --- codex-rs/Cargo.lock | 2 + codex-rs/app-server/src/extensions.rs | 9 ++- codex-rs/core/src/session/session.rs | 1 + codex-rs/core/src/thread_manager.rs | 29 ++++++++ codex-rs/core/src/thread_manager_tests.rs | 69 +++++++++++++++++++ .../extension-api/src/capabilities/agent.rs | 22 ++++-- .../ext/extension-api/src/contributors.rs | 13 +++- codex-rs/ext/git-attribution/Cargo.toml | 1 + codex-rs/ext/git-attribution/src/lib.rs | 2 + codex-rs/ext/guardian/Cargo.toml | 1 + codex-rs/ext/guardian/src/lib.rs | 33 ++++++--- 11 files changed, 162 insertions(+), 20 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index df0827a0de..8b7a41b2b1 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2902,6 +2902,7 @@ dependencies = [ "codex-core", "codex-extension-api", "codex-features", + "codex-protocol", "pretty_assertions", ] @@ -2935,6 +2936,7 @@ version = "0.0.0" dependencies = [ "codex-core", "codex-extension-api", + "codex-protocol", ] [[package]] diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index e3828526b7..411685e7d8 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -9,6 +9,7 @@ use codex_extension_api::AgentSpawnFuture; use codex_extension_api::AgentSpawner; use codex_extension_api::ExtensionRegistry; use codex_extension_api::ExtensionRegistryBuilder; +use codex_protocol::ThreadId; use codex_protocol::error::CodexErr; pub(crate) fn thread_extensions(guardian_agent_spawner: S) -> Arc> @@ -24,13 +25,17 @@ where pub(crate) fn guardian_agent_spawner( thread_manager: Weak, ) -> impl AgentSpawner { - move |options: StartThreadOptions| -> AgentSpawnFuture<'static, NewThread, CodexErr> { + move |forked_from_thread_id: ThreadId, + options: StartThreadOptions| + -> AgentSpawnFuture<'static, NewThread, CodexErr> { let thread_manager = thread_manager.clone(); Box::pin(async move { let thread_manager = thread_manager.upgrade().ok_or_else(|| { CodexErr::UnsupportedOperation("thread manager dropped".to_string()) })?; - thread_manager.start_thread_with_options(options).await + thread_manager + .spawn_subagent(forked_from_thread_id, options) + .await }) } } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 32aaf8fb0b..747a6f5b4f 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -815,6 +815,7 @@ impl Session { let thread_extension_data = codex_extension_api::ExtensionData::new(); for contributor in extensions.thread_start_contributors() { contributor.contribute( + thread_id, config.as_ref(), &session_extension_data, &thread_extension_data, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 83bfe3b3d1..841d27f27a 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -567,6 +567,35 @@ impl ThreadManager { .await } + /// Spawn a subagent by forking persisted history from `forked_from_thread_id`. + pub async fn spawn_subagent( + &self, + forked_from_thread_id: ThreadId, + mut options: StartThreadOptions, + ) -> CodexResult { + let fork_source = self.get_thread(forked_from_thread_id).await?; + // Persist queued rollout updates before reading the fork snapshot. + fork_source.ensure_rollout_materialized().await; + fork_source.flush_rollout().await?; + let stored_thread = fork_source + .read_thread( + /*include_archived*/ true, /*include_history*/ true, + ) + .await + .map_err(|err| { + CodexErr::Fatal(format!( + "failed to read subagent fork source {forked_from_thread_id}: {err}" + )) + })?; + let history = stored_thread_to_initial_history(stored_thread, fork_source.rollout_path())?; + options.initial_history = fork_history_from_snapshot( + ForkSnapshot::Interrupted, + history, + InterruptedTurnHistoryMarker::from_config(&options.config), + ); + self.start_thread_with_options(options).await + } + pub async fn resume_thread_from_rollout( &self, config: Config, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 9fedff9470..4a9bc1bb97 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -481,6 +481,75 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() { assert!(manager.list_thread_ids().await.is_empty()); } +#[tokio::test] +async fn spawn_subagent_forks_from_the_source_thread() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let state_db = init_state_db(&config).await; + let thread_store = thread_store_from_config(&config, state_db.clone()); + let manager = ThreadManager::new( + &config, + auth_manager, + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + empty_extension_registry(), + /*analytics_events_client*/ None, + thread_store.clone(), + state_db, + TEST_INSTALLATION_ID.to_string(), + /*attestation_provider*/ None, + ); + let parent = manager + .start_thread(config.clone()) + .await + .expect("start parent thread"); + + let spawned = manager + .spawn_subagent( + parent.thread_id, + StartThreadOptions { + config, + initial_history: InitialHistory::New, + session_source: None, + thread_source: None, + dynamic_tools: Vec::new(), + persist_extended_history: false, + metrics_service_name: None, + parent_trace: None, + environments: Vec::new(), + }, + ) + .await + .expect("spawn subagent"); + let stored_spawned = spawned + .thread + .read_thread( + /*include_archived*/ true, /*include_history*/ true, + ) + .await + .expect("read spawned subagent"); + + assert_ne!(spawned.thread_id, parent.thread_id); + assert_eq!(stored_spawned.forked_from_id, Some(parent.thread_id)); + + parent + .thread + .shutdown_and_wait() + .await + .expect("shutdown parent thread"); + spawned + .thread + .shutdown_and_wait() + .await + .expect("shutdown spawned subagent"); +} + #[tokio::test] async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { let temp_dir = tempdir().expect("tempdir"); diff --git a/codex-rs/ext/extension-api/src/capabilities/agent.rs b/codex-rs/ext/extension-api/src/capabilities/agent.rs index a60a43790e..3ecdfdf5fd 100644 --- a/codex-rs/ext/extension-api/src/capabilities/agent.rs +++ b/codex-rs/ext/extension-api/src/capabilities/agent.rs @@ -1,10 +1,12 @@ use std::future::Future; use std::pin::Pin; -/// Future returned by one injected agent-spawn helper. +use codex_protocol::ThreadId; + +/// Future returned by one injected subagent-spawn helper. pub type AgentSpawnFuture<'a, T, E> = Pin> + Send + 'a>>; -/// Constructor-injected host helper for extensions that need to spawn agents. +/// Constructor-injected host helper for extensions that need to spawn subagents. /// /// The extension owns the request shape and resulting handle types. The host /// provides the implementation when it constructs the extension. @@ -12,17 +14,25 @@ pub trait AgentSpawner: Send + Sync { type Spawned; type Error; - fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>; + fn spawn_subagent<'a>( + &'a self, + forked_from_thread_id: ThreadId, + request: R, + ) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>; } impl AgentSpawner for F where - F: Fn(R) -> AgentSpawnFuture<'static, S, E> + Send + Sync, + F: Fn(ThreadId, R) -> AgentSpawnFuture<'static, S, E> + Send + Sync, { type Spawned = S; type Error = E; - fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> { - self(request) + fn spawn_subagent<'a>( + &'a self, + forked_from_thread_id: ThreadId, + request: R, + ) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> { + self(forked_from_thread_id, request) } } diff --git a/codex-rs/ext/extension-api/src/contributors.rs b/codex-rs/ext/extension-api/src/contributors.rs index 4475e9271d..18ca7b0299 100644 --- a/codex-rs/ext/extension-api/src/contributors.rs +++ b/codex-rs/ext/extension-api/src/contributors.rs @@ -1,5 +1,6 @@ use std::future::Future; +use codex_protocol::ThreadId; use codex_protocol::items::TurnItem; use codex_tool_api::ToolBundle; @@ -10,10 +11,16 @@ mod prompt; pub use prompt::PromptFragment; pub use prompt::PromptSlot; -/// Contributor that receives host-owned thread-start input before later -/// contributors read from extension stores. +/// Contributor that receives the live thread id and host-owned thread-start +/// input before later contributors read from extension stores. pub trait ThreadStartContributor: Send + Sync { - fn contribute(&self, input: &C, session_store: &ExtensionData, thread_store: &ExtensionData); + fn contribute( + &self, + thread_id: ThreadId, + input: &C, + session_store: &ExtensionData, + thread_store: &ExtensionData, + ); } /// Extension contribution that adds prompt fragments during prompt assembly. diff --git a/codex-rs/ext/git-attribution/Cargo.toml b/codex-rs/ext/git-attribution/Cargo.toml index edf8c07196..500ce47aec 100644 --- a/codex-rs/ext/git-attribution/Cargo.toml +++ b/codex-rs/ext/git-attribution/Cargo.toml @@ -16,6 +16,7 @@ workspace = true codex-core = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } +codex-protocol = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/ext/git-attribution/src/lib.rs b/codex-rs/ext/git-attribution/src/lib.rs index bb73a83011..eae906fa56 100644 --- a/codex-rs/ext/git-attribution/src/lib.rs +++ b/codex-rs/ext/git-attribution/src/lib.rs @@ -7,6 +7,7 @@ use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::PromptFragment; use codex_extension_api::ThreadStartContributor; use codex_features::Feature; +use codex_protocol::ThreadId; const DEFAULT_ATTRIBUTION_VALUE: &str = "Codex "; @@ -42,6 +43,7 @@ struct GitAttributionConfig { impl ThreadStartContributor for GitAttributionExtension { fn contribute( &self, + _thread_id: ThreadId, config: &Config, _session_store: &ExtensionData, thread_store: &ExtensionData, diff --git a/codex-rs/ext/guardian/Cargo.toml b/codex-rs/ext/guardian/Cargo.toml index 8c062c816a..8c26ab0a1c 100644 --- a/codex-rs/ext/guardian/Cargo.toml +++ b/codex-rs/ext/guardian/Cargo.toml @@ -15,3 +15,4 @@ workspace = true [dependencies] codex-core = { workspace = true } codex-extension-api = { workspace = true } +codex-protocol = { workspace = true } diff --git a/codex-rs/ext/guardian/src/lib.rs b/codex-rs/ext/guardian/src/lib.rs index 5e5e02b99a..d8c9834d60 100644 --- a/codex-rs/ext/guardian/src/lib.rs +++ b/codex-rs/ext/guardian/src/lib.rs @@ -6,6 +6,7 @@ use codex_extension_api::AgentSpawner; use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::ThreadStartContributor; +use codex_protocol::ThreadId; /// Guardian extension dependencies supplied by the host at construction time. #[derive(Clone, Debug)] @@ -19,20 +20,30 @@ impl GuardianExtension { Self { agent_spawner } } - /// Returns the host-provided agent spawn helper. - pub fn agent_spawner(&self) -> &S { - &self.agent_spawner - } - - /// Delegates one guardian-owned spawn request to the host-provided helper. - pub fn spawn_agent<'a, R>( + /// Delegates one guardian-owned subagent spawn request to the host helper. + pub fn spawn_subagent<'a, R>( &'a self, + forked_from_thread_id: ThreadId, request: R, ) -> AgentSpawnFuture<'a, >::Spawned, >::Error> where S: AgentSpawner, { - self.agent_spawner.spawn_agent(request) + self.agent_spawner + .spawn_subagent(forked_from_thread_id, request) + } +} + +/// Thread-local guardian state captured when the host starts a thread. +#[derive(Clone, Copy, Debug)] +pub struct GuardianThreadContext { + forked_from_thread_id: ThreadId, +} + +impl GuardianThreadContext { + /// Returns the thread that future guardian subagents should fork from by default. + pub fn forked_from_thread_id(&self) -> ThreadId { + self.forked_from_thread_id } } @@ -42,10 +53,14 @@ where { fn contribute( &self, + thread_id: ThreadId, _input: &Config, _session_store: &ExtensionData, - _thread_store: &ExtensionData, + thread_store: &ExtensionData, ) { + thread_store.insert(GuardianThreadContext { + forked_from_thread_id: thread_id, + }); } }