diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index bbb61f8337..e92dbb07da 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1904,6 +1904,7 @@ dependencies = [ "codex-file-watcher", "codex-git-attribution", "codex-git-utils", + "codex-guardian", "codex-hooks", "codex-login", "codex-mcp", @@ -2906,6 +2907,7 @@ dependencies = [ "codex-core", "codex-extension-api", "codex-features", + "codex-protocol", "pretty_assertions", ] @@ -2933,6 +2935,15 @@ dependencies = [ "walkdir", ] +[[package]] +name = "codex-guardian" +version = "0.0.0" +dependencies = [ + "codex-core", + "codex-extension-api", + "codex-protocol", +] + [[package]] name = "codex-hooks" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 7e5c647421..5a21e98995 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -45,6 +45,7 @@ members = [ "execpolicy", "execpolicy-legacy", "ext/extension-api", + "ext/guardian", "ext/git-attribution", "external-agent-migration", "external-agent-sessions", @@ -162,6 +163,7 @@ codex-file-system = { path = "file-system" } codex-exec-server = { path = "exec-server" } codex-execpolicy = { path = "execpolicy" } codex-extension-api = { path = "ext/extension-api" } +codex-guardian = { path = "ext/guardian" } codex-git-attribution = { path = "ext/git-attribution" } codex-external-agent-migration = { path = "external-agent-migration" } codex-external-agent-sessions = { path = "external-agent-sessions" } diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 734475f946..fcf731aa9b 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -40,6 +40,7 @@ codex-extension-api = { workspace = true } codex-external-agent-migration = { workspace = true } codex-external-agent-sessions = { workspace = true } codex-features = { workspace = true } +codex-guardian = { workspace = true } codex-git-attribution = { workspace = true } codex-git-utils = { workspace = true } codex-file-watcher = { workspace = true } diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index 9fe468dc05..411685e7d8 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -1,11 +1,41 @@ use std::sync::Arc; +use std::sync::Weak; +use codex_core::NewThread; +use codex_core::StartThreadOptions; +use codex_core::ThreadManager; use codex_core::config::Config; +use codex_extension_api::AgentSpawnFuture; +use codex_extension_api::AgentSpawner; use codex_extension_api::ExtensionRegistry; use codex_extension_api::ExtensionRegistryBuilder; +use codex_protocol::ThreadId; +use codex_protocol::error::CodexErr; -pub(crate) fn thread_extensions() -> Arc> { +pub(crate) fn thread_extensions(guardian_agent_spawner: S) -> Arc> +where + S: AgentSpawner + 'static, +{ let mut builder = ExtensionRegistryBuilder::::new(); codex_git_attribution::install(&mut builder); + codex_guardian::install(&mut builder, guardian_agent_spawner); Arc::new(builder.build()) } + +pub(crate) fn guardian_agent_spawner( + thread_manager: Weak, +) -> impl AgentSpawner { + move |forked_from_thread_id: ThreadId, + options: StartThreadOptions| + -> AgentSpawnFuture<'static, NewThread, CodexErr> { + let thread_manager = thread_manager.clone(); + Box::pin(async move { + let thread_manager = thread_manager.upgrade().ok_or_else(|| { + CodexErr::UnsupportedOperation("thread manager dropped".to_string()) + })?; + thread_manager + .spawn_subagent(forked_from_thread_id, options) + .await + }) + } +} diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index c085e44cf6..31a6f4afd8 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -99,6 +99,7 @@ async fn queue_refresh( #[cfg(test)] mod tests { use super::*; + use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; use async_trait::async_trait; use codex_arg0::Arg0DispatchPaths; @@ -179,18 +180,20 @@ mod tests { .await .expect("refresh tests require state db"); let thread_store = thread_store_from_config(&good_config, Some(state_db.clone())); - let thread_manager = Arc::new(ThreadManager::new( - &good_config, - auth_manager, - SessionSource::Exec, - Arc::new(EnvironmentManager::default_for_tests()), - thread_extensions(), - /*analytics_events_client*/ None, - thread_store, - Some(state_db.clone()), - "11111111-1111-4111-8111-111111111111".to_string(), - /*attestation_provider*/ None, - )); + let thread_manager = Arc::new_cyclic(|thread_manager| { + ThreadManager::new( + &good_config, + auth_manager, + SessionSource::Exec, + Arc::new(EnvironmentManager::default_for_tests()), + thread_extensions(guardian_agent_spawner(thread_manager.clone())), + /*analytics_events_client*/ None, + thread_store, + Some(state_db.clone()), + "11111111-1111-4111-8111-111111111111".to_string(), + /*attestation_provider*/ None, + ) + }); thread_manager.start_thread(good_config).await?; thread_manager.start_thread(bad_config).await?; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 4897684564..867cd64c99 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -8,6 +8,7 @@ use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; use crate::error_code::invalid_request; +use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; use crate::fs_watch::FsWatchManager; use crate::outgoing_message::ConnectionId; @@ -298,21 +299,23 @@ impl MessageProcessor { // affect per-thread behavior, but they must not move newly started, // resumed, or forked threads to a different persistence backend/root. let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone()); - let thread_manager = Arc::new(ThreadManager::new( - config.as_ref(), - auth_manager.clone(), - session_source, - environment_manager, - thread_extensions(), - Some(analytics_events_client.clone()), - Arc::clone(&thread_store), - state_db.clone(), - installation_id, - Some(app_server_attestation_provider( - outgoing.clone(), - thread_state_manager.clone(), - )), - )); + let thread_manager = Arc::new_cyclic(|thread_manager| { + ThreadManager::new( + config.as_ref(), + auth_manager.clone(), + session_source, + environment_manager, + thread_extensions(guardian_agent_spawner(thread_manager.clone())), + Some(analytics_events_client.clone()), + Arc::clone(&thread_store), + state_db.clone(), + installation_id, + Some(app_server_attestation_provider( + outgoing.clone(), + thread_state_manager.clone(), + )), + ) + }); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 32aaf8fb0b..747a6f5b4f 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -815,6 +815,7 @@ impl Session { let thread_extension_data = codex_extension_api::ExtensionData::new(); for contributor in extensions.thread_start_contributors() { contributor.contribute( + thread_id, config.as_ref(), &session_extension_data, &thread_extension_data, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 83bfe3b3d1..27cc600213 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -567,6 +567,36 @@ impl ThreadManager { .await } + // TODO(jif) merge with fork_agent + /// Spawn a subagent by forking persisted history from `forked_from_thread_id`. + pub async fn spawn_subagent( + &self, + forked_from_thread_id: ThreadId, + mut options: StartThreadOptions, + ) -> CodexResult { + let fork_source = self.get_thread(forked_from_thread_id).await?; + // Persist queued rollout updates before reading the fork snapshot. + fork_source.ensure_rollout_materialized().await; + fork_source.flush_rollout().await?; + let stored_thread = fork_source + .read_thread( + /*include_archived*/ true, /*include_history*/ true, + ) + .await + .map_err(|err| { + CodexErr::Fatal(format!( + "failed to read subagent fork source {forked_from_thread_id}: {err}" + )) + })?; + let history = stored_thread_to_initial_history(stored_thread, fork_source.rollout_path())?; + options.initial_history = fork_history_from_snapshot( + ForkSnapshot::Interrupted, + history, + InterruptedTurnHistoryMarker::from_config(&options.config), + ); + self.start_thread_with_options(options).await + } + pub async fn resume_thread_from_rollout( &self, config: Config, diff --git a/codex-rs/ext/extension-api/src/capabilities/agent.rs b/codex-rs/ext/extension-api/src/capabilities/agent.rs new file mode 100644 index 0000000000..3ecdfdf5fd --- /dev/null +++ b/codex-rs/ext/extension-api/src/capabilities/agent.rs @@ -0,0 +1,38 @@ +use std::future::Future; +use std::pin::Pin; + +use codex_protocol::ThreadId; + +/// Future returned by one injected subagent-spawn helper. +pub type AgentSpawnFuture<'a, T, E> = Pin> + Send + 'a>>; + +/// Constructor-injected host helper for extensions that need to spawn subagents. +/// +/// The extension owns the request shape and resulting handle types. The host +/// provides the implementation when it constructs the extension. +pub trait AgentSpawner: Send + Sync { + type Spawned; + type Error; + + fn spawn_subagent<'a>( + &'a self, + forked_from_thread_id: ThreadId, + request: R, + ) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error>; +} + +impl AgentSpawner for F +where + F: Fn(ThreadId, R) -> AgentSpawnFuture<'static, S, E> + Send + Sync, +{ + type Spawned = S; + type Error = E; + + fn spawn_subagent<'a>( + &'a self, + forked_from_thread_id: ThreadId, + request: R, + ) -> AgentSpawnFuture<'a, Self::Spawned, Self::Error> { + self(forked_from_thread_id, request) + } +} diff --git a/codex-rs/ext/extension-api/src/capabilities/mod.rs b/codex-rs/ext/extension-api/src/capabilities/mod.rs new file mode 100644 index 0000000000..bcc33da975 --- /dev/null +++ b/codex-rs/ext/extension-api/src/capabilities/mod.rs @@ -0,0 +1,4 @@ +mod agent; + +pub use agent::AgentSpawnFuture; +pub use agent::AgentSpawner; diff --git a/codex-rs/ext/extension-api/src/contributors.rs b/codex-rs/ext/extension-api/src/contributors.rs index 4475e9271d..18ca7b0299 100644 --- a/codex-rs/ext/extension-api/src/contributors.rs +++ b/codex-rs/ext/extension-api/src/contributors.rs @@ -1,5 +1,6 @@ use std::future::Future; +use codex_protocol::ThreadId; use codex_protocol::items::TurnItem; use codex_tool_api::ToolBundle; @@ -10,10 +11,16 @@ mod prompt; pub use prompt::PromptFragment; pub use prompt::PromptSlot; -/// Contributor that receives host-owned thread-start input before later -/// contributors read from extension stores. +/// Contributor that receives the live thread id and host-owned thread-start +/// input before later contributors read from extension stores. pub trait ThreadStartContributor: Send + Sync { - fn contribute(&self, input: &C, session_store: &ExtensionData, thread_store: &ExtensionData); + fn contribute( + &self, + thread_id: ThreadId, + input: &C, + session_store: &ExtensionData, + thread_store: &ExtensionData, + ); } /// Extension contribution that adds prompt fragments during prompt assembly. diff --git a/codex-rs/ext/extension-api/src/lib.rs b/codex-rs/ext/extension-api/src/lib.rs index 7c9e412fd6..3c9ee38a02 100644 --- a/codex-rs/ext/extension-api/src/lib.rs +++ b/codex-rs/ext/extension-api/src/lib.rs @@ -1,7 +1,10 @@ +mod capabilities; mod contributors; mod registry; mod state; +pub use capabilities::AgentSpawnFuture; +pub use capabilities::AgentSpawner; pub use codex_tool_api::FunctionToolSpec; pub use codex_tool_api::ToolBundle; pub use codex_tool_api::ToolCall; diff --git a/codex-rs/ext/git-attribution/Cargo.toml b/codex-rs/ext/git-attribution/Cargo.toml index edf8c07196..500ce47aec 100644 --- a/codex-rs/ext/git-attribution/Cargo.toml +++ b/codex-rs/ext/git-attribution/Cargo.toml @@ -16,6 +16,7 @@ workspace = true codex-core = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } +codex-protocol = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/ext/git-attribution/src/lib.rs b/codex-rs/ext/git-attribution/src/lib.rs index bb73a83011..eae906fa56 100644 --- a/codex-rs/ext/git-attribution/src/lib.rs +++ b/codex-rs/ext/git-attribution/src/lib.rs @@ -7,6 +7,7 @@ use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::PromptFragment; use codex_extension_api::ThreadStartContributor; use codex_features::Feature; +use codex_protocol::ThreadId; const DEFAULT_ATTRIBUTION_VALUE: &str = "Codex "; @@ -42,6 +43,7 @@ struct GitAttributionConfig { impl ThreadStartContributor for GitAttributionExtension { fn contribute( &self, + _thread_id: ThreadId, config: &Config, _session_store: &ExtensionData, thread_store: &ExtensionData, diff --git a/codex-rs/ext/guardian/BUILD.bazel b/codex-rs/ext/guardian/BUILD.bazel new file mode 100644 index 0000000000..adcbb090bc --- /dev/null +++ b/codex-rs/ext/guardian/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "guardian", + crate_name = "codex_guardian", +) diff --git a/codex-rs/ext/guardian/Cargo.toml b/codex-rs/ext/guardian/Cargo.toml new file mode 100644 index 0000000000..513254b7cf --- /dev/null +++ b/codex-rs/ext/guardian/Cargo.toml @@ -0,0 +1,19 @@ +[package] +edition.workspace = true +license.workspace = true +name = "codex-guardian" +version.workspace = true + +[lib] +name = "codex_guardian" +path = "src/lib.rs" +test = false +doctest = false + +[lints] +workspace = true + +[dependencies] +codex-core = { workspace = true } +codex-extension-api = { workspace = true } +codex-protocol = { workspace = true } diff --git a/codex-rs/ext/guardian/src/lib.rs b/codex-rs/ext/guardian/src/lib.rs new file mode 100644 index 0000000000..d8c9834d60 --- /dev/null +++ b/codex-rs/ext/guardian/src/lib.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use codex_core::config::Config; +use codex_extension_api::AgentSpawnFuture; +use codex_extension_api::AgentSpawner; +use codex_extension_api::ExtensionData; +use codex_extension_api::ExtensionRegistryBuilder; +use codex_extension_api::ThreadStartContributor; +use codex_protocol::ThreadId; + +/// Guardian extension dependencies supplied by the host at construction time. +#[derive(Clone, Debug)] +pub struct GuardianExtension { + agent_spawner: S, +} + +impl GuardianExtension { + /// Creates a guardian extension with its host-provided agent spawn helper. + pub fn new(agent_spawner: S) -> Self { + Self { agent_spawner } + } + + /// Delegates one guardian-owned subagent spawn request to the host helper. + pub fn spawn_subagent<'a, R>( + &'a self, + forked_from_thread_id: ThreadId, + request: R, + ) -> AgentSpawnFuture<'a, >::Spawned, >::Error> + where + S: AgentSpawner, + { + self.agent_spawner + .spawn_subagent(forked_from_thread_id, request) + } +} + +/// Thread-local guardian state captured when the host starts a thread. +#[derive(Clone, Copy, Debug)] +pub struct GuardianThreadContext { + forked_from_thread_id: ThreadId, +} + +impl GuardianThreadContext { + /// Returns the thread that future guardian subagents should fork from by default. + pub fn forked_from_thread_id(&self) -> ThreadId { + self.forked_from_thread_id + } +} + +impl ThreadStartContributor for GuardianExtension +where + S: Send + Sync, +{ + fn contribute( + &self, + thread_id: ThreadId, + _input: &Config, + _session_store: &ExtensionData, + thread_store: &ExtensionData, + ) { + thread_store.insert(GuardianThreadContext { + forked_from_thread_id: thread_id, + }); + } +} + +/// Installs the guardian contributors into the extension registry. +pub fn install(registry: &mut ExtensionRegistryBuilder, agent_spawner: S) +where + S: Send + Sync + 'static, +{ + registry.thread_start_contributor(Arc::new(GuardianExtension::new(agent_spawner))); +}