diff --git a/codex-rs/ext/goal/src/events.rs b/codex-rs/ext/goal/src/events.rs new file mode 100644 index 0000000000..ab9eda4056 --- /dev/null +++ b/codex-rs/ext/goal/src/events.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use codex_extension_api::ExtensionEventSink; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ThreadGoal; +use codex_protocol::protocol::ThreadGoalUpdatedEvent; + +#[derive(Clone)] +pub(crate) struct GoalEventEmitter { + sink: Arc, +} + +impl GoalEventEmitter { + pub(crate) fn new(sink: Arc) -> Self { + Self { sink } + } + + pub(crate) fn thread_goal_updated( + &self, + event_id: impl Into, + turn_id: Option, + goal: ThreadGoal, + ) { + self.sink.emit(Event { + id: event_id.into(), + msg: EventMsg::ThreadGoalUpdated(ThreadGoalUpdatedEvent { + thread_id: goal.thread_id, + turn_id, + goal, + }), + }); + } +} diff --git a/codex-rs/ext/goal/src/extension.rs b/codex-rs/ext/goal/src/extension.rs index 3204b9eb92..a0ce7117f4 100644 --- a/codex-rs/ext/goal/src/extension.rs +++ b/codex-rs/ext/goal/src/extension.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; use codex_extension_api::ConfigContributor; use codex_extension_api::ExtensionData; +use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistryBuilder; +use codex_extension_api::NoopExtensionEventSink; use codex_extension_api::ThreadLifecycleContributor; use codex_extension_api::ThreadStartInput; use codex_extension_api::TokenUsageContributor; @@ -18,6 +20,7 @@ use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::TurnAbortReason; use crate::accounting::GoalAccountingState; +use crate::events::GoalEventEmitter; use crate::tool::CreateGoalRequest; use crate::tool::GoalToolExecutor; @@ -35,6 +38,7 @@ impl GoalExtensionConfig { #[derive(Clone)] pub struct GoalExtension { backend: Arc, + event_emitter: GoalEventEmitter, goals_enabled: Arc bool + Send + Sync>, } @@ -48,9 +52,18 @@ impl GoalExtension { pub fn new( backend: Arc, goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static, + ) -> Self { + Self::new_with_event_sink(backend, Arc::new(NoopExtensionEventSink), goals_enabled) + } + + pub fn new_with_event_sink( + backend: Arc, + event_sink: Arc, + goals_enabled: impl Fn(&C) -> bool + Send + Sync + 'static, ) -> Self { Self { backend, + event_emitter: GoalEventEmitter::new(event_sink), goals_enabled: Arc::new(goals_enabled), } } @@ -231,14 +244,20 @@ where return Vec::new(); }; vec![ - Arc::new(GoalToolExecutor::get(thread_id, Arc::clone(&self.backend))), + Arc::new(GoalToolExecutor::get( + thread_id, + Arc::clone(&self.backend), + self.event_emitter.clone(), + )), Arc::new(GoalToolExecutor::create( thread_id, Arc::clone(&self.backend), + self.event_emitter.clone(), )), Arc::new(GoalToolExecutor::update( thread_id, Arc::clone(&self.backend), + self.event_emitter.clone(), )), ] } @@ -260,7 +279,11 @@ pub fn install_with_backend( ) where C: Send + Sync + 'static, { - let extension = Arc::new(GoalExtension::new(backend, goals_enabled)); + let extension = Arc::new(GoalExtension::new_with_event_sink( + backend, + registry.event_sink(), + goals_enabled, + )); registry.thread_lifecycle_contributor(extension.clone()); registry.config_contributor(extension.clone()); registry.turn_lifecycle_contributor(extension.clone()); diff --git a/codex-rs/ext/goal/src/lib.rs b/codex-rs/ext/goal/src/lib.rs index 17a43e4671..38332aee41 100644 --- a/codex-rs/ext/goal/src/lib.rs +++ b/codex-rs/ext/goal/src/lib.rs @@ -5,6 +5,7 @@ //! accounting that can be represented with today's extension API. mod accounting; +mod events; mod extension; mod spec; mod tool; diff --git a/codex-rs/ext/goal/src/tool.rs b/codex-rs/ext/goal/src/tool.rs index d19dc81056..8ad4541a3c 100644 --- a/codex-rs/ext/goal/src/tool.rs +++ b/codex-rs/ext/goal/src/tool.rs @@ -15,6 +15,7 @@ use codex_protocol::protocol::validate_thread_goal_objective; use serde::Deserialize; use serde::Serialize; +use crate::events::GoalEventEmitter; use crate::extension::GoalToolBackend; use crate::spec::CREATE_GOAL_TOOL_NAME; use crate::spec::GET_GOAL_TOOL_NAME; @@ -28,6 +29,7 @@ pub(crate) struct GoalToolExecutor { kind: GoalToolKind, thread_id: ThreadId, backend: Arc, + event_emitter: GoalEventEmitter, } #[derive(Clone, Copy)] @@ -65,27 +67,42 @@ enum CompletionBudgetReport { } impl GoalToolExecutor { - pub(crate) fn get(thread_id: ThreadId, backend: Arc) -> Self { + pub(crate) fn get( + thread_id: ThreadId, + backend: Arc, + event_emitter: GoalEventEmitter, + ) -> Self { Self { kind: GoalToolKind::Get, thread_id, backend, + event_emitter, } } - pub(crate) fn create(thread_id: ThreadId, backend: Arc) -> Self { + pub(crate) fn create( + thread_id: ThreadId, + backend: Arc, + event_emitter: GoalEventEmitter, + ) -> Self { Self { kind: GoalToolKind::Create, thread_id, backend, + event_emitter, } } - pub(crate) fn update(thread_id: ThreadId, backend: Arc) -> Self { + pub(crate) fn update( + thread_id: ThreadId, + backend: Arc, + event_emitter: GoalEventEmitter, + ) -> Self { Self { kind: GoalToolKind::Update, thread_id, backend, + event_emitter, } } } @@ -146,6 +163,7 @@ impl GoalToolExecutor { .create_goal(self.thread_id, request) .await .map_err(FunctionCallError::RespondToModel)?; + self.emit_goal_updated_from_tool_call(&invocation, goal.clone()); goal_response(Some(goal), CompletionBudgetReport::Omit) } @@ -168,8 +186,20 @@ impl GoalToolExecutor { .complete_goal(self.thread_id) .await .map_err(FunctionCallError::RespondToModel)?; + self.emit_goal_updated_from_tool_call(&invocation, goal.clone()); goal_response(Some(goal), CompletionBudgetReport::Include) } + + fn emit_goal_updated_from_tool_call(&self, invocation: &ToolCall, goal: ThreadGoal) { + // TODO: ToolCall should expose the current turn submission id so goal + // tool events can set ThreadGoalUpdatedEvent.turn_id exactly as core + // does today. Until then, correlate the event with the tool call id. + self.event_emitter.thread_goal_updated( + invocation.call_id.clone(), + /*turn_id*/ None, + goal, + ); + } } fn parse_arguments(arguments: &str) -> Result