mirror of
https://github.com/openai/codex.git
synced 2026-05-20 03:05:02 +00:00
refactor states
This commit is contained in:
@@ -12,9 +12,11 @@ path = "src/lib.rs"
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
codex-protocol = { workspace = true }
|
||||
codex-tools = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { version = "1.49.0", features = ["rt", "rt-multi-thread", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
codex-git-attribution = { workspace = true }
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use codex_extension_api::CodexExtension;
|
||||
use codex_extension_api::ContextContributor;
|
||||
use codex_extension_api::ExtensionData;
|
||||
use codex_extension_api::ExtensionRegistryBuilder;
|
||||
use codex_extension_api::scopes::Thread;
|
||||
use codex_extension_api::scopes::Turn;
|
||||
use codex_git_attribution as git_attribution;
|
||||
use codex_memories::MemoriesExtension;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use codex_memories::citation_output::MemoryState;
|
||||
use codex_protocol::items::{TurnItem, UserMessageItem};
|
||||
|
||||
fn main() {
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// 1. Build the concrete extension values owned by the host.
|
||||
let memories = Arc::new(MemoriesExtension::new(
|
||||
Some("Use memories when they help answer the user.".to_string()),
|
||||
@@ -29,48 +27,59 @@ fn main() {
|
||||
// whether they are active for this request.
|
||||
//
|
||||
// Ideally, this is instead the TurnContext or the Config or so
|
||||
let context = ctx::RuntimeContext {
|
||||
let runtime = ctx::RuntimeContext {
|
||||
commit_attribution: None,
|
||||
memory_tool_enabled: true,
|
||||
use_memories: true,
|
||||
};
|
||||
|
||||
// 4. Assemble the stores that exist at this insertion point.
|
||||
//
|
||||
// Another insertion point could expose only `Thread`, or could add more
|
||||
// scopes later. Contributors receive the same dynamic bag and address the
|
||||
// scopes they need by marker type. The idea would be to have some re-usable contributors
|
||||
// such as the Output one, but I'm happy to negociate this one
|
||||
// 4. Build the host-owned stores used by the active contribution families.
|
||||
let session_data = ExtensionData::new();
|
||||
let thread_data = ExtensionData::new();
|
||||
let turn_data = ExtensionData::new();
|
||||
let stores = codex_extension_api::stores! {
|
||||
Thread => &thread_data,
|
||||
Turn => &turn_data,
|
||||
};
|
||||
|
||||
// 5. Invoke whichever contribution families this insertion point needs.
|
||||
let prompt_fragments = registry
|
||||
.context_contributors()
|
||||
.iter()
|
||||
.flat_map(|contributor| contributor.contribute(&context, &stores))
|
||||
.flat_map(|contributor| contributor.contribute(&runtime, &session_data, &thread_data))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let tools = registry
|
||||
.tool_contributors()
|
||||
.iter()
|
||||
.flat_map(|contributor| contributor.tools(&context, &stores))
|
||||
.flat_map(|contributor| contributor.tools(&runtime, &thread_data))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let tools = registry
|
||||
.tool_contributors()
|
||||
.iter()
|
||||
.flat_map(|contributor| contributor.tools(&runtime, &thread_data))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut item =TurnItem::UserMessage(UserMessageItem {
|
||||
content: vec!(),
|
||||
id: String::new()
|
||||
});
|
||||
for contributor in registry.turn_item_contributors() {
|
||||
let _ = contributor.contribute(&runtime, &thread_data, &turn_data, &mut item).await;
|
||||
}
|
||||
for contributor in registry.turn_item_contributors() {
|
||||
let _ = contributor.contribute(&runtime, &thread_data, &turn_data, &mut item).await;
|
||||
}
|
||||
|
||||
let session_state = session_data.get_or_init::<MemoryState>(Default::default);
|
||||
let thread_state = thread_data.get_or_init::<MemoryState>(Default::default);
|
||||
let turn_state = turn_data.get_or_init::<MemoryState>(Default::default);
|
||||
|
||||
println!("Session: {} (expected 1)", session_state.counter.load(Ordering::Relaxed));
|
||||
println!("Thread: {} (expected 3)", thread_state.counter.load(Ordering::Relaxed));
|
||||
println!("Turn: {} (expected 2)", turn_state.counter.load(Ordering::Relaxed));
|
||||
|
||||
println!("prompt fragments: {}", prompt_fragments.len());
|
||||
println!("native tools: {}", tools.len());
|
||||
}
|
||||
|
||||
// Just for the machinerie such that it compiles
|
||||
#[derive(Debug, Default)]
|
||||
struct ThreadPromptStats {
|
||||
prompt_builds: AtomicU64,
|
||||
}
|
||||
|
||||
mod ctx {
|
||||
use codex_git_attribution::GitAttributionContext;
|
||||
use codex_memories::ctx::MemoriesContext;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::future::Future;
|
||||
|
||||
use crate::Stores;
|
||||
use codex_protocol::items::TurnItem;
|
||||
|
||||
use crate::ExtensionData;
|
||||
|
||||
mod prompt;
|
||||
mod tool;
|
||||
@@ -13,30 +15,37 @@ pub use tool::ToolHandler;
|
||||
|
||||
/// Extension contribution that adds prompt fragments during prompt assembly.
|
||||
pub trait ContextContributor<C>: Send + Sync {
|
||||
fn contribute(&self, context: &C, stores: &Stores<'_>) -> Vec<PromptFragment>;
|
||||
fn contribute(
|
||||
&self,
|
||||
context: &C,
|
||||
session_store: &ExtensionData,
|
||||
thread_store: &ExtensionData,
|
||||
) -> Vec<PromptFragment>;
|
||||
}
|
||||
|
||||
/// Extension contribution that exposes native tools owned by a feature.
|
||||
pub trait ToolContributor<C>: Send + Sync {
|
||||
/// Returns the native tools visible for the supplied runtime context.
|
||||
fn tools(&self, context: &C, stores: &Stores<'_>) -> Vec<ToolContribution<C>>;
|
||||
fn tools(&self, context: &C, thread_store: &ExtensionData) -> Vec<ToolContribution<C>>;
|
||||
}
|
||||
|
||||
/// Future returned by one ordered output contribution.
|
||||
pub type OutputContributionFuture<'a> =
|
||||
/// Future returned by one ordered turn-item contribution.
|
||||
pub type TurnItemContributionFuture<'a> =
|
||||
std::pin::Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
|
||||
|
||||
/// Ordered post-processing contribution for one completed output value.
|
||||
/// Ordered post-processing contribution for one parsed turn item.
|
||||
///
|
||||
/// This is kept abstract so that we can re-use it at multiple places. I kind of like it but I understand
|
||||
/// if this is problematic with bad devs
|
||||
pub trait OutputContributor<C, O>: Send + Sync {
|
||||
/// Implementations may mutate the item before it is emitted and may use the
|
||||
/// explicitly exposed thread- and turn-lifetime stores when they need durable
|
||||
/// extension-private state.
|
||||
pub trait TurnItemContributor<C>: Send + Sync {
|
||||
fn contribute<'a>(
|
||||
&'a self,
|
||||
context: &'a C,
|
||||
stores: &'a Stores<'a>,
|
||||
output: &'a mut O,
|
||||
) -> OutputContributionFuture<'a>;
|
||||
thread_store: &'a ExtensionData,
|
||||
turn_store: &'a ExtensionData,
|
||||
item: &'a mut TurnItem,
|
||||
) -> TurnItemContributionFuture<'a>;
|
||||
}
|
||||
|
||||
// TODO: WIP (do not consider)
|
||||
@@ -44,5 +53,10 @@ pub trait OutputContributor<C, O>: Send + Sync {
|
||||
/// (ideally we can replace it by a session lifecycle thing or a request contributor?)
|
||||
pub trait ApprovalInterceptorContributor<C>: Send + Sync {
|
||||
/// Returns whether this contributor should intercept approvals in `context`.
|
||||
fn intercepts_approvals(&self, context: &C, stores: &Stores<'_>) -> bool;
|
||||
fn intercepts_approvals(
|
||||
&self,
|
||||
context: &C,
|
||||
thread_store: &ExtensionData,
|
||||
turn_store: &ExtensionData,
|
||||
) -> bool;
|
||||
}
|
||||
|
||||
@@ -5,17 +5,15 @@ mod state;
|
||||
|
||||
pub use contributors::ApprovalInterceptorContributor;
|
||||
pub use contributors::ContextContributor;
|
||||
pub use contributors::OutputContributionFuture;
|
||||
pub use contributors::OutputContributor;
|
||||
pub use contributors::PromptFragment;
|
||||
pub use contributors::PromptSlot;
|
||||
pub use contributors::ToolCallError;
|
||||
pub use contributors::ToolContribution;
|
||||
pub use contributors::ToolContributor;
|
||||
pub use contributors::ToolHandler;
|
||||
pub use contributors::TurnItemContributionFuture;
|
||||
pub use contributors::TurnItemContributor;
|
||||
pub use extension::CodexExtension;
|
||||
pub use registry::ExtensionRegistry;
|
||||
pub use registry::ExtensionRegistryBuilder;
|
||||
pub use state::ExtensionData;
|
||||
pub use state::Stores;
|
||||
pub use state::scopes;
|
||||
|
||||
@@ -1,19 +1,16 @@
|
||||
use std::any::Any;
|
||||
use std::any::TypeId;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::ApprovalInterceptorContributor;
|
||||
use crate::CodexExtension;
|
||||
use crate::ContextContributor;
|
||||
use crate::OutputContributor;
|
||||
use crate::ToolContributor;
|
||||
use crate::TurnItemContributor;
|
||||
|
||||
/// Mutable registry used while extensions install their typed contributions.
|
||||
pub struct ExtensionRegistryBuilder<C> {
|
||||
context_contributors: Vec<Arc<dyn ContextContributor<C>>>,
|
||||
tool_contributors: Vec<Arc<dyn ToolContributor<C>>>,
|
||||
output_contributors: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
|
||||
turn_item_contributors: Vec<Arc<dyn TurnItemContributor<C>>>,
|
||||
approval_interceptor_contributors: Vec<Arc<dyn ApprovalInterceptorContributor<C>>>,
|
||||
}
|
||||
|
||||
@@ -23,7 +20,7 @@ impl<C> Default for ExtensionRegistryBuilder<C> {
|
||||
approval_interceptor_contributors: Vec::new(),
|
||||
context_contributors: Vec::new(),
|
||||
tool_contributors: Vec::new(),
|
||||
output_contributors: HashMap::new(),
|
||||
turn_item_contributors: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -70,21 +67,9 @@ impl<C> ExtensionRegistryBuilder<C> {
|
||||
self.tool_contributors.push(contributor);
|
||||
}
|
||||
|
||||
/// Registers one ordered output contributor for output type `O`.
|
||||
pub fn output_contributor<O>(&mut self, contributor: Arc<dyn OutputContributor<C, O>>)
|
||||
where
|
||||
C: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
let Some(contributors) = self
|
||||
.output_contributors
|
||||
.entry(TypeId::of::<O>())
|
||||
.or_insert_with(|| Box::new(Vec::<Arc<dyn OutputContributor<C, O>>>::new()))
|
||||
.downcast_mut::<Vec<Arc<dyn OutputContributor<C, O>>>>()
|
||||
else {
|
||||
unreachable!("output contributor bucket type must match its registered output type");
|
||||
};
|
||||
contributors.push(contributor);
|
||||
/// Registers one ordered turn-item contributor.
|
||||
pub fn turn_item_contributor(&mut self, contributor: Arc<dyn TurnItemContributor<C>>) {
|
||||
self.turn_item_contributors.push(contributor);
|
||||
}
|
||||
|
||||
/// Finishes construction and returns the immutable registry.
|
||||
@@ -93,7 +78,7 @@ impl<C> ExtensionRegistryBuilder<C> {
|
||||
approval_interceptor_contributors: self.approval_interceptor_contributors,
|
||||
context_contributors: self.context_contributors,
|
||||
tool_contributors: self.tool_contributors,
|
||||
output_contributors: self.output_contributors,
|
||||
turn_item_contributors: self.turn_item_contributors,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -102,7 +87,7 @@ impl<C> ExtensionRegistryBuilder<C> {
|
||||
pub struct ExtensionRegistry<C> {
|
||||
context_contributors: Vec<Arc<dyn ContextContributor<C>>>,
|
||||
tool_contributors: Vec<Arc<dyn ToolContributor<C>>>,
|
||||
output_contributors: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
|
||||
turn_item_contributors: Vec<Arc<dyn TurnItemContributor<C>>>,
|
||||
approval_interceptor_contributors: Vec<Arc<dyn ApprovalInterceptorContributor<C>>>,
|
||||
}
|
||||
|
||||
@@ -124,18 +109,8 @@ impl<C> ExtensionRegistry<C> {
|
||||
&self.tool_contributors
|
||||
}
|
||||
|
||||
/// Returns the registered ordered output contributors for output type `O`.
|
||||
pub fn output_contributors<O>(&self) -> &[Arc<dyn OutputContributor<C, O>>]
|
||||
where
|
||||
C: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
self.output_contributors
|
||||
.get(&TypeId::of::<O>())
|
||||
.and_then(|contributors| {
|
||||
contributors.downcast_ref::<Vec<Arc<dyn OutputContributor<C, O>>>>()
|
||||
})
|
||||
.map(Vec::as_slice)
|
||||
.unwrap_or_default()
|
||||
/// Returns the registered ordered turn-item contributors.
|
||||
pub fn turn_item_contributors(&self) -> &[Arc<dyn TurnItemContributor<C>>] {
|
||||
&self.turn_item_contributors
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,12 +7,6 @@ use std::sync::PoisonError;
|
||||
|
||||
type ErasedData = Arc<dyn Any + Send + Sync>;
|
||||
|
||||
/// Built-in extension-store scope markers.
|
||||
pub mod scopes {
|
||||
pub enum Thread {}
|
||||
pub enum Turn {}
|
||||
}
|
||||
|
||||
/// Typed extension-owned data attached to one host object.
|
||||
#[derive(Default, Debug)]
|
||||
pub struct ExtensionData {
|
||||
@@ -72,130 +66,6 @@ impl ExtensionData {
|
||||
}
|
||||
}
|
||||
|
||||
/// Dynamic set of host-owned extension stores visible at one contribution site.
|
||||
///
|
||||
/// The host decides which lifetime scopes are available for each insertion
|
||||
/// point. Contributors can then address one of those visible scopes by marker
|
||||
/// type while storing extension-private values inside the selected owner.
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Stores<'a> {
|
||||
stores: HashMap<TypeId, &'a ExtensionData>,
|
||||
}
|
||||
|
||||
impl<'a> Stores<'a> {
|
||||
/// Creates an empty set of visible stores.
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Adds or replaces the store identified by scope marker `Scope`.
|
||||
pub fn insert_scope<Scope: 'static>(
|
||||
&mut self,
|
||||
store: &'a ExtensionData,
|
||||
) -> Option<&'a ExtensionData> {
|
||||
self.stores.insert(TypeId::of::<Scope>(), store)
|
||||
}
|
||||
|
||||
/// Returns the value of type `T` from `Scope`, if one exists.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics when `Scope` is unavailable at this contribution site. That is a
|
||||
/// host/contributor wiring bug: insertion points define which scopes exist,
|
||||
/// and contributors should only request scopes promised by their insertion point.
|
||||
pub fn get<Scope, T>(&self) -> Option<Arc<T>>
|
||||
where
|
||||
Scope: 'static,
|
||||
T: Any + Send + Sync,
|
||||
{
|
||||
self.store::<Scope>().get::<T>()
|
||||
}
|
||||
|
||||
/// Returns the value of type `T`, inserting one into `Scope` when absent.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics when `Scope` is unavailable at this contribution site. That is a
|
||||
/// host/contributor wiring bug: insertion points define which scopes exist,
|
||||
/// and contributors should only request scopes promised by their insertion point.
|
||||
pub fn get_or_init<Scope, T>(&self, init: impl FnOnce() -> T) -> Arc<T>
|
||||
where
|
||||
Scope: 'static,
|
||||
T: Any + Send + Sync,
|
||||
{
|
||||
self.store::<Scope>().get_or_init(init)
|
||||
}
|
||||
|
||||
/// Returns the value of type `T`, inserting one into `Scope` when available.
|
||||
///
|
||||
/// Use this when a contributor intentionally supports multiple insertion
|
||||
/// points and can proceed without state from a scope that is not visible.
|
||||
pub fn try_get_or_init<Scope, T>(&self, init: impl FnOnce() -> T) -> Option<Arc<T>>
|
||||
where
|
||||
Scope: 'static,
|
||||
T: Any + Send + Sync,
|
||||
{
|
||||
self.try_store::<Scope>()
|
||||
.map(|store| store.get_or_init(init))
|
||||
}
|
||||
|
||||
/// Stores `value` in `Scope`, returning any previous value of type `T`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics when `Scope` is unavailable at this contribution site. That is a
|
||||
/// host/contributor wiring bug: insertion points define which scopes exist,
|
||||
/// and contributors should only request scopes promised by their insertion point.
|
||||
pub fn insert<Scope, T>(&self, value: T) -> Option<Arc<T>>
|
||||
where
|
||||
Scope: 'static,
|
||||
T: Any + Send + Sync,
|
||||
{
|
||||
self.store::<Scope>().insert(value)
|
||||
}
|
||||
|
||||
/// Removes and returns the value of type `T` from `Scope`, if one exists.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics when `Scope` is unavailable at this contribution site. That is a
|
||||
/// host/contributor wiring bug: insertion points define which scopes exist,
|
||||
/// and contributors should only request scopes promised by their insertion point.
|
||||
pub fn remove<Scope, T>(&self) -> Option<Arc<T>>
|
||||
where
|
||||
Scope: 'static,
|
||||
T: Any + Send + Sync,
|
||||
{
|
||||
self.store::<Scope>().remove::<T>()
|
||||
}
|
||||
|
||||
fn try_store<Scope: 'static>(&self) -> Option<&ExtensionData> {
|
||||
self.stores.get(&TypeId::of::<Scope>()).copied()
|
||||
}
|
||||
|
||||
fn store<Scope: 'static>(&self) -> &ExtensionData {
|
||||
self.try_store::<Scope>().unwrap_or_else(|| {
|
||||
// This panic means a mistake made by a developer!!!
|
||||
panic!(
|
||||
"extension store for scope `{}` is unavailable",
|
||||
std::any::type_name::<Scope>()
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a dynamic [`Stores`] bag from the scopes available at one insertion point.
|
||||
///
|
||||
/// I know people don't like macros but this is super cool IMO
|
||||
#[macro_export]
|
||||
macro_rules! stores {
|
||||
($($scope:ty => $store:expr),* $(,)?) => {{
|
||||
let mut stores = $crate::Stores::new();
|
||||
$(stores.insert_scope::<$scope>($store);)*
|
||||
stores
|
||||
}};
|
||||
}
|
||||
|
||||
fn downcast_data<T>(value: ErasedData) -> Arc<T>
|
||||
where
|
||||
T: Any + Send + Sync,
|
||||
|
||||
Reference in New Issue
Block a user