diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d15a1062d9..df0827a0de 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1901,6 +1901,7 @@ dependencies = [ "codex-file-watcher", "codex-git-attribution", "codex-git-utils", + "codex-guardian", "codex-hooks", "codex-login", "codex-mcp", @@ -2928,6 +2929,14 @@ dependencies = [ "walkdir", ] +[[package]] +name = "codex-guardian" +version = "0.0.0" +dependencies = [ + "codex-core", + "codex-extension-api", +] + [[package]] name = "codex-hooks" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 7e5c647421..5a21e98995 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -45,6 +45,7 @@ members = [ "execpolicy", "execpolicy-legacy", "ext/extension-api", + "ext/guardian", "ext/git-attribution", "external-agent-migration", "external-agent-sessions", @@ -162,6 +163,7 @@ codex-file-system = { path = "file-system" } codex-exec-server = { path = "exec-server" } codex-execpolicy = { path = "execpolicy" } codex-extension-api = { path = "ext/extension-api" } +codex-guardian = { path = "ext/guardian" } codex-git-attribution = { path = "ext/git-attribution" } codex-external-agent-migration = { path = "external-agent-migration" } codex-external-agent-sessions = { path = "external-agent-sessions" } diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 734475f946..fcf731aa9b 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -40,6 +40,7 @@ codex-extension-api = { workspace = true } codex-external-agent-migration = { workspace = true } codex-external-agent-sessions = { workspace = true } codex-features = { workspace = true } +codex-guardian = { workspace = true } codex-git-attribution = { workspace = true } codex-git-utils = { workspace = true } codex-file-watcher = { workspace = true } diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index 9fe468dc05..e3828526b7 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -1,11 +1,36 @@ use std::sync::Arc; +use std::sync::Weak; +use codex_core::NewThread; +use codex_core::StartThreadOptions; +use codex_core::ThreadManager; use codex_core::config::Config; +use codex_extension_api::AgentSpawnFuture; +use codex_extension_api::AgentSpawner; use codex_extension_api::ExtensionRegistry; use codex_extension_api::ExtensionRegistryBuilder; +use codex_protocol::error::CodexErr; -pub(crate) fn thread_extensions() -> Arc> { +pub(crate) fn thread_extensions(guardian_agent_spawner: S) -> Arc> +where + S: AgentSpawner + 'static, +{ let mut builder = ExtensionRegistryBuilder::::new(); codex_git_attribution::install(&mut builder); + codex_guardian::install(&mut builder, guardian_agent_spawner); Arc::new(builder.build()) } + +pub(crate) fn guardian_agent_spawner( + thread_manager: Weak, +) -> impl AgentSpawner { + move |options: StartThreadOptions| -> AgentSpawnFuture<'static, NewThread, CodexErr> { + let thread_manager = thread_manager.clone(); + Box::pin(async move { + let thread_manager = thread_manager.upgrade().ok_or_else(|| { + CodexErr::UnsupportedOperation("thread manager dropped".to_string()) + })?; + thread_manager.start_thread_with_options(options).await + }) + } +} diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index c085e44cf6..31a6f4afd8 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -99,6 +99,7 @@ async fn queue_refresh( #[cfg(test)] mod tests { use super::*; + use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; use async_trait::async_trait; use codex_arg0::Arg0DispatchPaths; @@ -179,18 +180,20 @@ mod tests { .await .expect("refresh tests require state db"); let thread_store = thread_store_from_config(&good_config, Some(state_db.clone())); - let thread_manager = Arc::new(ThreadManager::new( - &good_config, - auth_manager, - SessionSource::Exec, - Arc::new(EnvironmentManager::default_for_tests()), - thread_extensions(), - /*analytics_events_client*/ None, - thread_store, - Some(state_db.clone()), - "11111111-1111-4111-8111-111111111111".to_string(), - /*attestation_provider*/ None, - )); + let thread_manager = Arc::new_cyclic(|thread_manager| { + ThreadManager::new( + &good_config, + auth_manager, + SessionSource::Exec, + Arc::new(EnvironmentManager::default_for_tests()), + thread_extensions(guardian_agent_spawner(thread_manager.clone())), + /*analytics_events_client*/ None, + thread_store, + Some(state_db.clone()), + "11111111-1111-4111-8111-111111111111".to_string(), + /*attestation_provider*/ None, + ) + }); thread_manager.start_thread(good_config).await?; thread_manager.start_thread(bad_config).await?; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 4897684564..867cd64c99 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -8,6 +8,7 @@ use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; use crate::error_code::invalid_request; +use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; use crate::fs_watch::FsWatchManager; use crate::outgoing_message::ConnectionId; @@ -298,21 +299,23 @@ impl MessageProcessor { // affect per-thread behavior, but they must not move newly started, // resumed, or forked threads to a different persistence backend/root. let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone()); - let thread_manager = Arc::new(ThreadManager::new( - config.as_ref(), - auth_manager.clone(), - session_source, - environment_manager, - thread_extensions(), - Some(analytics_events_client.clone()), - Arc::clone(&thread_store), - state_db.clone(), - installation_id, - Some(app_server_attestation_provider( - outgoing.clone(), - thread_state_manager.clone(), - )), - )); + let thread_manager = Arc::new_cyclic(|thread_manager| { + ThreadManager::new( + config.as_ref(), + auth_manager.clone(), + session_source, + environment_manager, + thread_extensions(guardian_agent_spawner(thread_manager.clone())), + Some(analytics_events_client.clone()), + Arc::clone(&thread_store), + state_db.clone(), + installation_id, + Some(app_server_attestation_provider( + outgoing.clone(), + thread_state_manager.clone(), + )), + ) + }); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); diff --git a/codex-rs/ext/extension-api/src/capabilities/agent.rs b/codex-rs/ext/extension-api/src/capabilities/agent.rs new file mode 100644 index 0000000000..a60a43790e --- /dev/null +++ b/codex-rs/ext/extension-api/src/capabilities/agent.rs @@ -0,0 +1,28 @@ +use std::future::Future; +use std::pin::Pin; + +/// Future returned by one injected agent-spawn helper. +pub type AgentSpawnFuture<'a, T, E> = Pin> + Send + 'a>>; + +/// Constructor-injected host helper for extensions that need to spawn agents. +/// +/// The extension owns the request shape and resulting handle types. The host +/// provides the implementation when it constructs the extension. +pub trait AgentSpawner: Send + Sync { + type Spawned; + type Error; + + fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>; +} + +impl AgentSpawner for F +where + F: Fn(R) -> AgentSpawnFuture<'static, S, E> + Send + Sync, +{ + type Spawned = S; + type Error = E; + + fn spawn_agent<'a>(&'a self, request: R) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> { + self(request) + } +} diff --git a/codex-rs/ext/extension-api/src/capabilities/mod.rs b/codex-rs/ext/extension-api/src/capabilities/mod.rs new file mode 100644 index 0000000000..bcc33da975 --- /dev/null +++ b/codex-rs/ext/extension-api/src/capabilities/mod.rs @@ -0,0 +1,4 @@ +mod agent; + +pub use agent::AgentSpawnFuture; +pub use agent::AgentSpawner; diff --git a/codex-rs/ext/extension-api/src/lib.rs b/codex-rs/ext/extension-api/src/lib.rs index 7c9e412fd6..3c9ee38a02 100644 --- a/codex-rs/ext/extension-api/src/lib.rs +++ b/codex-rs/ext/extension-api/src/lib.rs @@ -1,7 +1,10 @@ +mod capabilities; mod contributors; mod registry; mod state; +pub use capabilities::AgentSpawnFuture; +pub use capabilities::AgentSpawner; pub use codex_tool_api::FunctionToolSpec; pub use codex_tool_api::ToolBundle; pub use codex_tool_api::ToolCall; diff --git a/codex-rs/ext/guardian/BUILD.bazel b/codex-rs/ext/guardian/BUILD.bazel new file mode 100644 index 0000000000..adcbb090bc --- /dev/null +++ b/codex-rs/ext/guardian/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "guardian", + crate_name = "codex_guardian", +) diff --git a/codex-rs/ext/guardian/Cargo.toml b/codex-rs/ext/guardian/Cargo.toml new file mode 100644 index 0000000000..8c062c816a --- /dev/null +++ b/codex-rs/ext/guardian/Cargo.toml @@ -0,0 +1,17 @@ +[package] +edition.workspace = true +license.workspace = true +name = "codex-guardian" +version.workspace = true + +[lib] +name = "codex_guardian" +path = "src/lib.rs" +doctest = false + +[lints] +workspace = true + +[dependencies] +codex-core = { workspace = true } +codex-extension-api = { workspace = true } diff --git a/codex-rs/ext/guardian/src/lib.rs b/codex-rs/ext/guardian/src/lib.rs new file mode 100644 index 0000000000..5e5e02b99a --- /dev/null +++ b/codex-rs/ext/guardian/src/lib.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use codex_core::config::Config; +use codex_extension_api::AgentSpawnFuture; +use codex_extension_api::AgentSpawner; +use codex_extension_api::ExtensionData; +use codex_extension_api::ExtensionRegistryBuilder; +use codex_extension_api::ThreadStartContributor; + +/// Guardian extension dependencies supplied by the host at construction time. +#[derive(Clone, Debug)] +pub struct GuardianExtension { + agent_spawner: S, +} + +impl GuardianExtension { + /// Creates a guardian extension with its host-provided agent spawn helper. + pub fn new(agent_spawner: S) -> Self { + Self { agent_spawner } + } + + /// Returns the host-provided agent spawn helper. + pub fn agent_spawner(&self) -> &S { + &self.agent_spawner + } + + /// Delegates one guardian-owned spawn request to the host-provided helper. + pub fn spawn_agent<'a, R>( + &'a self, + request: R, + ) -> AgentSpawnFuture<'a, >::Spawned, >::Error> + where + S: AgentSpawner, + { + self.agent_spawner.spawn_agent(request) + } +} + +impl ThreadStartContributor for GuardianExtension +where + S: Send + Sync, +{ + fn contribute( + &self, + _input: &Config, + _session_store: &ExtensionData, + _thread_store: &ExtensionData, + ) { + } +} + +/// Installs the guardian contributors into the extension registry. +pub fn install(registry: &mut ExtensionRegistryBuilder, agent_spawner: S) +where + S: Send + Sync + 'static, +{ + registry.thread_start_contributor(Arc::new(GuardianExtension::new(agent_spawner))); +}