app-server: Move watch_id to request of fs/watch (#17026)

It's easier for clients to maintain watchers if they define the watch
id, so move it into the request.
It's not used yet, so should be a safe change.
This commit is contained in:
Ruslan Nigmatullin
2026-04-07 11:22:28 -07:00
committed by GitHub
parent 252d79f5eb
commit 8a13f82204
17 changed files with 171 additions and 113 deletions

View File

@@ -1,3 +1,4 @@
use crate::fs_api::invalid_request;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::FsChangedNotification;
@@ -16,6 +17,7 @@ use codex_core::file_watcher::WatchRegistration;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::path::PathBuf;
use std::sync::Arc;
@@ -26,7 +28,6 @@ use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::warn;
use uuid::Uuid;
const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200);
@@ -120,7 +121,11 @@ impl FsWatchManager {
connection_id: ConnectionId,
params: FsWatchParams,
) -> Result<FsWatchResponse, JSONRPCErrorError> {
let watch_id = Uuid::now_v7().to_string();
let watch_id = params.watch_id;
let watch_key = WatchKey {
connection_id,
watch_id: watch_id.clone(),
};
let outgoing = self.outgoing.clone();
let (subscriber, rx) = self.file_watcher.add_subscriber();
let watch_root = params.path.to_path_buf().clone();
@@ -130,17 +135,20 @@ impl FsWatchManager {
}]);
let (terminate_tx, terminate_rx) = oneshot::channel();
self.state.lock().await.entries.insert(
WatchKey {
connection_id,
watch_id: watch_id.clone(),
},
WatchEntry {
terminate_tx,
_subscriber: subscriber,
_registration: registration,
},
);
match self.state.lock().await.entries.entry(watch_key) {
Entry::Occupied(_) => {
return Err(invalid_request(format!(
"watchId already exists: {watch_id}"
)));
}
Entry::Vacant(entry) => {
entry.insert(WatchEntry {
terminate_tx,
_subscriber: subscriber,
_registration: registration,
});
}
}
let task_watch_id = watch_id.clone();
tokio::spawn(async move {
@@ -175,10 +183,7 @@ impl FsWatchManager {
}
});
Ok(FsWatchResponse {
watch_id,
path: params.path,
})
Ok(FsWatchResponse { path: params.path })
}
pub(crate) async fn unwatch(
@@ -216,7 +221,6 @@ mod tests {
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Version;
fn absolute_path(path: PathBuf) -> AbsolutePathBuf {
assert!(
@@ -237,28 +241,33 @@ mod tests {
}
#[tokio::test]
async fn watch_returns_a_v7_id_and_tracks_the_owner_scoped_entry() {
async fn watch_uses_client_id_and_tracks_the_owner_scoped_entry() {
let temp_dir = TempDir::new().expect("temp dir");
let head_path = temp_dir.path().join("HEAD");
std::fs::write(&head_path, "ref: refs/heads/main\n").expect("write HEAD");
let manager = manager_with_noop_watcher();
let path = absolute_path(head_path);
let watch_id = "watch-head".to_string();
let response = manager
.watch(ConnectionId(1), FsWatchParams { path: path.clone() })
.watch(
ConnectionId(1),
FsWatchParams {
watch_id: watch_id.clone(),
path: path.clone(),
},
)
.await
.expect("watch should succeed");
assert_eq!(response.path, path);
let watch_id = Uuid::parse_str(&response.watch_id).expect("watch id should be a UUID");
assert_eq!(watch_id.get_version(), Some(Version::SortRand));
let state = manager.state.lock().await;
assert_eq!(
state.entries.keys().cloned().collect::<HashSet<_>>(),
HashSet::from([WatchKey {
connection_id: ConnectionId(1),
watch_id: response.watch_id,
watch_id,
}])
);
}
@@ -270,10 +279,11 @@ mod tests {
std::fs::write(&head_path, "ref: refs/heads/main\n").expect("write HEAD");
let manager = manager_with_noop_watcher();
let response = manager
manager
.watch(
ConnectionId(1),
FsWatchParams {
watch_id: "watch-head".to_string(),
path: absolute_path(head_path),
},
)
@@ -281,14 +291,14 @@ mod tests {
.expect("watch should succeed");
let watch_key = WatchKey {
connection_id: ConnectionId(1),
watch_id: response.watch_id.clone(),
watch_id: "watch-head".to_string(),
};
manager
.unwatch(
ConnectionId(2),
FsUnwatchParams {
watch_id: response.watch_id.clone(),
watch_id: "watch-head".to_string(),
},
)
.await
@@ -299,7 +309,7 @@ mod tests {
.unwatch(
ConnectionId(1),
FsUnwatchParams {
watch_id: response.watch_id,
watch_id: "watch-head".to_string(),
},
)
.await
@@ -307,6 +317,41 @@ mod tests {
assert!(!manager.state.lock().await.entries.contains_key(&watch_key));
}
#[tokio::test]
async fn watch_rejects_duplicate_id_for_the_same_connection() {
let temp_dir = TempDir::new().expect("temp dir");
let head_path = temp_dir.path().join("HEAD");
let fetch_head_path = temp_dir.path().join("FETCH_HEAD");
std::fs::write(&head_path, "ref: refs/heads/main\n").expect("write HEAD");
std::fs::write(&fetch_head_path, "old-fetch\n").expect("write FETCH_HEAD");
let manager = manager_with_noop_watcher();
manager
.watch(
ConnectionId(1),
FsWatchParams {
watch_id: "watch-head".to_string(),
path: absolute_path(head_path),
},
)
.await
.expect("first watch should succeed");
let error = manager
.watch(
ConnectionId(1),
FsWatchParams {
watch_id: "watch-head".to_string(),
path: absolute_path(fetch_head_path),
},
)
.await
.expect_err("duplicate watch should fail");
assert_eq!(error.message, "watchId already exists: watch-head");
assert_eq!(manager.state.lock().await.entries.len(), 1);
}
#[tokio::test]
async fn connection_closed_removes_only_that_connections_watches() {
let temp_dir = TempDir::new().expect("temp dir");
@@ -318,28 +363,31 @@ mod tests {
std::fs::write(&packed_refs_path, "refs\n").expect("write packed-refs");
let manager = manager_with_noop_watcher();
let response_1 = manager
let response = manager
.watch(
ConnectionId(1),
FsWatchParams {
path: absolute_path(head_path),
watch_id: "watch-head".to_string(),
path: absolute_path(head_path.clone()),
},
)
.await
.expect("first watch should succeed");
let response_2 = manager
manager
.watch(
ConnectionId(1),
FsWatchParams {
watch_id: "watch-fetch-head".to_string(),
path: absolute_path(fetch_head_path),
},
)
.await
.expect("second watch should succeed");
let response_3 = manager
manager
.watch(
ConnectionId(2),
FsWatchParams {
watch_id: "watch-packed-refs".to_string(),
path: absolute_path(packed_refs_path),
},
)
@@ -359,9 +407,9 @@ mod tests {
.collect::<HashSet<_>>(),
HashSet::from([WatchKey {
connection_id: ConnectionId(2),
watch_id: response_3.watch_id,
watch_id: "watch-packed-refs".to_string(),
}])
);
assert_ne!(response_1.watch_id, response_2.watch_id);
assert_eq!(response.path, absolute_path(head_path));
}
}