diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 1f2f289b05..b4348f53bd 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -49,7 +49,6 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; -use codex_app_server_protocol::SkillsChangedNotification; use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadRealtimeClosedNotification; @@ -194,13 +193,6 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } - EventMsg::SkillsUpdateAvailable => { - outgoing - .send_server_notification(ServerNotification::SkillsChanged( - SkillsChangedNotification {}, - )) - .await; - } EventMsg::McpStartupUpdate(update) => { let (status, error) = match update.status { codex_protocol::protocol::McpStartupStatus::Starting => { diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index dea4a20e5a..6999948b04 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -94,6 +94,7 @@ mod outgoing_message; mod request_processors; mod request_serialization; mod server_request_error; +mod skills_watcher; mod thread_state; mod thread_status; mod transport; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 3853244958..c8204e05b6 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -36,6 +36,7 @@ use crate::request_processors::WindowsSandboxRequestProcessor; use crate::request_serialization::QueuedInitializedRequest; use crate::request_serialization::RequestSerializationQueueKey; use crate::request_serialization::RequestSerializationQueues; +use crate::skills_watcher::SkillsWatcher; use crate::thread_state::ConnectionCapabilities; use crate::thread_state::ThreadStateManager; use crate::transport::AppServerTransport; @@ -314,6 +315,7 @@ impl MessageProcessor { thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); + let skills_watcher = SkillsWatcher::new(thread_manager.skills_manager(), outgoing.clone()); let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new())); let thread_watch_manager = @@ -405,6 +407,7 @@ impl MessageProcessor { Arc::clone(&thread_list_state_permit), thread_goal_processor.clone(), state_db, + Arc::clone(&skills_watcher), ); let turn_processor = TurnRequestProcessor::new( auth_manager.clone(), @@ -418,6 +421,7 @@ impl MessageProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, + Arc::clone(&skills_watcher), ); if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) { // Keep plugin startup warmups aligned at app-server startup. diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 284d0fa1ff..9de844c6cd 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -11,6 +11,7 @@ use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::RequestContext; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; +use crate::skills_watcher::SkillsWatcher; use crate::thread_status::ThreadWatchManager; use crate::thread_status::resolve_thread_status; use chrono::Duration as ChronoDuration; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index ef44a2b178..45031490b0 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -12,6 +12,7 @@ pub(super) struct ListenerTaskContext { pub(super) thread_list_state_permit: Arc, pub(super) fallback_model_provider: String, pub(super) codex_home: PathBuf, + pub(super) skills_watcher: Arc, } struct UnloadingState { @@ -226,12 +227,22 @@ pub(super) async fn ensure_listener_task_running( "thread {conversation_id} is closing; retry after the thread is closed" ))); }; + let config = conversation.config().await; + let environments = conversation.environment_selections().await; + let watch_registration = listener_task_context + .skills_watcher + .register_thread_config( + config.as_ref(), + listener_task_context.thread_manager.as_ref(), + &environments, + ) + .await; let (mut listener_command_rx, listener_generation) = { let mut thread_state = thread_state.lock().await; if thread_state.listener_matches(&conversation) { return Ok(()); } - thread_state.set_listener(cancel_tx, &conversation) + thread_state.set_listener(cancel_tx, &conversation, watch_registration) }; let ListenerTaskContext { outgoing, @@ -242,6 +253,7 @@ pub(super) async fn ensure_listener_task_running( thread_list_state_permit, fallback_model_provider, codex_home, + .. } = listener_task_context; let outgoing_for_task = Arc::clone(&outgoing); tokio::spawn(async move { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 015ae5daf6..132a03afde 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -317,6 +317,7 @@ pub(crate) struct ThreadRequestProcessor { pub(super) thread_goal_processor: ThreadGoalRequestProcessor, pub(super) state_db: Option, pub(super) background_tasks: TaskTracker, + pub(super) skills_watcher: Arc, } impl ThreadRequestProcessor { @@ -335,6 +336,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: Arc, thread_goal_processor: ThreadGoalRequestProcessor, state_db: Option, + skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -351,6 +353,7 @@ impl ThreadRequestProcessor { thread_goal_processor, state_db, background_tasks: TaskTracker::new(), + skills_watcher, } } @@ -752,6 +755,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), } } @@ -849,6 +853,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), }; let request_trace = request_context.request_trace(); let config_manager = self.config_manager.clone(); @@ -1049,7 +1054,6 @@ impl ThreadRequestProcessor { .collect() }; let core_dynamic_tool_count = core_dynamic_tools.len(); - let NewThread { thread_id, thread, diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index bdc5847b0d..d1dae4ef46 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -13,6 +13,7 @@ pub(crate) struct TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, + skills_watcher: Arc, } impl TurnRequestProcessor { @@ -29,6 +30,7 @@ impl TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, + skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -42,6 +44,7 @@ impl TurnRequestProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, + skills_watcher, } } @@ -1087,6 +1090,7 @@ impl TurnRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), } } diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs new file mode 100644 index 0000000000..33acf65335 --- /dev/null +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; +use std::time::Duration; + +use crate::outgoing_message::OutgoingMessageSender; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::SkillsChangedNotification; +use codex_core::ThreadManager; +use codex_core::config::Config; +use codex_core::file_watcher::FileWatcher; +use codex_core::file_watcher::FileWatcherSubscriber; +use codex_core::file_watcher::Receiver; +use codex_core::file_watcher::ThrottledWatchReceiver; +use codex_core::file_watcher::WatchPath; +use codex_core::file_watcher::WatchRegistration; +use codex_core::skills::SkillsLoadInput; +use codex_core::skills::SkillsManager; +use codex_protocol::protocol::TurnEnvironmentSelection; +use tracing::warn; + +#[cfg(not(test))] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); +#[cfg(test)] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); + +pub(crate) struct SkillsWatcher { + subscriber: FileWatcherSubscriber, +} + +impl SkillsWatcher { + pub(crate) fn new( + skills_manager: Arc, + outgoing: Arc, + ) -> Arc { + let file_watcher = match FileWatcher::new() { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize skills file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + let (subscriber, rx) = file_watcher.add_subscriber(); + Self::spawn_event_loop(rx, skills_manager, outgoing); + Arc::new(Self { subscriber }) + } + + pub(crate) async fn register_thread_config( + &self, + config: &Config, + thread_manager: &ThreadManager, + environments: &[TurnEnvironmentSelection], + ) -> WatchRegistration { + let Some(environment_selection) = environments.first() else { + return WatchRegistration::default(); + }; + let Some(environment) = thread_manager + .environment_manager() + .get_environment(&environment_selection.environment_id) + else { + warn!( + "failed to register skills watcher for unknown environment `{}`", + environment_selection.environment_id + ); + return WatchRegistration::default(); + }; + if environment.is_remote() { + return WatchRegistration::default(); + } + + let plugins_input = config.plugins_config_input(); + let plugins_manager = thread_manager.plugins_manager(); + let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; + let skills_input = SkillsLoadInput::new( + config.cwd.clone(), + plugin_outcome.effective_plugin_skill_roots(), + config.config_layer_stack.clone(), + config.bundled_skills_enabled(), + ); + let roots = thread_manager + .skills_manager() + .skill_roots_for_config(&skills_input, Some(environment.get_filesystem())) + .await + .into_iter() + .map(|root| WatchPath { + path: root.path.into_path_buf(), + recursive: true, + }) + .collect(); + self.subscriber.register_paths(roots) + } + + fn spawn_event_loop( + rx: Receiver, + skills_manager: Arc, + outgoing: Arc, + ) { + let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); + let Ok(handle) = tokio::runtime::Handle::try_current() else { + warn!("skills watcher listener skipped: no Tokio runtime available"); + return; + }; + handle.spawn(async move { + while rx.recv().await.is_some() { + skills_manager.clear_cache(); + outgoing + .send_server_notification(ServerNotification::SkillsChanged( + SkillsChangedNotification {}, + )) + .await; + } + }); + } +} diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 82871fca8b..f31257e8cf 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -7,6 +7,7 @@ use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; use codex_core::CodexThread; use codex_core::ThreadConfigSnapshot; +use codex_core::file_watcher::WatchRegistration; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -77,6 +78,7 @@ pub(crate) struct ThreadState { listener_command_tx: Option>, current_turn_history: ThreadHistoryBuilder, listener_thread: Option>, + watch_registration: WatchRegistration, } impl ThreadState { @@ -91,6 +93,7 @@ impl ThreadState { &mut self, cancel_tx: oneshot::Sender<()>, conversation: &Arc, + watch_registration: WatchRegistration, ) -> (mpsc::UnboundedReceiver, u64) { if let Some(previous) = self.cancel_tx.replace(cancel_tx) { let _ = previous.send(()); @@ -99,6 +102,7 @@ impl ThreadState { let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel(); self.listener_command_tx = Some(listener_command_tx); self.listener_thread = Some(Arc::downgrade(conversation)); + self.watch_registration = watch_registration; (listener_command_rx, self.listener_generation) } @@ -109,6 +113,7 @@ impl ThreadState { self.listener_command_tx = None; self.current_turn_history.reset(); self.listener_thread = None; + self.watch_registration = WatchRegistration::default(); } pub(crate) fn set_experimental_raw_events(&mut self, enabled: bool) { diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index 39dae06bd0..416b2515ad 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -4,8 +4,10 @@ use anyhow::Context; use anyhow::Result; use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; +use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use app_test_support::write_chatgpt_auth; +use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::PluginListParams; use codex_app_server_protocol::PluginListResponse; @@ -573,11 +575,39 @@ async fn skills_list_uses_cached_result_until_force_reload() -> Result<()> { #[tokio::test] async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + )?; write_skill(&codex_home, "demo")?; - let mut mcp = McpProcess::new(codex_home.path()).await?; + let mut mcp = + McpProcess::new_with_env(codex_home.path(), &[(CODEX_EXEC_SERVER_URL_ENV_VAR, None)]) + .await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + let initial_skills_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![codex_home.path().to_path_buf()], + force_reload: true, + }) + .await?; + let initial_skills_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(initial_skills_request_id)), + ) + .await??; + let SkillsListResponse { data } = to_response(initial_skills_response)?; + assert_eq!(data.len(), 1); + assert!( + data[0] + .skills + .iter() + .any(|skill| { skill.name == "demo" && skill.description == "demo description" }) + ); + let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams { model: None, @@ -630,5 +660,24 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( let notification: SkillsChangedNotification = serde_json::from_value(params)?; assert_eq!(notification, SkillsChangedNotification {}); + let updated_skills_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![codex_home.path().to_path_buf()], + force_reload: false, + }) + .await?; + let updated_skills_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(updated_skills_request_id)), + ) + .await??; + let SkillsListResponse { data } = to_response(updated_skills_response)?; + assert_eq!(data.len(), 1); + assert!( + data[0] + .skills + .iter() + .any(|skill| skill.name == "demo" && skill.description == "updated") + ); Ok(()) } diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 2503764904..793162cb2d 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -83,7 +83,6 @@ pub(crate) async fn run_codex_thread_interactive( skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), - skills_watcher: Arc::clone(&parent_session.services.skills_watcher), conversation_history: initial_history.unwrap_or(InitialHistory::New), session_source: SessionSource::SubAgent(subagent_source.clone()), thread_source: Some(ThreadSource::Subagent), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 9fe235640a..a0dd95c37b 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -1,6 +1,5 @@ use crate::agent::AgentStatus; use crate::config::ConstraintResult; -use crate::file_watcher::WatchRegistration; use crate::goals::ExternalGoalSet; use crate::goals::GoalRuntimeEvent; use crate::session::Codex; @@ -31,6 +30,7 @@ use codex_protocol::protocol::Submission; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsageInfo; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; use codex_thread_store::StoredThread; @@ -101,7 +101,6 @@ pub struct CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, out_of_band_elicitation_count: Mutex, - _watch_registration: WatchRegistration, } /// Conduit for the bidirectional stream of messages that compose a thread @@ -112,7 +111,6 @@ impl CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, session_source: SessionSource, - watch_registration: WatchRegistration, ) -> Self { Self { codex, @@ -120,7 +118,6 @@ impl CodexThread { session_configured, rollout_path, out_of_band_elicitation_count: Mutex::new(0), - _watch_registration: watch_registration, } } @@ -471,6 +468,10 @@ impl CodexThread { self.codex.session.refresh_runtime_config(next_config).await; } + pub async fn environment_selections(&self) -> Vec { + self.codex.thread_environment_selections().await + } + pub async fn read_mcp_resource( &self, server: &str, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 57ac9b4a59..41205c14db 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -100,7 +100,6 @@ pub(crate) use skills::manager; pub(crate) use skills::maybe_emit_implicit_skill_invocation; pub(crate) use skills::resolve_skill_dependencies_for_turn; pub(crate) use skills::skills_load_input_from_config; -mod skills_watcher; mod stream_events_utils; pub mod test_support; mod unified_exec; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index c6acf16f67..7e4c8d7073 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -113,6 +113,7 @@ use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnContextNetworkItem; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; @@ -283,8 +284,6 @@ use crate::rollout::map_session_init_error; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; -use crate::skills_watcher::SkillsWatcher; -use crate::skills_watcher::SkillsWatcherEvent; use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; use crate::state::PendingRequestPermissions; @@ -393,7 +392,6 @@ pub(crate) struct CodexSpawnArgs { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, - pub(crate) skills_watcher: Arc, pub(crate) conversation_history: InitialHistory, pub(crate) session_source: SessionSource, pub(crate) thread_source: Option, @@ -457,7 +455,6 @@ impl Codex { skills_manager, plugins_manager, mcp_manager, - skills_watcher, conversation_history, session_source, thread_source, @@ -653,7 +650,6 @@ impl Codex { skills_manager, plugins_manager, mcp_manager.clone(), - skills_watcher, agent_control, environment_manager, analytics_events_client, @@ -788,6 +784,11 @@ impl Codex { state.session_configuration.thread_config_snapshot() } + pub(crate) async fn thread_environment_selections(&self) -> Vec { + let state = self.session.state.lock().await; + state.session_configuration.environments.clone() + } + pub(crate) fn state_db(&self) -> Option { self.session.state_db() } @@ -1021,29 +1022,6 @@ impl Session { self.out_of_band_elicitation_paused.send_replace(paused); } - fn start_skills_watcher_listener(self: &Arc) { - let mut rx = self.services.skills_watcher.subscribe(); - let weak_sess = Arc::downgrade(self); - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { - let Some(sess) = weak_sess.upgrade() else { - break; - }; - let event = Event { - id: sess.next_internal_sub_id(), - msg: EventMsg::SkillsUpdateAvailable, - }; - sess.send_event_raw(event).await; - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - } - } - }); - } - pub(crate) fn get_tx_event(&self) -> Sender { self.tx_event.clone() } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 1a790314d5..23d22f94a5 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -364,7 +364,6 @@ impl Session { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, - skills_watcher: Arc, agent_control: AgentControl, environment_manager: Arc, analytics_events_client: Option, @@ -846,7 +845,6 @@ impl Session { skills_manager, plugins_manager: Arc::clone(&plugins_manager), mcp_manager: Arc::clone(&mcp_manager), - skills_watcher, agent_control, network_proxy, network_approval: Arc::clone(&network_approval), @@ -935,8 +933,6 @@ impl Session { sess.send_event_raw(event).await; } - // Start the watcher after SessionConfigured so it cannot emit earlier events. - sess.start_skills_watcher_listener(); let mut required_mcp_servers: Vec = mcp_servers .iter() .filter(|(_, server)| server.enabled() && server.required()) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 5090fc3f07..0cb433c3e5 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3725,7 +3725,6 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -3837,7 +3836,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { .expect("create environment"), ); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -3873,7 +3871,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { skills_manager, plugins_manager, mcp_manager, - skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), @@ -4064,7 +4061,6 @@ async fn make_session_with_config_and_rx( skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -4167,7 +4163,6 @@ async fn make_session_with_history_source_and_agent_control_and_rx( skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), agent_control, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -5556,7 +5551,6 @@ where .expect("create environment"), ); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -5592,7 +5586,6 @@ where skills_manager, plugins_manager, mcp_manager, - skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 37a5e6bb09..8d4bfd9cbe 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -728,7 +728,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { /*bundled_skills_enabled*/ true, )); let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager))); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let thread_store = Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(&config), /*state_db*/ None, @@ -743,7 +742,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { skills_manager, plugins_manager, mcp_manager, - skills_watcher, conversation_history: InitialHistory::New, session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 1723904cff..bf552a79fd 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1505,7 +1505,6 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::StreamError(_) | EventMsg::TurnDiff(_) | EventMsg::RealtimeConversationListVoicesResponse(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete diff --git a/codex-rs/core/src/skills_watcher.rs b/codex-rs/core/src/skills_watcher.rs deleted file mode 100644 index fb271ca876..0000000000 --- a/codex-rs/core/src/skills_watcher.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! Skills-specific watcher built on top of the generic [`FileWatcher`]. - -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; - -use tokio::runtime::Handle; -use tokio::sync::broadcast; -use tracing::warn; - -use crate::SkillsManager; -use crate::config::Config; -use crate::file_watcher::FileWatcher; -use crate::file_watcher::FileWatcherSubscriber; -use crate::file_watcher::Receiver; -use crate::file_watcher::ThrottledWatchReceiver; -use crate::file_watcher::WatchPath; -use crate::file_watcher::WatchRegistration; -use crate::skills_load_input_from_config; -use codex_core_plugins::PluginsManager; - -#[cfg(not(test))] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); -#[cfg(test)] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum SkillsWatcherEvent { - SkillsChanged { paths: Vec }, -} - -pub(crate) struct SkillsWatcher { - subscriber: FileWatcherSubscriber, - tx: broadcast::Sender, -} - -impl SkillsWatcher { - pub(crate) fn new(file_watcher: &Arc) -> Self { - let (subscriber, rx) = file_watcher.add_subscriber(); - let (tx, _) = broadcast::channel(128); - let skills_watcher = Self { - subscriber, - tx: tx.clone(), - }; - Self::spawn_event_loop(rx, tx); - skills_watcher - } - - pub(crate) fn noop() -> Self { - Self::new(&Arc::new(FileWatcher::noop())) - } - - pub(crate) fn subscribe(&self) -> broadcast::Receiver { - self.tx.subscribe() - } - - pub(crate) async fn register_config( - &self, - config: &Config, - skills_manager: &SkillsManager, - plugins_manager: &PluginsManager, - fs: Option>, - ) -> WatchRegistration { - let plugins_input = config.plugins_config_input(); - let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; - let effective_skill_roots = plugin_outcome.effective_plugin_skill_roots(); - let skills_input = skills_load_input_from_config(config, effective_skill_roots); - let roots = skills_manager - .skill_roots_for_config(&skills_input, fs) - .await - .into_iter() - .map(|root| WatchPath { - path: root.path.into_path_buf(), - recursive: true, - }) - .collect(); - self.subscriber.register_paths(roots) - } - - fn spawn_event_loop(rx: Receiver, tx: broadcast::Sender) { - let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - while let Some(event) = rx.recv().await { - let _ = tx.send(SkillsWatcherEvent::SkillsChanged { paths: event.paths }); - } - }); - } else { - warn!("skills watcher listener skipped: no Tokio runtime available"); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - use tokio::time::Duration; - use tokio::time::timeout; - - #[tokio::test] - async fn forwards_file_watcher_events() { - let file_watcher = Arc::new(FileWatcher::noop()); - let skills_watcher = SkillsWatcher::new(&file_watcher); - let mut rx = skills_watcher.subscribe(); - let _registration = skills_watcher - .subscriber - .register_path(PathBuf::from("/tmp/skill"), /*recursive*/ true); - - file_watcher - .send_paths_for_test(vec![PathBuf::from("/tmp/skill/SKILL.md")]) - .await; - - let event = timeout(Duration::from_secs(2), rx.recv()) - .await - .expect("skills watcher event") - .expect("broadcast recv"); - assert_eq!( - event, - SkillsWatcherEvent::SkillsChanged { - paths: vec![PathBuf::from("/tmp/skill/SKILL.md")], - } - ); - } -} diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 0dba931296..7e13eddb76 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -10,7 +10,6 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GuardianRejection; use crate::guardian::GuardianRejectionCircuitBreaker; use crate::mcp::McpManager; -use crate::skills_watcher::SkillsWatcher; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::sandboxing::ApprovalStore; @@ -60,7 +59,6 @@ pub(crate) struct SessionServices { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, - pub(crate) skills_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) network_proxy: Option, pub(crate) network_approval: Arc, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index de4114189b..bfd91f68f8 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -6,7 +6,6 @@ use crate::config::Config; use crate::config::ThreadStoreConfig; use crate::environment_selection::default_thread_environment_selections; use crate::environment_selection::resolve_environment_selections; -use crate::file_watcher::FileWatcher; use crate::mcp::McpManager; use crate::rollout::truncation; use crate::session::Codex; @@ -14,8 +13,6 @@ use crate::session::CodexSpawnArgs; use crate::session::CodexSpawnOk; use crate::session::INITIAL_SUBMIT_ID; use crate::shell_snapshot::ShellSnapshot; -use crate::skills_watcher::SkillsWatcher; -use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; use codex_analytics::AnalyticsEventsClient; @@ -71,8 +68,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::runtime::Handle; -use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; @@ -106,47 +101,6 @@ impl Drop for TempCodexHomeGuard { } } -fn build_skills_watcher(skills_manager: Arc) -> Arc { - if should_use_test_thread_manager_behavior() - && let Ok(handle) = Handle::try_current() - && handle.runtime_flavor() == RuntimeFlavor::CurrentThread - { - // The real watcher spins background tasks that can starve the - // current-thread test runtime and cause event waits to time out. - warn!("using noop skills watcher under current-thread test runtime"); - return Arc::new(SkillsWatcher::noop()); - } - - let file_watcher = match FileWatcher::new() { - Ok(file_watcher) => Arc::new(file_watcher), - Err(err) => { - warn!("failed to initialize file watcher: {err}"); - Arc::new(FileWatcher::noop()) - } - }; - let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher)); - - let mut rx = skills_watcher.subscribe(); - let skills_manager = Arc::clone(&skills_manager); - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - loop { - match rx.recv().await { - Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { - skills_manager.clear_cache(); - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => continue, - } - } - }); - } else { - warn!("skills watcher listener skipped: no Tokio runtime available"); - } - - skills_watcher -} - /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -247,7 +201,6 @@ pub(crate) struct ThreadManagerState { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, - skills_watcher: Arc, thread_store: Arc, attestation_provider: Option>, session_source: SessionSource, @@ -308,7 +261,6 @@ impl ThreadManager { config.bundled_skills_enabled(), restriction_product, )); - let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), @@ -318,7 +270,6 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, - skills_watcher, thread_store, attestation_provider, auth_manager, @@ -399,7 +350,6 @@ impl ThreadManager { /*bundled_skills_enabled*/ true, restriction_product, )); - let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); // This test constructor has no Config input. Tests that need a non-local // process store should construct ThreadManager::new with an explicit store. let thread_store: Arc = Arc::new(LocalThreadStore::new( @@ -420,7 +370,6 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, - skills_watcher, thread_store, attestation_provider: None, auth_manager, @@ -1165,19 +1114,6 @@ impl ThreadManagerState { } let environment_selections = resolve_environment_selections(self.environment_manager.as_ref(), &environments)?; - let watch_registration = match environment_selections.primary() { - Some(turn_environment) if !turn_environment.environment.is_remote() => { - self.skills_watcher - .register_config( - &config, - self.skills_manager.as_ref(), - self.plugins_manager.as_ref(), - Some(turn_environment.environment.get_filesystem()), - ) - .await - } - Some(_) | None => crate::file_watcher::WatchRegistration::default(), - }; let parent_rollout_thread_trace = self .parent_rollout_thread_trace_for_source(&session_source, &initial_history) .await; @@ -1193,7 +1129,6 @@ impl ThreadManagerState { skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), - skills_watcher: Arc::clone(&self.skills_watcher), conversation_history: initial_history, session_source, thread_source, @@ -1213,7 +1148,7 @@ impl ThreadManagerState { }) .await?; let new_thread = self - .finalize_thread_spawn(codex, thread_id, tracked_session_source, watch_registration) + .finalize_thread_spawn(codex, thread_id, tracked_session_source) .await?; if is_resumed_thread && let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await @@ -1228,7 +1163,6 @@ impl ThreadManagerState { codex: Codex, thread_id: ThreadId, session_source: SessionSource, - watch_registration: crate::file_watcher::WatchRegistration, ) -> CodexResult { let event = codex.next_event().await?; let session_configured = match event { @@ -1249,7 +1183,6 @@ impl ThreadManagerState { session_configured.clone(), session_configured.rollout_path.clone(), session_source, - watch_registration, )); e.insert(thread.clone()); return Ok(NewThread { diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs deleted file mode 100644 index c422073e4f..0000000000 --- a/codex-rs/core/tests/suite/live_reload.rs +++ /dev/null @@ -1,157 +0,0 @@ -#![allow(clippy::expect_used, clippy::unwrap_used)] - -use std::fs; -use std::path::Path; -use std::path::PathBuf; -use std::time::Duration; - -use anyhow::Result; -use codex_config::config_toml::ProjectConfig; -use codex_protocol::config_types::TrustLevel; -use codex_protocol::models::PermissionProfile; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::Op; -use codex_protocol::user_input::UserInput; -use core_test_support::responses; -use core_test_support::responses::ResponsesRequest; -use core_test_support::responses::mount_sse_sequence; -use core_test_support::responses::start_mock_server; -use core_test_support::test_codex::TestCodex; -use core_test_support::test_codex::test_codex; -use core_test_support::test_codex::turn_permission_fields; -use core_test_support::wait_for_event; -use tokio::time::timeout; - -fn enable_trusted_project(config: &mut codex_core::config::Config) { - config.active_project = ProjectConfig { - trust_level: Some(TrustLevel::Trusted), - }; -} - -fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { - let skill_dir = home.join("skills").join(name); - fs::create_dir_all(&skill_dir).expect("create skill dir"); - let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); - let path = skill_dir.join("SKILL.md"); - fs::write(&path, contents).expect("write skill"); - path -} - -fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { - request - .message_input_texts("user") - .iter() - .any(|text| text.contains(skill_body) && text.contains("")) -} - -async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { - let session_model = test.session_configured.model.clone(); - let (sandbox_policy, permission_profile) = - turn_permission_fields(PermissionProfile::Disabled, test.cwd_path()); - test.codex - .submit(Op::UserTurn { - environments: None, - items: vec![ - UserInput::Text { - text: prompt.to_string(), - text_elements: Vec::new(), - }, - UserInput::Skill { - name: "demo".to_string(), - path: skill_path, - }, - ], - final_output_json_schema: None, - cwd: test.cwd_path().to_path_buf(), - approval_policy: AskForApproval::Never, - approvals_reviewer: None, - sandbox_policy, - permission_profile, - model: session_model, - effort: None, - summary: None, - service_tier: None, - collaboration_mode: None, - personality: None, - }) - .await?; - - wait_for_event(test.codex.as_ref(), |event| { - matches!(event, EventMsg::TurnComplete(_)) - }) - .await; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { - let server = start_mock_server().await; - let responses = mount_sse_sequence( - &server, - vec![ - responses::sse(vec![responses::ev_completed("resp-1")]), - responses::sse(vec![responses::ev_completed("resp-2")]), - ], - ) - .await; - - let skill_v1 = "skill body v1"; - let skill_v2 = "skill body v2"; - let mut builder = test_codex() - .with_pre_build_hook(move |home| { - write_skill(home, "demo", "demo skill", skill_v1); - }) - .with_config(|config| { - enable_trusted_project(config); - }); - let test = builder.build(&server).await?; - - let skill_path = dunce::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; - - submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; - let first_request = responses - .requests() - .first() - .cloned() - .expect("first request captured"); - assert!( - contains_skill_body(&first_request, skill_v1), - "expected initial skill body in request" - ); - - write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); - - let saw_skills_update = timeout(Duration::from_secs(5), async { - loop { - match test.codex.next_event().await { - Ok(event) => { - if matches!(event.msg, EventMsg::SkillsUpdateAvailable) { - break; - } - } - Err(err) => panic!("event stream ended unexpectedly: {err}"), - } - } - }) - .await; - - if saw_skills_update.is_err() { - // Some environments do not reliably surface file watcher events for - // skill changes. Clear the cache explicitly so we can still validate - // that the updated skill body is injected on the next turn. - test.thread_manager.skills_manager().clear_cache(); - } - - submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; - let last_request = responses - .last_request() - .expect("request captured after skill update"); - - assert!( - contains_skill_body(&last_request, skill_v2), - "expected updated skill body after reload" - ); - - Ok(()) -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 0c91654c83..843b558ebf 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -57,7 +57,6 @@ mod image_rollout; mod items; mod json_result; mod live_cli; -mod live_reload; mod model_overrides; mod model_switching; mod model_visible_layout; diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index ff273a77c4..31d30d824e 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -754,6 +754,9 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res config.experimental_realtime_ws_model = Some("realtime-test-model".to_string()); config.experimental_realtime_ws_startup_context = Some(String::new()); config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string()); + // Keep the failure-path test inside wait_for_event's timeout on Windows, + // where refused localhost websocket connects can take around two seconds. + config.model_provider.request_max_retries = Some(0); config.realtime.version = RealtimeWsVersion::V1; }); let test = builder.build(&server).await?; diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 4e2d5c08e5..9e3ac1bae9 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -364,7 +364,6 @@ async fn run_codex_tool_session_inner( | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::ExitedReviewMode(_) | EventMsg::RequestUserInput(_) | EventMsg::RequestPermissions(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 30e33abe43..91fd02d858 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1401,9 +1401,6 @@ pub enum EventMsg { /// List of voices supported by realtime conversation streams. RealtimeConversationListVoicesResponse(RealtimeConversationListVoicesResponseEvent), - /// Notification that skill data may have been updated and clients may want to reload. - SkillsUpdateAvailable, - PlanUpdate(UpdatePlanArgs), TurnAborted(TurnAbortedEvent), diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index 3d52798b8d..1e49c82be2 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -260,7 +260,6 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Option<&'static s | EventMsg::PatchApplyEnd(_) | EventMsg::TurnDiff(_) | EventMsg::RealtimeConversationListVoicesResponse(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 558c3fef98..21b98b4e8d 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -169,7 +169,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::ImageGenerationBegin(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentInteractionBegin(_) | EventMsg::CollabWaitingBegin(_)