Fix fs/watch debounce batching (#24716)

## Summary

`fs/watch` was using a local debounce wrapper whose deadline was
initialized once and then reused after the first batch. Once that stale
deadline was in the past, later file changes could bypass the intended
200ms debounce and send noisier `fs/changed` notifications.

This moves the debounce wrapper into `codex-file-watcher` as
`DebouncedWatchReceiver`, resets the debounce deadline for each event
batch, preserves pending paths across cancelled receives, and updates
app-server `fs/watch` to use the shared wrapper.

Fixes #24692.
This commit is contained in:
Eric Traut
2026-05-28 23:09:55 -07:00
committed by GitHub
parent 6e10142199
commit 522f549922
3 changed files with 103 additions and 44 deletions

View File

@@ -8,66 +8,24 @@ use codex_app_server_protocol::FsWatchParams;
use codex_app_server_protocol::FsWatchResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ServerNotification;
use codex_file_watcher::DebouncedWatchReceiver;
use codex_file_watcher::FileWatcher;
use codex_file_watcher::FileWatcherEvent;
use codex_file_watcher::FileWatcherSubscriber;
use codex_file_watcher::Receiver;
use codex_file_watcher::WatchPath;
use codex_file_watcher::WatchRegistration;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex as AsyncMutex;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::warn;
const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200);
struct DebouncedReceiver {
rx: Receiver,
interval: Duration,
changed_paths: HashSet<PathBuf>,
next_allowance: Option<Instant>,
}
impl DebouncedReceiver {
fn new(rx: Receiver, interval: Duration) -> Self {
Self {
rx,
interval,
changed_paths: HashSet::new(),
next_allowance: None,
}
}
async fn recv(&mut self) -> Option<FileWatcherEvent> {
while self.changed_paths.is_empty() {
self.changed_paths.extend(self.rx.recv().await?.paths);
}
let next_allowance = *self
.next_allowance
.get_or_insert_with(|| Instant::now() + self.interval);
loop {
tokio::select! {
event = self.rx.recv() => self.changed_paths.extend(event?.paths),
_ = tokio::time::sleep_until(next_allowance) => break,
}
}
Some(FileWatcherEvent {
paths: self.changed_paths.drain().collect(),
})
}
}
#[derive(Clone)]
pub(crate) struct FsWatchManager {
outgoing: Arc<OutgoingMessageSender>,
@@ -151,7 +109,7 @@ impl FsWatchManager {
let task_watch_id = watch_id.clone();
tokio::spawn(async move {
let mut rx = DebouncedReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE);
let mut rx = DebouncedWatchReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE);
tokio::pin!(terminate_rx);
loop {
let event = tokio::select! {
@@ -219,6 +177,8 @@ mod tests {
use super::*;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::collections::HashSet;
use std::path::PathBuf;
use tempfile::TempDir;
fn absolute_path(path: PathBuf) -> AbsolutePathBuf {

View File

@@ -86,6 +86,62 @@ async fn throttled_receiver_flushes_pending_on_shutdown() {
assert_eq!(closed, None);
}
#[tokio::test]
async fn debounced_receiver_coalesces_each_event_batch() {
let (tx, rx) = watch_channel();
let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
tx.add_changed_paths(&[path("a")]).await;
let first = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv())
.await
.expect("first emit timeout");
assert_eq!(
first,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);
tx.add_changed_paths(&[path("c")]).await;
let blocked = timeout(TEST_THROTTLE_INTERVAL / 2, debounced.recv()).await;
assert_eq!(blocked.is_err(), true);
tx.add_changed_paths(&[path("d")]).await;
let second = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv())
.await
.expect("second emit timeout");
assert_eq!(
second,
Some(FileWatcherEvent {
paths: vec![path("c"), path("d")],
})
);
}
#[tokio::test]
async fn debounced_receiver_flushes_pending_on_shutdown() {
let (tx, rx) = watch_channel();
let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
tx.add_changed_paths(&[path("a")]).await;
drop(tx);
let flushed = timeout(Duration::from_secs(1), debounced.recv())
.await
.expect("shutdown flush timeout");
assert_eq!(
flushed,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);
let closed = timeout(Duration::from_secs(1), debounced.recv())
.await
.expect("closed recv timeout");
assert_eq!(closed, None);
}
#[test]
fn is_mutating_event_filters_non_mutating_event_kinds() {
assert_eq!(

View File

@@ -253,6 +253,49 @@ impl ThrottledWatchReceiver {
}
}
/// Coalesces file watcher notifications that arrive within a fixed debounce
/// window after the first event in each batch.
pub struct DebouncedWatchReceiver {
rx: Receiver,
interval: Duration,
changed_paths: BTreeSet<PathBuf>,
}
impl DebouncedWatchReceiver {
/// Creates a debouncing wrapper around a raw watcher [`Receiver`].
pub fn new(rx: Receiver, interval: Duration) -> Self {
Self {
rx,
interval,
changed_paths: BTreeSet::new(),
}
}
/// Receives the next debounced event batch.
pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
while self.changed_paths.is_empty() {
self.changed_paths.extend(self.rx.recv().await?.paths);
}
let deadline = Instant::now() + self.interval;
loop {
tokio::select! {
event = self.rx.recv() => match event {
Some(event) => self.changed_paths.extend(event.paths),
None => break,
},
_ = sleep_until(deadline) => break,
}
}
Some(FileWatcherEvent {
paths: std::mem::take(&mut self.changed_paths)
.into_iter()
.collect(),
})
}
}
/// Handle used to register watched paths for one logical consumer.
pub struct FileWatcherSubscriber {
id: SubscriberId,