codex: address PR review findings

This commit is contained in:
Yaroslav Volovich
2026-02-26 11:18:14 +00:00
parent 21063c5c18
commit 70de95b7dc
2 changed files with 96 additions and 36 deletions

View File

@@ -5403,16 +5403,20 @@ impl CodexMessageProcessor {
.await;
}
/// Sends the app-server refresh hint after a direct skill mutation.
/// Sends the direct-mutation refresh hint back to the initiating v2 client.
///
/// `skills/list` responses are cached per `cwd`, so app-server initiated
/// writes proactively emit the same invalidation signal that the filesystem
/// watcher path can later surface to connected clients.
async fn send_skills_updated_notification(outgoing: &Arc<OutgoingMessageSender>) {
/// `skills/list` responses are cached per `cwd`, so direct app-server
/// writes notify the caller immediately. The filesystem watcher path can
/// still fan the same invalidation out to other subscribed clients later.
async fn send_skills_updated_notification(
outgoing: &Arc<OutgoingMessageSender>,
connection_id: ConnectionId,
) {
outgoing
.send_server_notification(ServerNotification::SkillsUpdated(
SkillsUpdatedNotification::default(),
))
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::SkillsUpdated(SkillsUpdatedNotification::default()),
)
.await;
}
@@ -5545,7 +5549,8 @@ impl CodexMessageProcessor {
},
)
.await;
Self::send_skills_updated_notification(&self.outgoing).await;
Self::send_skills_updated_notification(&self.outgoing, request_id.connection_id)
.await;
}
Err(err) => {
self.send_internal_error(
@@ -5580,7 +5585,8 @@ impl CodexMessageProcessor {
},
)
.await;
Self::send_skills_updated_notification(&self.outgoing).await;
Self::send_skills_updated_notification(&self.outgoing, request_id.connection_id)
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -7594,6 +7600,8 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
#[cfg(test)]
mod tests {
use super::*;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use anyhow::Result;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
@@ -7624,6 +7632,29 @@ mod tests {
validate_dynamic_tools(&tools).expect("valid schema");
}
#[tokio::test]
async fn send_skills_updated_notification_targets_requesting_connection() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let connection_id = ConnectionId(7);
CodexMessageProcessor::send_skills_updated_notification(&outgoing, connection_id).await;
let envelope = rx.recv().await.expect("notification envelope");
match envelope {
OutgoingEnvelope::ToConnection {
connection_id: actual_connection_id,
message:
OutgoingMessage::AppServerNotification(ServerNotification::SkillsUpdated(payload)),
} => {
assert_eq!(actual_connection_id, connection_id);
assert_eq!(payload, SkillsUpdatedNotification::default());
}
other => panic!("unexpected envelope: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
#[test]
fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
let conversation_id = ThreadId::from_string("3f941c35-29b3-493b-b0a4-e25800d9aeb0")?;

View File

@@ -37,6 +37,7 @@ pub enum FileWatcherEvent {
/// Missing repo-scoped `.agents/skills` directories are watched through their
/// nearest existing ancestor until the concrete directory exists, which means
/// multiple logical roots can temporarily share one underlying watch target.
/// Other missing skill roots remain unwatched until they exist.
struct WatchState {
skills_root_ref_counts: HashMap<PathBuf, usize>,
watch_target_by_skills_root: HashMap<PathBuf, PathBuf>,
@@ -149,24 +150,27 @@ impl FileWatcher {
}
pub(crate) fn register_config(self: &Arc<Self>, config: &Config) -> WatchRegistration {
let deduped_roots: HashSet<PathBuf> =
let mut registered_roots: HashMap<PathBuf, bool> =
skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd)
.into_iter()
.map(|root| root.path)
.chain(repo_agents_skill_root_candidates(
&config.config_layer_stack,
&config.cwd,
))
.map(|root| (root.path, false))
.collect();
let mut registered_roots: Vec<PathBuf> = deduped_roots.into_iter().collect();
registered_roots.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
for root in &registered_roots {
self.register_skills_root(root.clone());
for root in repo_agents_skill_root_candidates(&config.config_layer_stack, &config.cwd) {
registered_roots
.entry(root)
.and_modify(|watch_missing_ancestor| *watch_missing_ancestor = true)
.or_insert(true);
}
let mut registered_roots: Vec<(PathBuf, bool)> = registered_roots.into_iter().collect();
registered_roots.sort_unstable_by(|(a, _), (b, _)| a.as_os_str().cmp(b.as_os_str()));
for (root, watch_missing_ancestor) in &registered_roots {
self.register_skills_root(root.clone(), *watch_missing_ancestor);
}
WatchRegistration {
file_watcher: Arc::downgrade(self),
roots: registered_roots,
roots: registered_roots.into_iter().map(|(root, _)| root).collect(),
}
}
@@ -231,7 +235,7 @@ impl FileWatcher {
}
}
fn register_skills_root(&self, root: PathBuf) {
fn register_skills_root(&self, root: PathBuf, watch_missing_ancestor: bool) {
let mut state = self
.state
.write()
@@ -245,13 +249,15 @@ impl FileWatcher {
return;
}
// If the concrete skills directory does not exist yet, watch the nearest
// existing ancestor so creating the directory later still triggers a
// refresh.
let watch_target = root
.ancestors()
.find(|path| path.exists())
.map_or_else(|| root.clone(), Path::to_path_buf);
// Only repo `.agents/skills` candidates fall back to an ancestor watch.
// Other missing roots stay unwatched until they exist.
let watch_target = if watch_missing_ancestor && !root.exists() {
root.ancestors()
.find(|path| path.exists())
.map_or_else(|| root.clone(), Path::to_path_buf)
} else {
root.clone()
};
state
.watch_target_by_skills_root
.insert(root, watch_target.clone());
@@ -513,9 +519,9 @@ mod tests {
fn register_skills_root_dedupes_state_entries() {
let watcher = FileWatcher::noop();
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root);
watcher.register_skills_root(path("/tmp/other-skills"));
watcher.register_skills_root(root.clone(), false);
watcher.register_skills_root(root, false);
watcher.register_skills_root(path("/tmp/other-skills"), false);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.len(), 2);
@@ -525,7 +531,7 @@ mod tests {
fn watch_registration_drop_unregisters_roots() {
let watcher = Arc::new(FileWatcher::noop());
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root.clone(), false);
let registration = WatchRegistration {
file_watcher: Arc::downgrade(&watcher),
roots: vec![root],
@@ -543,7 +549,7 @@ mod tests {
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
let missing_root = temp_dir.path().join(".agents").join("skills");
watcher.register_skills_root(missing_root.clone());
watcher.register_skills_root(missing_root.clone(), true);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
@@ -562,6 +568,29 @@ mod tests {
);
}
#[test]
fn register_missing_non_repo_skills_root_does_not_watch_ancestor() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let watcher = FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher");
let missing_root = temp_dir.path().join("skills");
watcher.register_skills_root(missing_root.clone(), false);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&missing_root), Some(&1));
assert_eq!(
state.watch_target_by_skills_root.get(&missing_root),
Some(&missing_root)
);
assert_eq!(state.watch_target_ref_counts.get(&missing_root), Some(&1));
drop(state);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner = inner.lock().expect("inner lock");
assert_eq!(inner.watched_paths.get(&missing_root), None);
assert_eq!(inner.watched_paths.get(temp_dir.path()), None);
}
#[test]
fn unregister_holds_state_lock_until_unwatch_finishes() {
let temp_dir = tempfile::tempdir().expect("temp dir");
@@ -569,7 +598,7 @@ mod tests {
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher"));
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root.clone(), false);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner_guard = inner.lock().expect("inner lock");
@@ -592,7 +621,7 @@ mod tests {
let register_watcher = Arc::clone(&watcher);
let register_root = root.clone();
let register_thread = std::thread::spawn(move || {
register_watcher.register_skills_root(register_root);
register_watcher.register_skills_root(register_root, false);
});
drop(inner_guard);