bind the spawn agent capability

This commit is contained in:
jif-oai
2026-05-11 23:06:14 +01:00
parent 30ebfbf30f
commit 5ea36a7efb
11 changed files with 162 additions and 20 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2902,6 +2902,7 @@ dependencies = [
"codex-core",
"codex-extension-api",
"codex-features",
"codex-protocol",
"pretty_assertions",
]
@@ -2935,6 +2936,7 @@ version = "0.0.0"
dependencies = [
"codex-core",
"codex-extension-api",
"codex-protocol",
]
[[package]]

View File

@@ -9,6 +9,7 @@ use codex_extension_api::AgentSpawnFuture;
use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionRegistry;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
pub(crate) fn thread_extensions<S>(guardian_agent_spawner: S) -> Arc<ExtensionRegistry<Config>>
@@ -24,13 +25,17 @@ where
pub(crate) fn guardian_agent_spawner(
thread_manager: Weak<ThreadManager>,
) -> impl AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> {
move |options: StartThreadOptions| -> AgentSpawnFuture<'static, NewThread, CodexErr> {
move |forked_from_thread_id: ThreadId,
options: StartThreadOptions|
-> AgentSpawnFuture<'static, NewThread, CodexErr> {
let thread_manager = thread_manager.clone();
Box::pin(async move {
let thread_manager = thread_manager.upgrade().ok_or_else(|| {
CodexErr::UnsupportedOperation("thread manager dropped".to_string())
})?;
thread_manager.start_thread_with_options(options).await
thread_manager
.spawn_subagent(forked_from_thread_id, options)
.await
})
}
}

View File

@@ -815,6 +815,7 @@ impl Session {
let thread_extension_data = codex_extension_api::ExtensionData::new();
for contributor in extensions.thread_start_contributors() {
contributor.contribute(
thread_id,
config.as_ref(),
&session_extension_data,
&thread_extension_data,

View File

@@ -567,6 +567,35 @@ impl ThreadManager {
.await
}
/// Spawn a subagent by forking persisted history from `forked_from_thread_id`.
pub async fn spawn_subagent(
&self,
forked_from_thread_id: ThreadId,
mut options: StartThreadOptions,
) -> CodexResult<NewThread> {
let fork_source = self.get_thread(forked_from_thread_id).await?;
// Persist queued rollout updates before reading the fork snapshot.
fork_source.ensure_rollout_materialized().await;
fork_source.flush_rollout().await?;
let stored_thread = fork_source
.read_thread(
/*include_archived*/ true, /*include_history*/ true,
)
.await
.map_err(|err| {
CodexErr::Fatal(format!(
"failed to read subagent fork source {forked_from_thread_id}: {err}"
))
})?;
let history = stored_thread_to_initial_history(stored_thread, fork_source.rollout_path())?;
options.initial_history = fork_history_from_snapshot(
ForkSnapshot::Interrupted,
history,
InterruptedTurnHistoryMarker::from_config(&options.config),
);
self.start_thread_with_options(options).await
}
pub async fn resume_thread_from_rollout(
&self,
config: Config,

View File

@@ -481,6 +481,75 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
assert!(manager.list_thread_ids().await.is_empty());
}
#[tokio::test]
async fn spawn_subagent_forks_from_the_source_thread() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.cwd = config.codex_home.abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
let thread_store = thread_store_from_config(&config, state_db.clone());
let manager = ThreadManager::new(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store.clone(),
state_db,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
);
let parent = manager
.start_thread(config.clone())
.await
.expect("start parent thread");
let spawned = manager
.spawn_subagent(
parent.thread_id,
StartThreadOptions {
config,
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
},
)
.await
.expect("spawn subagent");
let stored_spawned = spawned
.thread
.read_thread(
/*include_archived*/ true, /*include_history*/ true,
)
.await
.expect("read spawned subagent");
assert_ne!(spawned.thread_id, parent.thread_id);
assert_eq!(stored_spawned.forked_from_id, Some(parent.thread_id));
parent
.thread
.shutdown_and_wait()
.await
.expect("shutdown parent thread");
spawned
.thread
.shutdown_and_wait()
.await
.expect("shutdown spawned subagent");
}
#[tokio::test]
async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
let temp_dir = tempdir().expect("tempdir");

View File

@@ -1,10 +1,12 @@
use std::future::Future;
use std::pin::Pin;
/// Future returned by one injected agent-spawn helper.
use codex_protocol::ThreadId;
/// Future returned by one injected subagent-spawn helper.
pub type AgentSpawnFuture<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
/// Constructor-injected host helper for extensions that need to spawn agents.
/// Constructor-injected host helper for extensions that need to spawn subagents.
///
/// The extension owns the request shape and resulting handle types. The host
/// provides the implementation when it constructs the extension.
@@ -12,17 +14,25 @@ pub trait AgentSpawner<R>: Send + Sync {
type Spawned;
type Error;
fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>;
fn spawn_subagent<'a>(
&'a self,
forked_from_thread_id: ThreadId,
request: R,
) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>;
}
impl<R, S, E, F> AgentSpawner<R> for F
where
F: Fn(R) -> AgentSpawnFuture<'static, S, E> + Send + Sync,
F: Fn(ThreadId, R) -> AgentSpawnFuture<'static, S, E> + Send + Sync,
{
type Spawned = S;
type Error = E;
fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> {
self(request)
fn spawn_subagent<'a>(
&'a self,
forked_from_thread_id: ThreadId,
request: R,
) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> {
self(forked_from_thread_id, request)
}
}

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_tool_api::ToolBundle;
@@ -10,10 +11,16 @@ mod prompt;
pub use prompt::PromptFragment;
pub use prompt::PromptSlot;
/// Contributor that receives host-owned thread-start input before later
/// contributors read from extension stores.
/// Contributor that receives the live thread id and host-owned thread-start
/// input before later contributors read from extension stores.
pub trait ThreadStartContributor<C>: Send + Sync {
fn contribute(&self, input: &C, session_store: &ExtensionData, thread_store: &ExtensionData);
fn contribute(
&self,
thread_id: ThreadId,
input: &C,
session_store: &ExtensionData,
thread_store: &ExtensionData,
);
}
/// Extension contribution that adds prompt fragments during prompt assembly.

View File

@@ -16,6 +16,7 @@ workspace = true
codex-core = { workspace = true }
codex-extension-api = { workspace = true }
codex-features = { workspace = true }
codex-protocol = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -7,6 +7,7 @@ use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::PromptFragment;
use codex_extension_api::ThreadStartContributor;
use codex_features::Feature;
use codex_protocol::ThreadId;
const DEFAULT_ATTRIBUTION_VALUE: &str = "Codex <noreply@openai.com>";
@@ -42,6 +43,7 @@ struct GitAttributionConfig {
impl ThreadStartContributor<Config> for GitAttributionExtension {
fn contribute(
&self,
_thread_id: ThreadId,
config: &Config,
_session_store: &ExtensionData,
thread_store: &ExtensionData,

View File

@@ -15,3 +15,4 @@ workspace = true
[dependencies]
codex-core = { workspace = true }
codex-extension-api = { workspace = true }
codex-protocol = { workspace = true }

View File

@@ -6,6 +6,7 @@ use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::ThreadStartContributor;
use codex_protocol::ThreadId;
/// Guardian extension dependencies supplied by the host at construction time.
#[derive(Clone, Debug)]
@@ -19,20 +20,30 @@ impl<S> GuardianExtension<S> {
Self { agent_spawner }
}
/// Returns the host-provided agent spawn helper.
pub fn agent_spawner(&self) -> &S {
&self.agent_spawner
}
/// Delegates one guardian-owned spawn request to the host-provided helper.
pub fn spawn_agent<'a, R>(
/// Delegates one guardian-owned subagent spawn request to the host helper.
pub fn spawn_subagent<'a, R>(
&'a self,
forked_from_thread_id: ThreadId,
request: R,
) -> AgentSpawnFuture<'a, <S as AgentSpawner<R>>::Spawned, <S as AgentSpawner<R>>::Error>
where
S: AgentSpawner<R>,
{
self.agent_spawner.spawn_agent(request)
self.agent_spawner
.spawn_subagent(forked_from_thread_id, request)
}
}
/// Thread-local guardian state captured when the host starts a thread.
#[derive(Clone, Copy, Debug)]
pub struct GuardianThreadContext {
forked_from_thread_id: ThreadId,
}
impl GuardianThreadContext {
/// Returns the thread that future guardian subagents should fork from by default.
pub fn forked_from_thread_id(&self) -> ThreadId {
self.forked_from_thread_id
}
}
@@ -42,10 +53,14 @@ where
{
fn contribute(
&self,
thread_id: ThreadId,
_input: &Config,
_session_store: &ExtensionData,
_thread_store: &ExtensionData,
thread_store: &ExtensionData,
) {
thread_store.insert(GuardianThreadContext {
forked_from_thread_id: thread_id,
});
}
}