mirror of
https://github.com/openai/codex.git
synced 2026-03-04 21:53:21 +00:00
Compare commits
4 Commits
fix/notify
...
codex/skil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42b85a431f | ||
|
|
70de95b7dc | ||
|
|
21063c5c18 | ||
|
|
1eac244927 |
@@ -1449,6 +1449,10 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SkillsUpdatedNotification": {
|
||||
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
|
||||
"type": "object"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -3522,6 +3526,26 @@
|
||||
"title": "App/list/updatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"method": {
|
||||
"enum": [
|
||||
"skills/updated"
|
||||
],
|
||||
"title": "Skills/updatedNotificationMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/SkillsUpdatedNotification"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Skills/updatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"method": {
|
||||
|
||||
@@ -6294,6 +6294,26 @@
|
||||
"title": "App/list/updatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"method": {
|
||||
"enum": [
|
||||
"skills/updated"
|
||||
],
|
||||
"title": "Skills/updatedNotificationMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/v2/SkillsUpdatedNotification"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Skills/updatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"method": {
|
||||
@@ -12207,6 +12227,12 @@
|
||||
"title": "SkillsRemoteWriteResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SkillsUpdatedNotification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
|
||||
"title": "SkillsUpdatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "Notification emitted when the server's local skill view changes.\n\nThe payload is intentionally empty because clients should treat this as a refresh hint and call `skills/list` again, typically with `forceReload: true`, instead of trying to apply an incremental update.",
|
||||
"title": "SkillsUpdatedNotification",
|
||||
"type": "object"
|
||||
}
|
||||
@@ -27,6 +27,7 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC
|
||||
import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification";
|
||||
import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification";
|
||||
import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification";
|
||||
import type { SkillsUpdatedNotification } from "./v2/SkillsUpdatedNotification";
|
||||
import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification";
|
||||
import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification";
|
||||
import type { ThreadClosedNotification } from "./v2/ThreadClosedNotification";
|
||||
@@ -50,4 +51,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
|
||||
/**
|
||||
* Notification sent from the server to the client.
|
||||
*/
|
||||
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
|
||||
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "skills/updated", "params": SkillsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
/**
|
||||
* Notification emitted when the server's local skill view changes.
|
||||
*
|
||||
* The payload is intentionally empty because clients should treat this as a
|
||||
* refresh hint and call `skills/list` again, typically with `forceReload:
|
||||
* true`, instead of trying to apply an incremental update.
|
||||
*/
|
||||
export type SkillsUpdatedNotification = Record<string, never>;
|
||||
@@ -157,6 +157,7 @@ export type { SkillsRemoteReadParams } from "./SkillsRemoteReadParams";
|
||||
export type { SkillsRemoteReadResponse } from "./SkillsRemoteReadResponse";
|
||||
export type { SkillsRemoteWriteParams } from "./SkillsRemoteWriteParams";
|
||||
export type { SkillsRemoteWriteResponse } from "./SkillsRemoteWriteResponse";
|
||||
export type { SkillsUpdatedNotification } from "./SkillsUpdatedNotification";
|
||||
export type { TerminalInteractionNotification } from "./TerminalInteractionNotification";
|
||||
export type { TextElement } from "./TextElement";
|
||||
export type { TextPosition } from "./TextPosition";
|
||||
|
||||
@@ -843,6 +843,7 @@ server_notification_definitions! {
|
||||
AccountUpdated => "account/updated" (v2::AccountUpdatedNotification),
|
||||
AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification),
|
||||
AppListUpdated => "app/list/updated" (v2::AppListUpdatedNotification),
|
||||
SkillsUpdated => "skills/updated" (v2::SkillsUpdatedNotification),
|
||||
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
|
||||
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
|
||||
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),
|
||||
@@ -1468,6 +1469,20 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_skills_updated_notification() -> Result<()> {
|
||||
let notification =
|
||||
ServerNotification::SkillsUpdated(v2::SkillsUpdatedNotification::default());
|
||||
assert_eq!(
|
||||
json!({
|
||||
"method": "skills/updated",
|
||||
"params": {}
|
||||
}),
|
||||
serde_json::to_value(¬ification)?,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mock_experimental_method_is_marked_experimental() {
|
||||
let request = ClientRequest::MockExperimentalMethod {
|
||||
|
||||
@@ -1673,6 +1673,16 @@ pub struct AppListUpdatedNotification {
|
||||
pub data: Vec<AppInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
/// Notification emitted when the server's local skill view changes.
|
||||
///
|
||||
/// The payload is intentionally empty because clients should treat this as a
|
||||
/// refresh hint and call `skills/list` again, typically with `forceReload:
|
||||
/// true`, instead of trying to apply an incremental update.
|
||||
pub struct SkillsUpdatedNotification {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -576,7 +576,7 @@ Notes:
|
||||
|
||||
## Events
|
||||
|
||||
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, and `item/*` notifications.
|
||||
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, `item/*`, and `skills/updated` notifications.
|
||||
|
||||
Thread realtime uses a separate thread-scoped notification surface. `thread/realtime/*` notifications are ephemeral transport events, not `ThreadItem`s, and are not returned by `thread/read`, `thread/resume`, or `thread/fork`.
|
||||
|
||||
@@ -804,6 +804,7 @@ Use `skills/list` to fetch the available skills (optionally scoped by `cwds`, wi
|
||||
You can also add `perCwdExtraUserRoots` to scan additional absolute paths as `user` scope for specific `cwd` entries.
|
||||
Entries whose `cwd` is not present in `cwds` are ignored.
|
||||
`skills/list` might reuse a cached skills result per `cwd`; setting `forceReload` to `true` refreshes the result from disk.
|
||||
The server emits `skills/updated` when local skill files or skill configuration changes. The empty payload is a refresh hint rather than a delta, so re-run `skills/list` (typically with `forceReload: true`) when that notification arrives.
|
||||
|
||||
```json
|
||||
{ "method": "skills/list", "id": 25, "params": {
|
||||
@@ -835,10 +836,14 @@ Entries whose `cwd` is not present in `cwds` are ignored.
|
||||
}
|
||||
],
|
||||
"errors": []
|
||||
}]
|
||||
}]
|
||||
} }
|
||||
```
|
||||
|
||||
```json
|
||||
{ "method": "skills/updated", "params": {} }
|
||||
```
|
||||
|
||||
To enable or disable a skill by path:
|
||||
|
||||
```json
|
||||
|
||||
@@ -58,6 +58,7 @@ use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
|
||||
use codex_app_server_protocol::ReasoningTextDeltaNotification;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::SkillsUpdatedNotification;
|
||||
use codex_app_server_protocol::TerminalInteractionNotification;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadNameUpdatedNotification;
|
||||
@@ -1459,6 +1460,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
EventMsg::SkillsUpdateAvailable => {
|
||||
handle_skills_updated(api_version, &outgoing).await;
|
||||
}
|
||||
EventMsg::ShutdownComplete => {
|
||||
thread_watch_manager
|
||||
.note_thread_shutdown(&conversation_id.to_string())
|
||||
@@ -1513,6 +1517,23 @@ async fn handle_turn_plan_update(
|
||||
}
|
||||
}
|
||||
|
||||
/// Bridges core's skill invalidation event into the v2 app-server notification stream.
|
||||
///
|
||||
/// Earlier protocol versions do not expose a dedicated skills refresh
|
||||
/// notification, so they intentionally ignore this event.
|
||||
async fn handle_skills_updated(
|
||||
api_version: ApiVersion,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
) {
|
||||
if let ApiVersion::V2 = api_version {
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::SkillsUpdated(
|
||||
SkillsUpdatedNotification::default(),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn emit_turn_completed_with_status(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
@@ -2536,6 +2557,29 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_skills_updated_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
ThreadId::new(),
|
||||
);
|
||||
|
||||
handle_skills_updated(ApiVersion::V2, &outgoing).await;
|
||||
|
||||
let msg = recv_broadcast_message(&mut rx).await?;
|
||||
match msg {
|
||||
OutgoingMessage::AppServerNotification(ServerNotification::SkillsUpdated(payload)) => {
|
||||
assert_eq!(payload, SkillsUpdatedNotification::default());
|
||||
}
|
||||
other => bail!("unexpected message: {other:?}"),
|
||||
}
|
||||
assert!(rx.try_recv().is_err(), "no extra messages expected");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> {
|
||||
let conversation_id = ThreadId::new();
|
||||
|
||||
@@ -123,6 +123,7 @@ use codex_app_server_protocol::SkillsRemoteReadParams;
|
||||
use codex_app_server_protocol::SkillsRemoteReadResponse;
|
||||
use codex_app_server_protocol::SkillsRemoteWriteParams;
|
||||
use codex_app_server_protocol::SkillsRemoteWriteResponse;
|
||||
use codex_app_server_protocol::SkillsUpdatedNotification;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
@@ -5402,6 +5403,23 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Sends the direct-mutation refresh hint back to the initiating v2 client.
|
||||
///
|
||||
/// `skills/list` responses are cached per `cwd`, so direct app-server
|
||||
/// writes notify the caller immediately. The filesystem watcher path can
|
||||
/// still fan the same invalidation out to other subscribed clients later.
|
||||
async fn send_skills_updated_notification(
|
||||
outgoing: &Arc<OutgoingMessageSender>,
|
||||
connection_id: ConnectionId,
|
||||
) {
|
||||
outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::SkillsUpdated(SkillsUpdatedNotification::default()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) {
|
||||
let SkillsListParams {
|
||||
cwds,
|
||||
@@ -5521,6 +5539,8 @@ impl CodexMessageProcessor {
|
||||
|
||||
match response {
|
||||
Ok(downloaded) => {
|
||||
let connection_id = request_id.connection_id;
|
||||
self.thread_manager.skills_manager().clear_cache();
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
@@ -5530,6 +5550,7 @@ impl CodexMessageProcessor {
|
||||
},
|
||||
)
|
||||
.await;
|
||||
Self::send_skills_updated_notification(&self.outgoing, connection_id).await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
@@ -5555,6 +5576,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
let connection_id = request_id.connection_id;
|
||||
self.thread_manager.skills_manager().clear_cache();
|
||||
self.outgoing
|
||||
.send_response(
|
||||
@@ -5564,6 +5586,7 @@ impl CodexMessageProcessor {
|
||||
},
|
||||
)
|
||||
.await;
|
||||
Self::send_skills_updated_notification(&self.outgoing, connection_id).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -7577,6 +7600,8 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
@@ -7607,6 +7632,29 @@ mod tests {
|
||||
validate_dynamic_tools(&tools).expect("valid schema");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_skills_updated_notification_targets_requesting_connection() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
CodexMessageProcessor::send_skills_updated_notification(&outgoing, connection_id).await;
|
||||
|
||||
let envelope = rx.recv().await.expect("notification envelope");
|
||||
match envelope {
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id: actual_connection_id,
|
||||
message:
|
||||
OutgoingMessage::AppServerNotification(ServerNotification::SkillsUpdated(payload)),
|
||||
} => {
|
||||
assert_eq!(actual_connection_id, connection_id);
|
||||
assert_eq!(payload, SkillsUpdatedNotification::default());
|
||||
}
|
||||
other => panic!("unexpected envelope: {other:?}"),
|
||||
}
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
|
||||
let conversation_id = ThreadId::from_string("3f941c35-29b3-493b-b0a4-e25800d9aeb0")?;
|
||||
|
||||
@@ -23,6 +23,7 @@ use tokio::time::sleep_until;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::skills::loader::repo_agents_skill_root_candidates;
|
||||
use crate::skills::loader::skill_roots_from_layer_stack_with_agents;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -30,8 +31,17 @@ pub enum FileWatcherEvent {
|
||||
SkillsChanged { paths: Vec<PathBuf> },
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
/// Tracks logical skill roots separately from the concrete paths watched by `notify`.
|
||||
///
|
||||
/// Missing repo-scoped `.agents/skills` directories are watched through their
|
||||
/// nearest existing ancestor until the concrete directory exists, which means
|
||||
/// multiple logical roots can temporarily share one underlying watch target.
|
||||
/// Other missing skill roots remain unwatched until they exist.
|
||||
struct WatchState {
|
||||
skills_root_ref_counts: HashMap<PathBuf, usize>,
|
||||
watch_target_by_skills_root: HashMap<PathBuf, PathBuf>,
|
||||
watch_target_ref_counts: HashMap<PathBuf, usize>,
|
||||
}
|
||||
|
||||
struct FileWatcherInner {
|
||||
@@ -116,9 +126,7 @@ impl FileWatcher {
|
||||
watched_paths: HashMap::new(),
|
||||
};
|
||||
let (tx, _) = broadcast::channel(128);
|
||||
let state = Arc::new(RwLock::new(WatchState {
|
||||
skills_root_ref_counts: HashMap::new(),
|
||||
}));
|
||||
let state = Arc::new(RwLock::new(WatchState::default()));
|
||||
let file_watcher = Self {
|
||||
inner: Some(Mutex::new(inner)),
|
||||
state: Arc::clone(&state),
|
||||
@@ -132,9 +140,7 @@ impl FileWatcher {
|
||||
let (tx, _) = broadcast::channel(1);
|
||||
Self {
|
||||
inner: None,
|
||||
state: Arc::new(RwLock::new(WatchState {
|
||||
skills_root_ref_counts: HashMap::new(),
|
||||
})),
|
||||
state: Arc::new(RwLock::new(WatchState::default())),
|
||||
tx,
|
||||
}
|
||||
}
|
||||
@@ -144,20 +150,27 @@ impl FileWatcher {
|
||||
}
|
||||
|
||||
pub(crate) fn register_config(self: &Arc<Self>, config: &Config) -> WatchRegistration {
|
||||
let deduped_roots: HashSet<PathBuf> =
|
||||
let mut registered_roots: HashMap<PathBuf, bool> =
|
||||
skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd)
|
||||
.into_iter()
|
||||
.map(|root| root.path)
|
||||
.map(|root| (root.path, false))
|
||||
.collect();
|
||||
let mut registered_roots: Vec<PathBuf> = deduped_roots.into_iter().collect();
|
||||
registered_roots.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
|
||||
for root in ®istered_roots {
|
||||
self.register_skills_root(root.clone());
|
||||
for root in repo_agents_skill_root_candidates(&config.config_layer_stack, &config.cwd) {
|
||||
registered_roots
|
||||
.entry(root)
|
||||
.and_modify(|watch_missing_ancestor| *watch_missing_ancestor = true)
|
||||
.or_insert(true);
|
||||
}
|
||||
|
||||
let mut registered_roots: Vec<(PathBuf, bool)> = registered_roots.into_iter().collect();
|
||||
registered_roots.sort_unstable_by(|(a, _), (b, _)| a.as_os_str().cmp(b.as_os_str()));
|
||||
for (root, watch_missing_ancestor) in ®istered_roots {
|
||||
self.register_skills_root(root.clone(), *watch_missing_ancestor);
|
||||
}
|
||||
|
||||
WatchRegistration {
|
||||
file_watcher: Arc::downgrade(self),
|
||||
roots: registered_roots,
|
||||
roots: registered_roots.into_iter().map(|(root, _)| root).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,7 +235,7 @@ impl FileWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
fn register_skills_root(&self, root: PathBuf) {
|
||||
fn register_skills_root(&self, root: PathBuf, watch_missing_ancestor: bool) {
|
||||
let mut state = self
|
||||
.state
|
||||
.write()
|
||||
@@ -232,8 +245,29 @@ impl FileWatcher {
|
||||
.entry(root.clone())
|
||||
.or_insert(0);
|
||||
*count += 1;
|
||||
if *count == 1 {
|
||||
self.watch_path(root, RecursiveMode::Recursive);
|
||||
if *count > 1 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only repo `.agents/skills` candidates fall back to an ancestor watch.
|
||||
// Other missing roots stay unwatched until they exist.
|
||||
let watch_target = if watch_missing_ancestor && !root.exists() {
|
||||
root.ancestors()
|
||||
.find(|path| path.exists())
|
||||
.map_or_else(|| root.clone(), Path::to_path_buf)
|
||||
} else {
|
||||
root.clone()
|
||||
};
|
||||
state
|
||||
.watch_target_by_skills_root
|
||||
.insert(root, watch_target.clone());
|
||||
let watch_count = state
|
||||
.watch_target_ref_counts
|
||||
.entry(watch_target.clone())
|
||||
.or_insert(0);
|
||||
*watch_count += 1;
|
||||
if *watch_count == 1 {
|
||||
self.watch_path(watch_target, RecursiveMode::Recursive);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,22 +276,34 @@ impl FileWatcher {
|
||||
.state
|
||||
.write()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let mut watch_targets_to_unwatch = Vec::new();
|
||||
let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
|
||||
|
||||
for root in roots {
|
||||
let mut should_unwatch = false;
|
||||
if let Some(count) = state.skills_root_ref_counts.get_mut(root) {
|
||||
if *count > 1 {
|
||||
*count -= 1;
|
||||
} else {
|
||||
state.skills_root_ref_counts.remove(root);
|
||||
should_unwatch = true;
|
||||
}
|
||||
}
|
||||
|
||||
if !should_unwatch {
|
||||
if let Some(count) = state.skills_root_ref_counts.get_mut(root)
|
||||
&& *count > 1
|
||||
{
|
||||
*count -= 1;
|
||||
continue;
|
||||
}
|
||||
state.skills_root_ref_counts.remove(root);
|
||||
|
||||
let Some(watch_target) = state.watch_target_by_skills_root.remove(root) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(count) = state.watch_target_ref_counts.get_mut(&watch_target) else {
|
||||
continue;
|
||||
};
|
||||
if *count > 1 {
|
||||
*count -= 1;
|
||||
continue;
|
||||
}
|
||||
state.watch_target_ref_counts.remove(&watch_target);
|
||||
watch_targets_to_unwatch.push(watch_target);
|
||||
}
|
||||
|
||||
for watch_target in &watch_targets_to_unwatch {
|
||||
let Some(inner) = &self.inner else {
|
||||
continue;
|
||||
};
|
||||
@@ -271,11 +317,11 @@ impl FileWatcher {
|
||||
let Some(guard) = inner_guard.as_mut() else {
|
||||
continue;
|
||||
};
|
||||
if guard.watched_paths.remove(root).is_none() {
|
||||
if guard.watched_paths.remove(watch_target).is_none() {
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = guard.watcher.unwatch(root) {
|
||||
warn!("failed to unwatch {}: {err}", root.display());
|
||||
if let Err(err) = guard.watcher.unwatch(watch_target) {
|
||||
warn!("failed to unwatch {}: {err}", watch_target.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -409,6 +455,7 @@ mod tests {
|
||||
let root = path("/tmp/skills");
|
||||
let state = RwLock::new(WatchState {
|
||||
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
|
||||
..Default::default()
|
||||
});
|
||||
let event = notify_event(
|
||||
EventKind::Create(CreateKind::Any),
|
||||
@@ -428,6 +475,7 @@ mod tests {
|
||||
let root_b = path("/tmp/workspace/.codex/skills");
|
||||
let state = RwLock::new(WatchState {
|
||||
skills_root_ref_counts: HashMap::from([(root_a.clone(), 1), (root_b.clone(), 1)]),
|
||||
..Default::default()
|
||||
});
|
||||
let event = notify_event(
|
||||
EventKind::Modify(ModifyKind::Any),
|
||||
@@ -450,6 +498,7 @@ mod tests {
|
||||
let root = path("/tmp/skills");
|
||||
let state = RwLock::new(WatchState {
|
||||
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
|
||||
..Default::default()
|
||||
});
|
||||
let path = root.join("demo/SKILL.md");
|
||||
|
||||
@@ -470,9 +519,9 @@ mod tests {
|
||||
fn register_skills_root_dedupes_state_entries() {
|
||||
let watcher = FileWatcher::noop();
|
||||
let root = path("/tmp/skills");
|
||||
watcher.register_skills_root(root.clone());
|
||||
watcher.register_skills_root(root);
|
||||
watcher.register_skills_root(path("/tmp/other-skills"));
|
||||
watcher.register_skills_root(root.clone(), false);
|
||||
watcher.register_skills_root(root, false);
|
||||
watcher.register_skills_root(path("/tmp/other-skills"), false);
|
||||
|
||||
let state = watcher.state.read().expect("state lock");
|
||||
assert_eq!(state.skills_root_ref_counts.len(), 2);
|
||||
@@ -482,7 +531,7 @@ mod tests {
|
||||
fn watch_registration_drop_unregisters_roots() {
|
||||
let watcher = Arc::new(FileWatcher::noop());
|
||||
let root = path("/tmp/skills");
|
||||
watcher.register_skills_root(root.clone());
|
||||
watcher.register_skills_root(root.clone(), false);
|
||||
let registration = WatchRegistration {
|
||||
file_watcher: Arc::downgrade(&watcher),
|
||||
roots: vec![root],
|
||||
@@ -494,6 +543,54 @@ mod tests {
|
||||
assert_eq!(state.skills_root_ref_counts.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_missing_skills_root_watches_nearest_existing_ancestor() {
|
||||
let temp_dir = tempfile::tempdir().expect("temp dir");
|
||||
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
|
||||
let missing_root = temp_dir.path().join(".agents").join("skills");
|
||||
|
||||
watcher.register_skills_root(missing_root.clone(), true);
|
||||
|
||||
let state = watcher.state.read().expect("state lock");
|
||||
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
|
||||
assert_eq!(
|
||||
state.watch_target_by_skills_root.get(&missing_root),
|
||||
Some(&temp_dir.path().to_path_buf())
|
||||
);
|
||||
assert_eq!(state.watch_target_ref_counts.get(temp_dir.path()), Some(&1));
|
||||
drop(state);
|
||||
|
||||
let inner = watcher.inner.as_ref().expect("watcher inner");
|
||||
let inner = inner.lock().expect("inner lock");
|
||||
assert_eq!(
|
||||
inner.watched_paths.get(temp_dir.path()),
|
||||
Some(&RecursiveMode::Recursive)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_missing_non_repo_skills_root_does_not_watch_ancestor() {
|
||||
let temp_dir = tempfile::tempdir().expect("temp dir");
|
||||
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
|
||||
let missing_root = temp_dir.path().join("skills");
|
||||
|
||||
watcher.register_skills_root(missing_root.clone(), false);
|
||||
|
||||
let state = watcher.state.read().expect("state lock");
|
||||
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
|
||||
assert_eq!(
|
||||
state.watch_target_by_skills_root.get(&missing_root),
|
||||
Some(&missing_root)
|
||||
);
|
||||
assert_eq!(state.watch_target_ref_counts.get(&missing_root), Some(&1));
|
||||
drop(state);
|
||||
|
||||
let inner = watcher.inner.as_ref().expect("watcher inner");
|
||||
let inner = inner.lock().expect("inner lock");
|
||||
assert_eq!(inner.watched_paths.get(&missing_root), None);
|
||||
assert_eq!(inner.watched_paths.get(temp_dir.path()), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unregister_holds_state_lock_until_unwatch_finishes() {
|
||||
let temp_dir = tempfile::tempdir().expect("temp dir");
|
||||
@@ -501,7 +598,7 @@ mod tests {
|
||||
std::fs::create_dir(&root).expect("create root");
|
||||
|
||||
let watcher = Arc::new(FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher"));
|
||||
watcher.register_skills_root(root.clone());
|
||||
watcher.register_skills_root(root.clone(), false);
|
||||
|
||||
let inner = watcher.inner.as_ref().expect("watcher inner");
|
||||
let inner_guard = inner.lock().expect("inner lock");
|
||||
@@ -524,7 +621,7 @@ mod tests {
|
||||
let register_watcher = Arc::clone(&watcher);
|
||||
let register_root = root.clone();
|
||||
let register_thread = std::thread::spawn(move || {
|
||||
register_watcher.register_skills_root(register_root);
|
||||
register_watcher.register_skills_root(register_root, false);
|
||||
});
|
||||
|
||||
drop(inner_guard);
|
||||
|
||||
@@ -287,21 +287,32 @@ fn dedupe_skill_roots_by_path(roots: &mut Vec<SkillRoot>) {
|
||||
roots.retain(|root| seen.insert(root.path.clone()));
|
||||
}
|
||||
|
||||
fn repo_agents_skill_roots(config_layer_stack: &ConfigLayerStack, cwd: &Path) -> Vec<SkillRoot> {
|
||||
/// Returns every repo-scoped `.agents/skills` path between the project root and `cwd`.
|
||||
///
|
||||
/// The loader filters these candidates down to existing directories, but the
|
||||
/// file watcher also consumes the full list so it can watch an ancestor before a
|
||||
/// new `.agents/skills` directory exists.
|
||||
pub(crate) fn repo_agents_skill_root_candidates(
|
||||
config_layer_stack: &ConfigLayerStack,
|
||||
cwd: &Path,
|
||||
) -> Vec<PathBuf> {
|
||||
let project_root_markers = project_root_markers_from_stack(config_layer_stack);
|
||||
let project_root = find_project_root(cwd, &project_root_markers);
|
||||
let dirs = dirs_between_project_root_and_cwd(cwd, &project_root);
|
||||
let mut roots = Vec::new();
|
||||
for dir in dirs {
|
||||
let agents_skills = dir.join(AGENTS_DIR_NAME).join(SKILLS_DIR_NAME);
|
||||
if agents_skills.is_dir() {
|
||||
roots.push(SkillRoot {
|
||||
path: agents_skills,
|
||||
scope: SkillScope::Repo,
|
||||
});
|
||||
}
|
||||
}
|
||||
roots
|
||||
dirs_between_project_root_and_cwd(cwd, &project_root)
|
||||
.into_iter()
|
||||
.map(|dir| dir.join(AGENTS_DIR_NAME).join(SKILLS_DIR_NAME))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn repo_agents_skill_roots(config_layer_stack: &ConfigLayerStack, cwd: &Path) -> Vec<SkillRoot> {
|
||||
repo_agents_skill_root_candidates(config_layer_stack, cwd)
|
||||
.into_iter()
|
||||
.filter(|path| path.is_dir())
|
||||
.map(|path| SkillRoot {
|
||||
path,
|
||||
scope: SkillScope::Repo,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn project_root_markers_from_stack(config_layer_stack: &ConfigLayerStack) -> Vec<String> {
|
||||
|
||||
@@ -27,7 +27,7 @@ Skills from {repo}:
|
||||
Which ones would you like installed?
|
||||
"""
|
||||
|
||||
After installing a skill, tell the user: "Restart Codex to pick up new skills."
|
||||
After installing a skill, tell the user: "Codex should pick up new skills automatically. If it does not, reopen the skills list or restart Codex as a fallback."
|
||||
|
||||
## Scripts
|
||||
|
||||
|
||||
Reference in New Issue
Block a user