fix: file watcher (#12105)

The issue was that the file_watcher never unsubscribe a file watch. All
of them leave in the owning of the ThreadManager. As a result, for each
newly created thread we create a new file watcher but this one never get
deleted even if we close the thread. On Unix system, a file watcher uses
an `inotify` and after some time we end up having consumed all of them.

This PR adds a mechanism to unsubscribe a file watcher when a thread is
dropped
This commit is contained in:
jif-oai
2026-02-18 18:28:34 +00:00
committed by GitHub
parent 999576f7b8
commit f675bf9334
3 changed files with 182 additions and 29 deletions

View File

@@ -3,6 +3,7 @@ use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::file_watcher::WatchRegistration;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
@@ -33,15 +34,21 @@ pub struct ThreadConfigSnapshot {
pub struct CodexThread {
codex: Codex,
rollout_path: Option<PathBuf>,
_watch_registration: WatchRegistration,
}
/// Conduit for the bidirectional stream of messages that compose a thread
/// (formerly called a conversation) in Codex.
impl CodexThread {
pub(crate) fn new(codex: Codex, rollout_path: Option<PathBuf>) -> Self {
pub(crate) fn new(
codex: Codex,
rollout_path: Option<PathBuf>,
watch_registration: WatchRegistration,
) -> Self {
Self {
codex,
rollout_path,
_watch_registration: watch_registration,
}
}

View File

@@ -31,7 +31,7 @@ pub enum FileWatcherEvent {
}
struct WatchState {
skills_roots: HashSet<PathBuf>,
skills_root_ref_counts: HashMap<PathBuf, usize>,
}
struct FileWatcherInner {
@@ -91,6 +91,19 @@ pub(crate) struct FileWatcher {
tx: broadcast::Sender<FileWatcherEvent>,
}
pub(crate) struct WatchRegistration {
file_watcher: std::sync::Weak<FileWatcher>,
roots: Vec<PathBuf>,
}
impl Drop for WatchRegistration {
fn drop(&mut self) {
if let Some(file_watcher) = self.file_watcher.upgrade() {
file_watcher.unregister_roots(&self.roots);
}
}
}
impl FileWatcher {
pub(crate) fn new(_codex_home: PathBuf) -> notify::Result<Self> {
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
@@ -104,7 +117,7 @@ impl FileWatcher {
};
let (tx, _) = broadcast::channel(128);
let state = Arc::new(RwLock::new(WatchState {
skills_roots: HashSet::new(),
skills_root_ref_counts: HashMap::new(),
}));
let file_watcher = Self {
inner: Some(Mutex::new(inner)),
@@ -120,7 +133,7 @@ impl FileWatcher {
Self {
inner: None,
state: Arc::new(RwLock::new(WatchState {
skills_roots: HashSet::new(),
skills_root_ref_counts: HashMap::new(),
})),
tx,
}
@@ -130,11 +143,21 @@ impl FileWatcher {
self.tx.subscribe()
}
pub(crate) fn register_config(&self, config: &Config) {
let roots =
skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd);
for root in roots {
self.register_skills_root(root.path);
pub(crate) fn register_config(self: &Arc<Self>, config: &Config) -> WatchRegistration {
let deduped_roots: HashSet<PathBuf> =
skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd)
.into_iter()
.map(|root| root.path)
.collect();
let mut registered_roots: Vec<PathBuf> = deduped_roots.into_iter().collect();
registered_roots.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
for root in &registered_roots {
self.register_skills_root(root.clone());
}
WatchRegistration {
file_watcher: Arc::downgrade(self),
roots: registered_roots,
}
}
@@ -200,14 +223,61 @@ impl FileWatcher {
}
fn register_skills_root(&self, root: PathBuf) {
{
let mut state = match self.state.write() {
Ok(state) => state,
Err(err) => err.into_inner(),
};
state.skills_roots.insert(root.clone());
let mut state = self
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let count = state
.skills_root_ref_counts
.entry(root.clone())
.or_insert(0);
*count += 1;
if *count == 1 {
self.watch_path(root, RecursiveMode::Recursive);
}
}
fn unregister_roots(&self, roots: &[PathBuf]) {
let mut state = self
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
for root in roots {
let mut should_unwatch = false;
if let Some(count) = state.skills_root_ref_counts.get_mut(root) {
if *count > 1 {
*count -= 1;
} else {
state.skills_root_ref_counts.remove(root);
should_unwatch = true;
}
}
if !should_unwatch {
continue;
}
let Some(inner) = &self.inner else {
continue;
};
if inner_guard.is_none() {
let guard = inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
inner_guard = Some(guard);
}
let Some(guard) = inner_guard.as_mut() else {
continue;
};
if guard.watched_paths.remove(root).is_none() {
continue;
}
if let Err(err) = guard.watcher.unwatch(root) {
warn!("failed to unwatch {}: {err}", root.display());
}
}
self.watch_path(root, RecursiveMode::Recursive);
}
fn watch_path(&self, path: PathBuf, mode: RecursiveMode) {
@@ -218,10 +288,9 @@ impl FileWatcher {
return;
}
let watch_path = path;
let mut guard = match inner.lock() {
Ok(guard) => guard,
Err(err) => err.into_inner(),
};
let mut guard = inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(existing) = guard.watched_paths.get(&watch_path) {
if *existing == RecursiveMode::Recursive || *existing == mode {
return;
@@ -248,10 +317,18 @@ fn classify_event(event: &Event, state: &RwLock<WatchState>) -> Vec<PathBuf> {
let mut skills_paths = Vec::new();
let skills_roots = match state.read() {
Ok(state) => state.skills_roots.clone(),
Ok(state) => state
.skills_root_ref_counts
.keys()
.cloned()
.collect::<HashSet<_>>(),
Err(err) => {
let state = err.into_inner();
state.skills_roots.clone()
state
.skills_root_ref_counts
.keys()
.cloned()
.collect::<HashSet<_>>()
}
};
@@ -331,7 +408,7 @@ mod tests {
fn classify_event_filters_to_skills_roots() {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_roots: HashSet::from([root.clone()]),
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
});
let event = notify_event(
EventKind::Create(CreateKind::Any),
@@ -350,7 +427,7 @@ mod tests {
let root_a = path("/tmp/skills");
let root_b = path("/tmp/workspace/.codex/skills");
let state = RwLock::new(WatchState {
skills_roots: HashSet::from([root_a.clone(), root_b.clone()]),
skills_root_ref_counts: HashMap::from([(root_a.clone(), 1), (root_b.clone(), 1)]),
});
let event = notify_event(
EventKind::Modify(ModifyKind::Any),
@@ -372,7 +449,7 @@ mod tests {
fn classify_event_ignores_non_mutating_event_kinds() {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_roots: HashSet::from([root.clone()]),
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
});
let path = root.join("demo/SKILL.md");
@@ -398,7 +475,73 @@ mod tests {
watcher.register_skills_root(path("/tmp/other-skills"));
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_roots.len(), 2);
assert_eq!(state.skills_root_ref_counts.len(), 2);
}
#[test]
fn watch_registration_drop_unregisters_roots() {
let watcher = Arc::new(FileWatcher::noop());
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
let registration = WatchRegistration {
file_watcher: Arc::downgrade(&watcher),
roots: vec![root],
};
drop(registration);
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.len(), 0);
}
#[test]
fn unregister_holds_state_lock_until_unwatch_finishes() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let root = temp_dir.path().join("skills");
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher"));
watcher.register_skills_root(root.clone());
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner_guard = inner.lock().expect("inner lock");
let unregister_watcher = Arc::clone(&watcher);
let unregister_root = root.clone();
let unregister_thread = std::thread::spawn(move || {
unregister_watcher.unregister_roots(&[unregister_root]);
});
let state_lock_observed = (0..100).any(|_| {
let locked = watcher.state.try_write().is_err();
if !locked {
std::thread::sleep(Duration::from_millis(10));
}
locked
});
assert_eq!(state_lock_observed, true);
let register_watcher = Arc::clone(&watcher);
let register_root = root.clone();
let register_thread = std::thread::spawn(move || {
register_watcher.register_skills_root(register_root);
});
drop(inner_guard);
unregister_thread.join().expect("unregister join");
register_thread.join().expect("register join");
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&root), Some(&1));
drop(state);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner = inner.lock().expect("inner lock");
assert_eq!(
inner.watched_paths.get(&root),
Some(&RecursiveMode::Recursive)
);
}
#[tokio::test]
@@ -407,7 +550,7 @@ mod tests {
let root = path("/tmp/skills");
{
let mut state = watcher.state.write().expect("state lock");
state.skills_roots.insert(root.clone());
state.skills_root_ref_counts.insert(root.clone(), 1);
}
let (raw_tx, raw_rx) = mpsc::unbounded_channel();

View File

@@ -491,7 +491,7 @@ impl ThreadManagerState {
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.file_watcher.register_config(&config);
let watch_registration = self.file_watcher.register_config(&config);
let CodexSpawnOk {
codex, thread_id, ..
} = Codex::spawn(
@@ -507,13 +507,15 @@ impl ThreadManagerState {
persist_extended_history,
)
.await?;
self.finalize_thread_spawn(codex, thread_id).await
self.finalize_thread_spawn(codex, thread_id, watch_registration)
.await
}
async fn finalize_thread_spawn(
&self,
codex: Codex,
thread_id: ThreadId,
watch_registration: crate::file_watcher::WatchRegistration,
) -> CodexResult<NewThread> {
let event = codex.next_event().await?;
let session_configured = match event {
@@ -529,6 +531,7 @@ impl ThreadManagerState {
let thread = Arc::new(CodexThread::new(
codex,
session_configured.rollout_path.clone(),
watch_registration,
));
let mut threads = self.threads.write().await;
threads.insert(thread_id, thread.clone());