mirror of
https://github.com/openai/codex.git
synced 2026-05-23 20:44:50 +00:00
feat: guardian as an extension (contributors part) (#22216)
Part 1 of guardian as extension. This bind all the logic to spawn another agent from an extension and it adds `ThreadId` in the start thread collaborator
This commit is contained in:
11
codex-rs/Cargo.lock
generated
11
codex-rs/Cargo.lock
generated
@@ -1904,6 +1904,7 @@ dependencies = [
|
||||
"codex-file-watcher",
|
||||
"codex-git-attribution",
|
||||
"codex-git-utils",
|
||||
"codex-guardian",
|
||||
"codex-hooks",
|
||||
"codex-login",
|
||||
"codex-mcp",
|
||||
@@ -2906,6 +2907,7 @@ dependencies = [
|
||||
"codex-core",
|
||||
"codex-extension-api",
|
||||
"codex-features",
|
||||
"codex-protocol",
|
||||
"pretty_assertions",
|
||||
]
|
||||
|
||||
@@ -2933,6 +2935,15 @@ dependencies = [
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-guardian"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-core",
|
||||
"codex-extension-api",
|
||||
"codex-protocol",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-hooks"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -45,6 +45,7 @@ members = [
|
||||
"execpolicy",
|
||||
"execpolicy-legacy",
|
||||
"ext/extension-api",
|
||||
"ext/guardian",
|
||||
"ext/git-attribution",
|
||||
"external-agent-migration",
|
||||
"external-agent-sessions",
|
||||
@@ -162,6 +163,7 @@ codex-file-system = { path = "file-system" }
|
||||
codex-exec-server = { path = "exec-server" }
|
||||
codex-execpolicy = { path = "execpolicy" }
|
||||
codex-extension-api = { path = "ext/extension-api" }
|
||||
codex-guardian = { path = "ext/guardian" }
|
||||
codex-git-attribution = { path = "ext/git-attribution" }
|
||||
codex-external-agent-migration = { path = "external-agent-migration" }
|
||||
codex-external-agent-sessions = { path = "external-agent-sessions" }
|
||||
|
||||
@@ -40,6 +40,7 @@ codex-extension-api = { workspace = true }
|
||||
codex-external-agent-migration = { workspace = true }
|
||||
codex-external-agent-sessions = { workspace = true }
|
||||
codex-features = { workspace = true }
|
||||
codex-guardian = { workspace = true }
|
||||
codex-git-attribution = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-file-watcher = { workspace = true }
|
||||
|
||||
@@ -1,11 +1,41 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
|
||||
use codex_core::NewThread;
|
||||
use codex_core::StartThreadOptions;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_extension_api::AgentSpawnFuture;
|
||||
use codex_extension_api::AgentSpawner;
|
||||
use codex_extension_api::ExtensionRegistry;
|
||||
use codex_extension_api::ExtensionRegistryBuilder;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::error::CodexErr;
|
||||
|
||||
pub(crate) fn thread_extensions() -> Arc<ExtensionRegistry<Config>> {
|
||||
pub(crate) fn thread_extensions<S>(guardian_agent_spawner: S) -> Arc<ExtensionRegistry<Config>>
|
||||
where
|
||||
S: AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> + 'static,
|
||||
{
|
||||
let mut builder = ExtensionRegistryBuilder::<Config>::new();
|
||||
codex_git_attribution::install(&mut builder);
|
||||
codex_guardian::install(&mut builder, guardian_agent_spawner);
|
||||
Arc::new(builder.build())
|
||||
}
|
||||
|
||||
pub(crate) fn guardian_agent_spawner(
|
||||
thread_manager: Weak<ThreadManager>,
|
||||
) -> impl AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> {
|
||||
move |forked_from_thread_id: ThreadId,
|
||||
options: StartThreadOptions|
|
||||
-> AgentSpawnFuture<'static, NewThread, CodexErr> {
|
||||
let thread_manager = thread_manager.clone();
|
||||
Box::pin(async move {
|
||||
let thread_manager = thread_manager.upgrade().ok_or_else(|| {
|
||||
CodexErr::UnsupportedOperation("thread manager dropped".to_string())
|
||||
})?;
|
||||
thread_manager
|
||||
.spawn_subagent(forked_from_thread_id, options)
|
||||
.await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +99,7 @@ async fn queue_refresh(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::extensions::guardian_agent_spawner;
|
||||
use crate::extensions::thread_extensions;
|
||||
use async_trait::async_trait;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
@@ -179,18 +180,20 @@ mod tests {
|
||||
.await
|
||||
.expect("refresh tests require state db");
|
||||
let thread_store = thread_store_from_config(&good_config, Some(state_db.clone()));
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
&good_config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::default_for_tests()),
|
||||
thread_extensions(),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store,
|
||||
Some(state_db.clone()),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
/*attestation_provider*/ None,
|
||||
));
|
||||
let thread_manager = Arc::new_cyclic(|thread_manager| {
|
||||
ThreadManager::new(
|
||||
&good_config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(EnvironmentManager::default_for_tests()),
|
||||
thread_extensions(guardian_agent_spawner(thread_manager.clone())),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store,
|
||||
Some(state_db.clone()),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
/*attestation_provider*/ None,
|
||||
)
|
||||
});
|
||||
thread_manager.start_thread(good_config).await?;
|
||||
thread_manager.start_thread(bad_config).await?;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::attestation::app_server_attestation_provider;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::connection_rpc_gate::ConnectionRpcGate;
|
||||
use crate::error_code::invalid_request;
|
||||
use crate::extensions::guardian_agent_spawner;
|
||||
use crate::extensions::thread_extensions;
|
||||
use crate::fs_watch::FsWatchManager;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
@@ -298,21 +299,23 @@ impl MessageProcessor {
|
||||
// affect per-thread behavior, but they must not move newly started,
|
||||
// resumed, or forked threads to a different persistence backend/root.
|
||||
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
thread_extensions(),
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
Some(app_server_attestation_provider(
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
)),
|
||||
));
|
||||
let thread_manager = Arc::new_cyclic(|thread_manager| {
|
||||
ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
thread_extensions(guardian_agent_spawner(thread_manager.clone())),
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
Some(app_server_attestation_provider(
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
)),
|
||||
)
|
||||
});
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
|
||||
@@ -815,6 +815,7 @@ impl Session {
|
||||
let thread_extension_data = codex_extension_api::ExtensionData::new();
|
||||
for contributor in extensions.thread_start_contributors() {
|
||||
contributor.contribute(
|
||||
thread_id,
|
||||
config.as_ref(),
|
||||
&session_extension_data,
|
||||
&thread_extension_data,
|
||||
|
||||
@@ -567,6 +567,36 @@ impl ThreadManager {
|
||||
.await
|
||||
}
|
||||
|
||||
// TODO(jif) merge with fork_agent
|
||||
/// Spawn a subagent by forking persisted history from `forked_from_thread_id`.
|
||||
pub async fn spawn_subagent(
|
||||
&self,
|
||||
forked_from_thread_id: ThreadId,
|
||||
mut options: StartThreadOptions,
|
||||
) -> CodexResult<NewThread> {
|
||||
let fork_source = self.get_thread(forked_from_thread_id).await?;
|
||||
// Persist queued rollout updates before reading the fork snapshot.
|
||||
fork_source.ensure_rollout_materialized().await;
|
||||
fork_source.flush_rollout().await?;
|
||||
let stored_thread = fork_source
|
||||
.read_thread(
|
||||
/*include_archived*/ true, /*include_history*/ true,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
CodexErr::Fatal(format!(
|
||||
"failed to read subagent fork source {forked_from_thread_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
let history = stored_thread_to_initial_history(stored_thread, fork_source.rollout_path())?;
|
||||
options.initial_history = fork_history_from_snapshot(
|
||||
ForkSnapshot::Interrupted,
|
||||
history,
|
||||
InterruptedTurnHistoryMarker::from_config(&options.config),
|
||||
);
|
||||
self.start_thread_with_options(options).await
|
||||
}
|
||||
|
||||
pub async fn resume_thread_from_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
|
||||
38
codex-rs/ext/extension-api/src/capabilities/agent.rs
Normal file
38
codex-rs/ext/extension-api/src/capabilities/agent.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
/// Future returned by one injected subagent-spawn helper.
|
||||
pub type AgentSpawnFuture<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
|
||||
|
||||
/// Constructor-injected host helper for extensions that need to spawn subagents.
|
||||
///
|
||||
/// The extension owns the request shape and resulting handle types. The host
|
||||
/// provides the implementation when it constructs the extension.
|
||||
pub trait AgentSpawner<R>: Send + Sync {
|
||||
type Spawned;
|
||||
type Error;
|
||||
|
||||
fn spawn_subagent<'a>(
|
||||
&'a self,
|
||||
forked_from_thread_id: ThreadId,
|
||||
request: R,
|
||||
) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>;
|
||||
}
|
||||
|
||||
impl<R, S, E, F> AgentSpawner<R> for F
|
||||
where
|
||||
F: Fn(ThreadId, R) -> AgentSpawnFuture<'static, S, E> + Send + Sync,
|
||||
{
|
||||
type Spawned = S;
|
||||
type Error = E;
|
||||
|
||||
fn spawn_subagent<'a>(
|
||||
&'a self,
|
||||
forked_from_thread_id: ThreadId,
|
||||
request: R,
|
||||
) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> {
|
||||
self(forked_from_thread_id, request)
|
||||
}
|
||||
}
|
||||
4
codex-rs/ext/extension-api/src/capabilities/mod.rs
Normal file
4
codex-rs/ext/extension-api/src/capabilities/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
mod agent;
|
||||
|
||||
pub use agent::AgentSpawnFuture;
|
||||
pub use agent::AgentSpawner;
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::future::Future;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_tool_api::ToolBundle;
|
||||
|
||||
@@ -10,10 +11,16 @@ mod prompt;
|
||||
pub use prompt::PromptFragment;
|
||||
pub use prompt::PromptSlot;
|
||||
|
||||
/// Contributor that receives host-owned thread-start input before later
|
||||
/// contributors read from extension stores.
|
||||
/// Contributor that receives the live thread id and host-owned thread-start
|
||||
/// input before later contributors read from extension stores.
|
||||
pub trait ThreadStartContributor<C>: Send + Sync {
|
||||
fn contribute(&self, input: &C, session_store: &ExtensionData, thread_store: &ExtensionData);
|
||||
fn contribute(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
input: &C,
|
||||
session_store: &ExtensionData,
|
||||
thread_store: &ExtensionData,
|
||||
);
|
||||
}
|
||||
|
||||
/// Extension contribution that adds prompt fragments during prompt assembly.
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
mod capabilities;
|
||||
mod contributors;
|
||||
mod registry;
|
||||
mod state;
|
||||
|
||||
pub use capabilities::AgentSpawnFuture;
|
||||
pub use capabilities::AgentSpawner;
|
||||
pub use codex_tool_api::FunctionToolSpec;
|
||||
pub use codex_tool_api::ToolBundle;
|
||||
pub use codex_tool_api::ToolCall;
|
||||
|
||||
@@ -16,6 +16,7 @@ workspace = true
|
||||
codex-core = { workspace = true }
|
||||
codex-extension-api = { workspace = true }
|
||||
codex-features = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_extension_api::ExtensionRegistryBuilder;
|
||||
use codex_extension_api::PromptFragment;
|
||||
use codex_extension_api::ThreadStartContributor;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
const DEFAULT_ATTRIBUTION_VALUE: &str = "Codex <noreply@openai.com>";
|
||||
|
||||
@@ -42,6 +43,7 @@ struct GitAttributionConfig {
|
||||
impl ThreadStartContributor<Config> for GitAttributionExtension {
|
||||
fn contribute(
|
||||
&self,
|
||||
_thread_id: ThreadId,
|
||||
config: &Config,
|
||||
_session_store: &ExtensionData,
|
||||
thread_store: &ExtensionData,
|
||||
|
||||
6
codex-rs/ext/guardian/BUILD.bazel
Normal file
6
codex-rs/ext/guardian/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "guardian",
|
||||
crate_name = "codex_guardian",
|
||||
)
|
||||
19
codex-rs/ext/guardian/Cargo.toml
Normal file
19
codex-rs/ext/guardian/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-guardian"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "codex_guardian"
|
||||
path = "src/lib.rs"
|
||||
test = false
|
||||
doctest = false
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
codex-core = { workspace = true }
|
||||
codex-extension-api = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
73
codex-rs/ext/guardian/src/lib.rs
Normal file
73
codex-rs/ext/guardian/src/lib.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::config::Config;
|
||||
use codex_extension_api::AgentSpawnFuture;
|
||||
use codex_extension_api::AgentSpawner;
|
||||
use codex_extension_api::ExtensionData;
|
||||
use codex_extension_api::ExtensionRegistryBuilder;
|
||||
use codex_extension_api::ThreadStartContributor;
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
/// Guardian extension dependencies supplied by the host at construction time.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GuardianExtension<S> {
|
||||
agent_spawner: S,
|
||||
}
|
||||
|
||||
impl<S> GuardianExtension<S> {
|
||||
/// Creates a guardian extension with its host-provided agent spawn helper.
|
||||
pub fn new(agent_spawner: S) -> Self {
|
||||
Self { agent_spawner }
|
||||
}
|
||||
|
||||
/// Delegates one guardian-owned subagent spawn request to the host helper.
|
||||
pub fn spawn_subagent<'a, R>(
|
||||
&'a self,
|
||||
forked_from_thread_id: ThreadId,
|
||||
request: R,
|
||||
) -> AgentSpawnFuture<'a, <S as AgentSpawner<R>>::Spawned, <S as AgentSpawner<R>>::Error>
|
||||
where
|
||||
S: AgentSpawner<R>,
|
||||
{
|
||||
self.agent_spawner
|
||||
.spawn_subagent(forked_from_thread_id, request)
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread-local guardian state captured when the host starts a thread.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GuardianThreadContext {
|
||||
forked_from_thread_id: ThreadId,
|
||||
}
|
||||
|
||||
impl GuardianThreadContext {
|
||||
/// Returns the thread that future guardian subagents should fork from by default.
|
||||
pub fn forked_from_thread_id(&self) -> ThreadId {
|
||||
self.forked_from_thread_id
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ThreadStartContributor<Config> for GuardianExtension<S>
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
fn contribute(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
_input: &Config,
|
||||
_session_store: &ExtensionData,
|
||||
thread_store: &ExtensionData,
|
||||
) {
|
||||
thread_store.insert(GuardianThreadContext {
|
||||
forked_from_thread_id: thread_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Installs the guardian contributors into the extension registry.
|
||||
pub fn install<S>(registry: &mut ExtensionRegistryBuilder<Config>, agent_spawner: S)
|
||||
where
|
||||
S: Send + Sync + 'static,
|
||||
{
|
||||
registry.thread_start_contributor(Arc::new(GuardianExtension::new(agent_spawner)));
|
||||
}
|
||||
Reference in New Issue
Block a user