Added throttling for file watcher events

This commit is contained in:
Eric Traut
2026-01-28 22:17:36 -08:00
parent f3100c77f7
commit 41e1d66a72

View File

@@ -8,6 +8,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::time::Duration;
use notify::Event;
use notify::RecommendedWatcher;
@@ -16,6 +17,8 @@ use notify::Watcher;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio::time::sleep_until;
use tracing::warn;
use crate::config::Config;
@@ -43,6 +46,52 @@ struct FileWatcherInner {
watched_paths: HashMap<PathBuf, RecursiveMode>,
}
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
/// Coalesces bursts of paths and emits at most once per interval.
struct ThrottledPaths {
pending: HashSet<PathBuf>,
next_allowed_at: Instant,
}
impl ThrottledPaths {
fn new(now: Instant) -> Self {
Self {
pending: HashSet::new(),
next_allowed_at: now,
}
}
fn add(&mut self, paths: Vec<PathBuf>) {
self.pending.extend(paths);
}
fn next_deadline(&self, now: Instant) -> Option<Instant> {
(!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at)
}
fn take_ready(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() || now < self.next_allowed_at {
return None;
}
Some(self.take_with_next_allowed(now))
}
fn take_pending(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() {
return None;
}
Some(self.take_with_next_allowed(now))
}
fn take_with_next_allowed(&mut self, now: Instant) -> Vec<PathBuf> {
let mut paths: Vec<PathBuf> = self.pending.drain().collect();
paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL;
paths
}
}
pub(crate) struct FileWatcher {
inner: Option<Mutex<FileWatcherInner>>,
state: Arc<RwLock<WatchState>>,
@@ -152,23 +201,65 @@ impl FileWatcher {
) {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
while let Some(res) = raw_rx.recv().await {
match res {
Ok(event) => {
let (agents_paths, skills_paths) = classify_event(&event, &state);
if !agents_paths.is_empty() {
let _ = tx.send(FileWatcherEvent::AgentsChanged {
paths: agents_paths,
});
}
if !skills_paths.is_empty() {
let _ = tx.send(FileWatcherEvent::SkillsChanged {
paths: skills_paths,
});
let now = Instant::now();
let mut agents = ThrottledPaths::new(now);
let mut skills = ThrottledPaths::new(now);
loop {
let now = Instant::now();
let next_deadline = match (agents.next_deadline(now), skills.next_deadline(now))
{
(Some(a), Some(s)) => Some(a.min(s)),
(Some(a), None) => Some(a),
(None, Some(s)) => Some(s),
(None, None) => None,
};
let timer_deadline = next_deadline
.unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365));
let timer = sleep_until(timer_deadline);
tokio::pin!(timer);
tokio::select! {
res = raw_rx.recv() => {
match res {
Some(Ok(event)) => {
let (agents_paths, skills_paths) = classify_event(&event, &state);
let now = Instant::now();
agents.add(agents_paths);
skills.add(skills_paths);
if let Some(paths) = agents.take_ready(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
}
Some(Err(err)) => {
warn!("file watcher error: {err}");
}
None => {
// Flush any pending changes before shutdown so subscribers
// see the latest state.
let now = Instant::now();
if let Some(paths) = agents.take_pending(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_pending(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
break;
}
}
}
Err(err) => {
warn!("file watcher error: {err}");
_ = &mut timer => {
let now = Instant::now();
if let Some(paths) = agents.take_ready(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
}
}
}
@@ -276,3 +367,48 @@ fn nearest_existing_ancestor(path: &Path) -> Option<PathBuf> {
cursor = cursor.parent()?;
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
fn path(name: &str) -> PathBuf {
PathBuf::from(name)
}
#[test]
fn throttles_and_coalesces_within_interval() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
throttled.add(vec![path("a")]);
let first = throttled.take_ready(start).expect("first emit");
assert_eq!(first, vec![path("a")]);
throttled.add(vec![path("b"), path("c")]);
assert_eq!(throttled.take_ready(start), None);
let second = throttled
.take_ready(start + WATCHER_THROTTLE_INTERVAL)
.expect("coalesced emit");
assert_eq!(second, vec![path("b"), path("c")]);
}
#[test]
fn flushes_pending_on_shutdown() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
throttled.add(vec![path("a")]);
let _ = throttled.take_ready(start).expect("first emit");
throttled.add(vec![path("b")]);
assert_eq!(throttled.take_ready(start), None);
let flushed = throttled
.take_pending(start)
.expect("shutdown flush emits pending paths");
assert_eq!(flushed, vec![path("b")]);
}
}