Compare commits

...

4 Commits

Author SHA1 Message Date
Yaroslav Volovich
42b85a431f codex: fix rebase fallout on green base 2026-02-26 11:24:44 +00:00
Yaroslav Volovich
70de95b7dc codex: address PR review findings 2026-02-26 11:24:06 +00:00
Yaroslav Volovich
21063c5c18 codex: fix CI failure on PR #12803 2026-02-26 11:24:06 +00:00
Yaroslav Volovich
1eac244927 Add live skill refresh notifications 2026-02-26 11:24:05 +00:00
14 changed files with 353 additions and 53 deletions

View File

@@ -1449,6 +1449,10 @@
}
]
},
"SkillsUpdatedNotification": {
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
"type": "object"
},
"SubAgentSource": {
"oneOf": [
{
@@ -3522,6 +3526,26 @@
"title": "App/list/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"skills/updated"
],
"title": "Skills/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/SkillsUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Skills/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -6294,6 +6294,26 @@
"title": "App/list/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"skills/updated"
],
"title": "Skills/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/SkillsUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Skills/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -12207,6 +12227,12 @@
"title": "SkillsRemoteWriteResponse",
"type": "object"
},
"SkillsUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
"title": "SkillsUpdatedNotification",
"type": "object"
},
"SubAgentSource": {
"oneOf": [
{

View File

@@ -0,0 +1,6 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
"title": "SkillsUpdatedNotification",
"type": "object"
}

View File

@@ -27,6 +27,7 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC
import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification";
import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification";
import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification";
import type { SkillsUpdatedNotification } from "./v2/SkillsUpdatedNotification";
import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification";
import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification";
import type { ThreadClosedNotification } from "./v2/ThreadClosedNotification";
@@ -50,4 +51,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "skills/updated", "params": SkillsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };

View File

@@ -0,0 +1,12 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Notification emitted when the server's local skill view changes.
*
* The payload is intentionally empty because clients should treat this as a
* refresh hint and call `skills/list` again, typically with `forceReload:
* true`, instead of trying to apply an incremental update.
*/
export type SkillsUpdatedNotification = Record<string, never>;

View File

@@ -157,6 +157,7 @@ export type { SkillsRemoteReadParams } from "./SkillsRemoteReadParams";
export type { SkillsRemoteReadResponse } from "./SkillsRemoteReadResponse";
export type { SkillsRemoteWriteParams } from "./SkillsRemoteWriteParams";
export type { SkillsRemoteWriteResponse } from "./SkillsRemoteWriteResponse";
export type { SkillsUpdatedNotification } from "./SkillsUpdatedNotification";
export type { TerminalInteractionNotification } from "./TerminalInteractionNotification";
export type { TextElement } from "./TextElement";
export type { TextPosition } from "./TextPosition";

View File

@@ -843,6 +843,7 @@ server_notification_definitions! {
AccountUpdated => "account/updated" (v2::AccountUpdatedNotification),
AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification),
AppListUpdated => "app/list/updated" (v2::AppListUpdatedNotification),
SkillsUpdated => "skills/updated" (v2::SkillsUpdatedNotification),
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),
@@ -1468,6 +1469,20 @@ mod tests {
Ok(())
}
#[test]
fn serialize_skills_updated_notification() -> Result<()> {
let notification =
ServerNotification::SkillsUpdated(v2::SkillsUpdatedNotification::default());
assert_eq!(
json!({
"method": "skills/updated",
"params": {}
}),
serde_json::to_value(&notification)?,
);
Ok(())
}
#[test]
fn mock_experimental_method_is_marked_experimental() {
let request = ClientRequest::MockExperimentalMethod {

View File

@@ -1673,6 +1673,16 @@ pub struct AppListUpdatedNotification {
pub data: Vec<AppInfo>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// Notification emitted when the server's local skill view changes.
///
/// The payload is intentionally empty because clients should treat this as a
/// refresh hint and call `skills/list` again, typically with `forceReload:
/// true`, instead of trying to apply an incremental update.
pub struct SkillsUpdatedNotification {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -576,7 +576,7 @@ Notes:
## Events
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, and `item/*` notifications.
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, `item/*`, and `skills/updated` notifications.
Thread realtime uses a separate thread-scoped notification surface. `thread/realtime/*` notifications are ephemeral transport events, not `ThreadItem`s, and are not returned by `thread/read`, `thread/resume`, or `thread/fork`.
@@ -804,6 +804,7 @@ Use `skills/list` to fetch the available skills (optionally scoped by `cwds`, wi
You can also add `perCwdExtraUserRoots` to scan additional absolute paths as `user` scope for specific `cwd` entries.
Entries whose `cwd` is not present in `cwds` are ignored.
`skills/list` might reuse a cached skills result per `cwd`; setting `forceReload` to `true` refreshes the result from disk.
The server emits `skills/updated` when local skill files or skill configuration changes. The empty payload is a refresh hint rather than a delta, so re-run `skills/list` (typically with `forceReload: true`) when that notification arrives.
```json
{ "method": "skills/list", "id": 25, "params": {
@@ -835,10 +836,14 @@ Entries whose `cwd` is not present in `cwds` are ignored.
}
],
"errors": []
}]
}]
} }
```
```json
{ "method": "skills/updated", "params": {} }
```
To enable or disable a skill by path:
```json

View File

@@ -58,6 +58,7 @@ use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SkillsUpdatedNotification;
use codex_app_server_protocol::TerminalInteractionNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadNameUpdatedNotification;
@@ -1459,6 +1460,9 @@ pub(crate) async fn apply_bespoke_event_handling(
)
.await;
}
EventMsg::SkillsUpdateAvailable => {
handle_skills_updated(api_version, &outgoing).await;
}
EventMsg::ShutdownComplete => {
thread_watch_manager
.note_thread_shutdown(&conversation_id.to_string())
@@ -1513,6 +1517,23 @@ async fn handle_turn_plan_update(
}
}
/// Bridges core's skill invalidation event into the v2 app-server notification stream.
///
/// Earlier protocol versions do not expose a dedicated skills refresh
/// notification, so they intentionally ignore this event.
async fn handle_skills_updated(
api_version: ApiVersion,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
if let ApiVersion::V2 = api_version {
outgoing
.send_server_notification(ServerNotification::SkillsUpdated(
SkillsUpdatedNotification::default(),
))
.await;
}
}
async fn emit_turn_completed_with_status(
conversation_id: ThreadId,
event_turn_id: String,
@@ -2536,6 +2557,29 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_handle_skills_updated_emits_notification_for_v2() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_skills_updated(ApiVersion::V2, &outgoing).await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::SkillsUpdated(payload)) => {
assert_eq!(payload, SkillsUpdatedNotification::default());
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> {
let conversation_id = ThreadId::new();

View File

@@ -123,6 +123,7 @@ use codex_app_server_protocol::SkillsRemoteReadParams;
use codex_app_server_protocol::SkillsRemoteReadResponse;
use codex_app_server_protocol::SkillsRemoteWriteParams;
use codex_app_server_protocol::SkillsRemoteWriteResponse;
use codex_app_server_protocol::SkillsUpdatedNotification;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
@@ -5402,6 +5403,23 @@ impl CodexMessageProcessor {
.await;
}
/// Sends the direct-mutation refresh hint back to the initiating v2 client.
///
/// `skills/list` responses are cached per `cwd`, so direct app-server
/// writes notify the caller immediately. The filesystem watcher path can
/// still fan the same invalidation out to other subscribed clients later.
async fn send_skills_updated_notification(
outgoing: &Arc<OutgoingMessageSender>,
connection_id: ConnectionId,
) {
outgoing
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::SkillsUpdated(SkillsUpdatedNotification::default()),
)
.await;
}
async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) {
let SkillsListParams {
cwds,
@@ -5521,6 +5539,8 @@ impl CodexMessageProcessor {
match response {
Ok(downloaded) => {
let connection_id = request_id.connection_id;
self.thread_manager.skills_manager().clear_cache();
self.outgoing
.send_response(
request_id,
@@ -5530,6 +5550,7 @@ impl CodexMessageProcessor {
},
)
.await;
Self::send_skills_updated_notification(&self.outgoing, connection_id).await;
}
Err(err) => {
self.send_internal_error(
@@ -5555,6 +5576,7 @@ impl CodexMessageProcessor {
match result {
Ok(()) => {
let connection_id = request_id.connection_id;
self.thread_manager.skills_manager().clear_cache();
self.outgoing
.send_response(
@@ -5564,6 +5586,7 @@ impl CodexMessageProcessor {
},
)
.await;
Self::send_skills_updated_notification(&self.outgoing, connection_id).await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -7577,6 +7600,8 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
#[cfg(test)]
mod tests {
use super::*;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use anyhow::Result;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
@@ -7607,6 +7632,29 @@ mod tests {
validate_dynamic_tools(&tools).expect("valid schema");
}
#[tokio::test]
async fn send_skills_updated_notification_targets_requesting_connection() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let connection_id = ConnectionId(7);
CodexMessageProcessor::send_skills_updated_notification(&outgoing, connection_id).await;
let envelope = rx.recv().await.expect("notification envelope");
match envelope {
OutgoingEnvelope::ToConnection {
connection_id: actual_connection_id,
message:
OutgoingMessage::AppServerNotification(ServerNotification::SkillsUpdated(payload)),
} => {
assert_eq!(actual_connection_id, connection_id);
assert_eq!(payload, SkillsUpdatedNotification::default());
}
other => panic!("unexpected envelope: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
#[test]
fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
let conversation_id = ThreadId::from_string("3f941c35-29b3-493b-b0a4-e25800d9aeb0")?;

View File

@@ -23,6 +23,7 @@ use tokio::time::sleep_until;
use tracing::warn;
use crate::config::Config;
use crate::skills::loader::repo_agents_skill_root_candidates;
use crate::skills::loader::skill_roots_from_layer_stack_with_agents;
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -30,8 +31,17 @@ pub enum FileWatcherEvent {
SkillsChanged { paths: Vec<PathBuf> },
}
#[derive(Default)]
/// Tracks logical skill roots separately from the concrete paths watched by `notify`.
///
/// Missing repo-scoped `.agents/skills` directories are watched through their
/// nearest existing ancestor until the concrete directory exists, which means
/// multiple logical roots can temporarily share one underlying watch target.
/// Other missing skill roots remain unwatched until they exist.
struct WatchState {
skills_root_ref_counts: HashMap<PathBuf, usize>,
watch_target_by_skills_root: HashMap<PathBuf, PathBuf>,
watch_target_ref_counts: HashMap<PathBuf, usize>,
}
struct FileWatcherInner {
@@ -116,9 +126,7 @@ impl FileWatcher {
watched_paths: HashMap::new(),
};
let (tx, _) = broadcast::channel(128);
let state = Arc::new(RwLock::new(WatchState {
skills_root_ref_counts: HashMap::new(),
}));
let state = Arc::new(RwLock::new(WatchState::default()));
let file_watcher = Self {
inner: Some(Mutex::new(inner)),
state: Arc::clone(&state),
@@ -132,9 +140,7 @@ impl FileWatcher {
let (tx, _) = broadcast::channel(1);
Self {
inner: None,
state: Arc::new(RwLock::new(WatchState {
skills_root_ref_counts: HashMap::new(),
})),
state: Arc::new(RwLock::new(WatchState::default())),
tx,
}
}
@@ -144,20 +150,27 @@ impl FileWatcher {
}
pub(crate) fn register_config(self: &Arc<Self>, config: &Config) -> WatchRegistration {
let deduped_roots: HashSet<PathBuf> =
let mut registered_roots: HashMap<PathBuf, bool> =
skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd)
.into_iter()
.map(|root| root.path)
.map(|root| (root.path, false))
.collect();
let mut registered_roots: Vec<PathBuf> = deduped_roots.into_iter().collect();
registered_roots.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
for root in &registered_roots {
self.register_skills_root(root.clone());
for root in repo_agents_skill_root_candidates(&config.config_layer_stack, &config.cwd) {
registered_roots
.entry(root)
.and_modify(|watch_missing_ancestor| *watch_missing_ancestor = true)
.or_insert(true);
}
let mut registered_roots: Vec<(PathBuf, bool)> = registered_roots.into_iter().collect();
registered_roots.sort_unstable_by(|(a, _), (b, _)| a.as_os_str().cmp(b.as_os_str()));
for (root, watch_missing_ancestor) in &registered_roots {
self.register_skills_root(root.clone(), *watch_missing_ancestor);
}
WatchRegistration {
file_watcher: Arc::downgrade(self),
roots: registered_roots,
roots: registered_roots.into_iter().map(|(root, _)| root).collect(),
}
}
@@ -222,7 +235,7 @@ impl FileWatcher {
}
}
fn register_skills_root(&self, root: PathBuf) {
fn register_skills_root(&self, root: PathBuf, watch_missing_ancestor: bool) {
let mut state = self
.state
.write()
@@ -232,8 +245,29 @@ impl FileWatcher {
.entry(root.clone())
.or_insert(0);
*count += 1;
if *count == 1 {
self.watch_path(root, RecursiveMode::Recursive);
if *count > 1 {
return;
}
// Only repo `.agents/skills` candidates fall back to an ancestor watch.
// Other missing roots stay unwatched until they exist.
let watch_target = if watch_missing_ancestor && !root.exists() {
root.ancestors()
.find(|path| path.exists())
.map_or_else(|| root.clone(), Path::to_path_buf)
} else {
root.clone()
};
state
.watch_target_by_skills_root
.insert(root, watch_target.clone());
let watch_count = state
.watch_target_ref_counts
.entry(watch_target.clone())
.or_insert(0);
*watch_count += 1;
if *watch_count == 1 {
self.watch_path(watch_target, RecursiveMode::Recursive);
}
}
@@ -242,22 +276,34 @@ impl FileWatcher {
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut watch_targets_to_unwatch = Vec::new();
let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
for root in roots {
let mut should_unwatch = false;
if let Some(count) = state.skills_root_ref_counts.get_mut(root) {
if *count > 1 {
*count -= 1;
} else {
state.skills_root_ref_counts.remove(root);
should_unwatch = true;
}
}
if !should_unwatch {
if let Some(count) = state.skills_root_ref_counts.get_mut(root)
&& *count > 1
{
*count -= 1;
continue;
}
state.skills_root_ref_counts.remove(root);
let Some(watch_target) = state.watch_target_by_skills_root.remove(root) else {
continue;
};
let Some(count) = state.watch_target_ref_counts.get_mut(&watch_target) else {
continue;
};
if *count > 1 {
*count -= 1;
continue;
}
state.watch_target_ref_counts.remove(&watch_target);
watch_targets_to_unwatch.push(watch_target);
}
for watch_target in &watch_targets_to_unwatch {
let Some(inner) = &self.inner else {
continue;
};
@@ -271,11 +317,11 @@ impl FileWatcher {
let Some(guard) = inner_guard.as_mut() else {
continue;
};
if guard.watched_paths.remove(root).is_none() {
if guard.watched_paths.remove(watch_target).is_none() {
continue;
}
if let Err(err) = guard.watcher.unwatch(root) {
warn!("failed to unwatch {}: {err}", root.display());
if let Err(err) = guard.watcher.unwatch(watch_target) {
warn!("failed to unwatch {}: {err}", watch_target.display());
}
}
}
@@ -409,6 +455,7 @@ mod tests {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
..Default::default()
});
let event = notify_event(
EventKind::Create(CreateKind::Any),
@@ -428,6 +475,7 @@ mod tests {
let root_b = path("/tmp/workspace/.codex/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root_a.clone(), 1), (root_b.clone(), 1)]),
..Default::default()
});
let event = notify_event(
EventKind::Modify(ModifyKind::Any),
@@ -450,6 +498,7 @@ mod tests {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
..Default::default()
});
let path = root.join("demo/SKILL.md");
@@ -470,9 +519,9 @@ mod tests {
fn register_skills_root_dedupes_state_entries() {
let watcher = FileWatcher::noop();
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root);
watcher.register_skills_root(path("/tmp/other-skills"));
watcher.register_skills_root(root.clone(), false);
watcher.register_skills_root(root, false);
watcher.register_skills_root(path("/tmp/other-skills"), false);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.len(), 2);
@@ -482,7 +531,7 @@ mod tests {
fn watch_registration_drop_unregisters_roots() {
let watcher = Arc::new(FileWatcher::noop());
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root.clone(), false);
let registration = WatchRegistration {
file_watcher: Arc::downgrade(&watcher),
roots: vec![root],
@@ -494,6 +543,54 @@ mod tests {
assert_eq!(state.skills_root_ref_counts.len(), 0);
}
#[test]
fn register_missing_skills_root_watches_nearest_existing_ancestor() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
let missing_root = temp_dir.path().join(".agents").join("skills");
watcher.register_skills_root(missing_root.clone(), true);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
assert_eq!(
state.watch_target_by_skills_root.get(&missing_root),
Some(&temp_dir.path().to_path_buf())
);
assert_eq!(state.watch_target_ref_counts.get(temp_dir.path()), Some(&1));
drop(state);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner = inner.lock().expect("inner lock");
assert_eq!(
inner.watched_paths.get(temp_dir.path()),
Some(&RecursiveMode::Recursive)
);
}
#[test]
fn register_missing_non_repo_skills_root_does_not_watch_ancestor() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
let missing_root = temp_dir.path().join("skills");
watcher.register_skills_root(missing_root.clone(), false);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
assert_eq!(
state.watch_target_by_skills_root.get(&missing_root),
Some(&missing_root)
);
assert_eq!(state.watch_target_ref_counts.get(&missing_root), Some(&1));
drop(state);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner = inner.lock().expect("inner lock");
assert_eq!(inner.watched_paths.get(&missing_root), None);
assert_eq!(inner.watched_paths.get(temp_dir.path()), None);
}
#[test]
fn unregister_holds_state_lock_until_unwatch_finishes() {
let temp_dir = tempfile::tempdir().expect("temp dir");
@@ -501,7 +598,7 @@ mod tests {
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher"));
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root.clone(), false);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner_guard = inner.lock().expect("inner lock");
@@ -524,7 +621,7 @@ mod tests {
let register_watcher = Arc::clone(&watcher);
let register_root = root.clone();
let register_thread = std::thread::spawn(move || {
register_watcher.register_skills_root(register_root);
register_watcher.register_skills_root(register_root, false);
});
drop(inner_guard);

View File

@@ -287,21 +287,32 @@ fn dedupe_skill_roots_by_path(roots: &mut Vec<SkillRoot>) {
roots.retain(|root| seen.insert(root.path.clone()));
}
fn repo_agents_skill_roots(config_layer_stack: &ConfigLayerStack, cwd: &Path) -> Vec<SkillRoot> {
/// Returns every repo-scoped `.agents/skills` path between the project root and `cwd`.
///
/// The loader filters these candidates down to existing directories, but the
/// file watcher also consumes the full list so it can watch an ancestor before a
/// new `.agents/skills` directory exists.
pub(crate) fn repo_agents_skill_root_candidates(
config_layer_stack: &ConfigLayerStack,
cwd: &Path,
) -> Vec<PathBuf> {
let project_root_markers = project_root_markers_from_stack(config_layer_stack);
let project_root = find_project_root(cwd, &project_root_markers);
let dirs = dirs_between_project_root_and_cwd(cwd, &project_root);
let mut roots = Vec::new();
for dir in dirs {
let agents_skills = dir.join(AGENTS_DIR_NAME).join(SKILLS_DIR_NAME);
if agents_skills.is_dir() {
roots.push(SkillRoot {
path: agents_skills,
scope: SkillScope::Repo,
});
}
}
roots
dirs_between_project_root_and_cwd(cwd, &project_root)
.into_iter()
.map(|dir| dir.join(AGENTS_DIR_NAME).join(SKILLS_DIR_NAME))
.collect()
}
fn repo_agents_skill_roots(config_layer_stack: &ConfigLayerStack, cwd: &Path) -> Vec<SkillRoot> {
repo_agents_skill_root_candidates(config_layer_stack, cwd)
.into_iter()
.filter(|path| path.is_dir())
.map(|path| SkillRoot {
path,
scope: SkillScope::Repo,
})
.collect()
}
fn project_root_markers_from_stack(config_layer_stack: &ConfigLayerStack) -> Vec<String> {

View File

@@ -27,7 +27,7 @@ Skills from {repo}:
Which ones would you like installed?
"""
After installing a skill, tell the user: "Restart Codex to pick up new skills."
After installing a skill, tell the user: "Codex should pick up new skills automatically. If it does not, reopen the skills list or restart Codex as a fallback."
## Scripts