diff --git a/codex-rs/ext/extension-api/Cargo.toml b/codex-rs/ext/extension-api/Cargo.toml index 19a754ec49..299801737b 100644 --- a/codex-rs/ext/extension-api/Cargo.toml +++ b/codex-rs/ext/extension-api/Cargo.toml @@ -12,9 +12,11 @@ path = "src/lib.rs" workspace = true [dependencies] +codex-protocol = { workspace = true } codex-tools = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { version = "1.49.0", features = ["rt", "rt-multi-thread", "macros"] } [dev-dependencies] codex-git-attribution = { workspace = true } diff --git a/codex-rs/ext/extension-api/examples/enabled_extensions.rs b/codex-rs/ext/extension-api/examples/enabled_extensions.rs index a05940d983..0e29ac8ed5 100644 --- a/codex-rs/ext/extension-api/examples/enabled_extensions.rs +++ b/codex-rs/ext/extension-api/examples/enabled_extensions.rs @@ -1,16 +1,14 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicU64; - -use codex_extension_api::CodexExtension; -use codex_extension_api::ContextContributor; use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionRegistryBuilder; -use codex_extension_api::scopes::Thread; -use codex_extension_api::scopes::Turn; use codex_git_attribution as git_attribution; use codex_memories::MemoriesExtension; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use codex_memories::citation_output::MemoryState; +use codex_protocol::items::{TurnItem, UserMessageItem}; -fn main() { +#[tokio::main] +async fn main() { // 1. Build the concrete extension values owned by the host. let memories = Arc::new(MemoriesExtension::new( Some("Use memories when they help answer the user.".to_string()), @@ -29,48 +27,59 @@ fn main() { // whether they are active for this request. // // Ideally, this is instead the TurnContext or the Config or so - let context = ctx::RuntimeContext { + let runtime = ctx::RuntimeContext { commit_attribution: None, memory_tool_enabled: true, use_memories: true, }; - // 4. Assemble the stores that exist at this insertion point. - // - // Another insertion point could expose only `Thread`, or could add more - // scopes later. Contributors receive the same dynamic bag and address the - // scopes they need by marker type. The idea would be to have some re-usable contributors - // such as the Output one, but I'm happy to negociate this one + // 4. Build the host-owned stores used by the active contribution families. + let session_data = ExtensionData::new(); let thread_data = ExtensionData::new(); let turn_data = ExtensionData::new(); - let stores = codex_extension_api::stores! { - Thread => &thread_data, - Turn => &turn_data, - }; // 5. Invoke whichever contribution families this insertion point needs. let prompt_fragments = registry .context_contributors() .iter() - .flat_map(|contributor| contributor.contribute(&context, &stores)) + .flat_map(|contributor| contributor.contribute(&runtime, &session_data, &thread_data)) .collect::>(); let tools = registry .tool_contributors() .iter() - .flat_map(|contributor| contributor.tools(&context, &stores)) + .flat_map(|contributor| contributor.tools(&runtime, &thread_data)) .collect::>(); + let tools = registry + .tool_contributors() + .iter() + .flat_map(|contributor| contributor.tools(&runtime, &thread_data)) + .collect::>(); + + let mut item =TurnItem::UserMessage(UserMessageItem { + content: vec!(), + id: String::new() + }); + for contributor in registry.turn_item_contributors() { + let _ = contributor.contribute(&runtime, &thread_data, &turn_data, &mut item).await; + } + for contributor in registry.turn_item_contributors() { + let _ = contributor.contribute(&runtime, &thread_data, &turn_data, &mut item).await; + } + + let session_state = session_data.get_or_init::(Default::default); + let thread_state = thread_data.get_or_init::(Default::default); + let turn_state = turn_data.get_or_init::(Default::default); + + println!("Session: {} (expected 1)", session_state.counter.load(Ordering::Relaxed)); + println!("Thread: {} (expected 3)", thread_state.counter.load(Ordering::Relaxed)); + println!("Turn: {} (expected 2)", turn_state.counter.load(Ordering::Relaxed)); + println!("prompt fragments: {}", prompt_fragments.len()); println!("native tools: {}", tools.len()); } -// Just for the machinerie such that it compiles -#[derive(Debug, Default)] -struct ThreadPromptStats { - prompt_builds: AtomicU64, -} - mod ctx { use codex_git_attribution::GitAttributionContext; use codex_memories::ctx::MemoriesContext; diff --git a/codex-rs/ext/extension-api/src/contributors.rs b/codex-rs/ext/extension-api/src/contributors.rs index a5b5ef459d..8e599c805b 100644 --- a/codex-rs/ext/extension-api/src/contributors.rs +++ b/codex-rs/ext/extension-api/src/contributors.rs @@ -1,6 +1,8 @@ use std::future::Future; -use crate::Stores; +use codex_protocol::items::TurnItem; + +use crate::ExtensionData; mod prompt; mod tool; @@ -13,30 +15,37 @@ pub use tool::ToolHandler; /// Extension contribution that adds prompt fragments during prompt assembly. pub trait ContextContributor: Send + Sync { - fn contribute(&self, context: &C, stores: &Stores<'_>) -> Vec; + fn contribute( + &self, + context: &C, + session_store: &ExtensionData, + thread_store: &ExtensionData, + ) -> Vec; } /// Extension contribution that exposes native tools owned by a feature. pub trait ToolContributor: Send + Sync { /// Returns the native tools visible for the supplied runtime context. - fn tools(&self, context: &C, stores: &Stores<'_>) -> Vec>; + fn tools(&self, context: &C, thread_store: &ExtensionData) -> Vec>; } -/// Future returned by one ordered output contribution. -pub type OutputContributionFuture<'a> = +/// Future returned by one ordered turn-item contribution. +pub type TurnItemContributionFuture<'a> = std::pin::Pin> + Send + 'a>>; -/// Ordered post-processing contribution for one completed output value. +/// Ordered post-processing contribution for one parsed turn item. /// -/// This is kept abstract so that we can re-use it at multiple places. I kind of like it but I understand -/// if this is problematic with bad devs -pub trait OutputContributor: Send + Sync { +/// Implementations may mutate the item before it is emitted and may use the +/// explicitly exposed thread- and turn-lifetime stores when they need durable +/// extension-private state. +pub trait TurnItemContributor: Send + Sync { fn contribute<'a>( &'a self, context: &'a C, - stores: &'a Stores<'a>, - output: &'a mut O, - ) -> OutputContributionFuture<'a>; + thread_store: &'a ExtensionData, + turn_store: &'a ExtensionData, + item: &'a mut TurnItem, + ) -> TurnItemContributionFuture<'a>; } // TODO: WIP (do not consider) @@ -44,5 +53,10 @@ pub trait OutputContributor: Send + Sync { /// (ideally we can replace it by a session lifecycle thing or a request contributor?) pub trait ApprovalInterceptorContributor: Send + Sync { /// Returns whether this contributor should intercept approvals in `context`. - fn intercepts_approvals(&self, context: &C, stores: &Stores<'_>) -> bool; + fn intercepts_approvals( + &self, + context: &C, + thread_store: &ExtensionData, + turn_store: &ExtensionData, + ) -> bool; } diff --git a/codex-rs/ext/extension-api/src/lib.rs b/codex-rs/ext/extension-api/src/lib.rs index 5ed00de072..11824a598c 100644 --- a/codex-rs/ext/extension-api/src/lib.rs +++ b/codex-rs/ext/extension-api/src/lib.rs @@ -5,17 +5,15 @@ mod state; pub use contributors::ApprovalInterceptorContributor; pub use contributors::ContextContributor; -pub use contributors::OutputContributionFuture; -pub use contributors::OutputContributor; pub use contributors::PromptFragment; pub use contributors::PromptSlot; pub use contributors::ToolCallError; pub use contributors::ToolContribution; pub use contributors::ToolContributor; pub use contributors::ToolHandler; +pub use contributors::TurnItemContributionFuture; +pub use contributors::TurnItemContributor; pub use extension::CodexExtension; pub use registry::ExtensionRegistry; pub use registry::ExtensionRegistryBuilder; pub use state::ExtensionData; -pub use state::Stores; -pub use state::scopes; diff --git a/codex-rs/ext/extension-api/src/registry.rs b/codex-rs/ext/extension-api/src/registry.rs index 64495cccb4..900c1886bd 100644 --- a/codex-rs/ext/extension-api/src/registry.rs +++ b/codex-rs/ext/extension-api/src/registry.rs @@ -1,19 +1,16 @@ -use std::any::Any; -use std::any::TypeId; -use std::collections::HashMap; use std::sync::Arc; use crate::ApprovalInterceptorContributor; use crate::CodexExtension; use crate::ContextContributor; -use crate::OutputContributor; use crate::ToolContributor; +use crate::TurnItemContributor; /// Mutable registry used while extensions install their typed contributions. pub struct ExtensionRegistryBuilder { context_contributors: Vec>>, tool_contributors: Vec>>, - output_contributors: HashMap>, + turn_item_contributors: Vec>>, approval_interceptor_contributors: Vec>>, } @@ -23,7 +20,7 @@ impl Default for ExtensionRegistryBuilder { approval_interceptor_contributors: Vec::new(), context_contributors: Vec::new(), tool_contributors: Vec::new(), - output_contributors: HashMap::new(), + turn_item_contributors: Vec::new(), } } } @@ -70,21 +67,9 @@ impl ExtensionRegistryBuilder { self.tool_contributors.push(contributor); } - /// Registers one ordered output contributor for output type `O`. - pub fn output_contributor(&mut self, contributor: Arc>) - where - C: 'static, - O: 'static, - { - let Some(contributors) = self - .output_contributors - .entry(TypeId::of::()) - .or_insert_with(|| Box::new(Vec::>>::new())) - .downcast_mut::>>>() - else { - unreachable!("output contributor bucket type must match its registered output type"); - }; - contributors.push(contributor); + /// Registers one ordered turn-item contributor. + pub fn turn_item_contributor(&mut self, contributor: Arc>) { + self.turn_item_contributors.push(contributor); } /// Finishes construction and returns the immutable registry. @@ -93,7 +78,7 @@ impl ExtensionRegistryBuilder { approval_interceptor_contributors: self.approval_interceptor_contributors, context_contributors: self.context_contributors, tool_contributors: self.tool_contributors, - output_contributors: self.output_contributors, + turn_item_contributors: self.turn_item_contributors, } } } @@ -102,7 +87,7 @@ impl ExtensionRegistryBuilder { pub struct ExtensionRegistry { context_contributors: Vec>>, tool_contributors: Vec>>, - output_contributors: HashMap>, + turn_item_contributors: Vec>>, approval_interceptor_contributors: Vec>>, } @@ -124,18 +109,8 @@ impl ExtensionRegistry { &self.tool_contributors } - /// Returns the registered ordered output contributors for output type `O`. - pub fn output_contributors(&self) -> &[Arc>] - where - C: 'static, - O: 'static, - { - self.output_contributors - .get(&TypeId::of::()) - .and_then(|contributors| { - contributors.downcast_ref::>>>() - }) - .map(Vec::as_slice) - .unwrap_or_default() + /// Returns the registered ordered turn-item contributors. + pub fn turn_item_contributors(&self) -> &[Arc>] { + &self.turn_item_contributors } } diff --git a/codex-rs/ext/extension-api/src/state.rs b/codex-rs/ext/extension-api/src/state.rs index 2bc533455f..2fc9a8725f 100644 --- a/codex-rs/ext/extension-api/src/state.rs +++ b/codex-rs/ext/extension-api/src/state.rs @@ -7,12 +7,6 @@ use std::sync::PoisonError; type ErasedData = Arc; -/// Built-in extension-store scope markers. -pub mod scopes { - pub enum Thread {} - pub enum Turn {} -} - /// Typed extension-owned data attached to one host object. #[derive(Default, Debug)] pub struct ExtensionData { @@ -72,130 +66,6 @@ impl ExtensionData { } } -/// Dynamic set of host-owned extension stores visible at one contribution site. -/// -/// The host decides which lifetime scopes are available for each insertion -/// point. Contributors can then address one of those visible scopes by marker -/// type while storing extension-private values inside the selected owner. -#[derive(Default, Debug)] -pub struct Stores<'a> { - stores: HashMap, -} - -impl<'a> Stores<'a> { - /// Creates an empty set of visible stores. - pub fn new() -> Self { - Self::default() - } - - /// Adds or replaces the store identified by scope marker `Scope`. - pub fn insert_scope( - &mut self, - store: &'a ExtensionData, - ) -> Option<&'a ExtensionData> { - self.stores.insert(TypeId::of::(), store) - } - - /// Returns the value of type `T` from `Scope`, if one exists. - /// - /// # Panics - /// - /// Panics when `Scope` is unavailable at this contribution site. That is a - /// host/contributor wiring bug: insertion points define which scopes exist, - /// and contributors should only request scopes promised by their insertion point. - pub fn get(&self) -> Option> - where - Scope: 'static, - T: Any + Send + Sync, - { - self.store::().get::() - } - - /// Returns the value of type `T`, inserting one into `Scope` when absent. - /// - /// # Panics - /// - /// Panics when `Scope` is unavailable at this contribution site. That is a - /// host/contributor wiring bug: insertion points define which scopes exist, - /// and contributors should only request scopes promised by their insertion point. - pub fn get_or_init(&self, init: impl FnOnce() -> T) -> Arc - where - Scope: 'static, - T: Any + Send + Sync, - { - self.store::().get_or_init(init) - } - - /// Returns the value of type `T`, inserting one into `Scope` when available. - /// - /// Use this when a contributor intentionally supports multiple insertion - /// points and can proceed without state from a scope that is not visible. - pub fn try_get_or_init(&self, init: impl FnOnce() -> T) -> Option> - where - Scope: 'static, - T: Any + Send + Sync, - { - self.try_store::() - .map(|store| store.get_or_init(init)) - } - - /// Stores `value` in `Scope`, returning any previous value of type `T`. - /// - /// # Panics - /// - /// Panics when `Scope` is unavailable at this contribution site. That is a - /// host/contributor wiring bug: insertion points define which scopes exist, - /// and contributors should only request scopes promised by their insertion point. - pub fn insert(&self, value: T) -> Option> - where - Scope: 'static, - T: Any + Send + Sync, - { - self.store::().insert(value) - } - - /// Removes and returns the value of type `T` from `Scope`, if one exists. - /// - /// # Panics - /// - /// Panics when `Scope` is unavailable at this contribution site. That is a - /// host/contributor wiring bug: insertion points define which scopes exist, - /// and contributors should only request scopes promised by their insertion point. - pub fn remove(&self) -> Option> - where - Scope: 'static, - T: Any + Send + Sync, - { - self.store::().remove::() - } - - fn try_store(&self) -> Option<&ExtensionData> { - self.stores.get(&TypeId::of::()).copied() - } - - fn store(&self) -> &ExtensionData { - self.try_store::().unwrap_or_else(|| { - // This panic means a mistake made by a developer!!! - panic!( - "extension store for scope `{}` is unavailable", - std::any::type_name::() - ) - }) - } -} - -/// Builds a dynamic [`Stores`] bag from the scopes available at one insertion point. -/// -/// I know people don't like macros but this is super cool IMO -#[macro_export] -macro_rules! stores { - ($($scope:ty => $store:expr),* $(,)?) => {{ - let mut stores = $crate::Stores::new(); - $(stores.insert_scope::<$scope>($store);)* - stores - }}; -} - fn downcast_data(value: ErasedData) -> Arc where T: Any + Send + Sync,