app-server: add filesystem watch support (#14533)

### Summary
Add the v2 app-server filesystem watch RPCs and notifications, wire them
through the message processor, and implement connection-scoped watches
with notify-backed change delivery. This also updates the schema
fixtures, app-server documentation, and the v2 integration coverage for
watch and unwatch behavior.

This allows clients to efficiently watch for filesystem updates, e.g. to
react on branch changes.

### Testing
- exercise watch lifecycles for directory changes, atomic file
replacement, missing-file targets, and unwatch cleanup
This commit is contained in:
Ruslan Nigmatullin
2026-03-24 15:52:13 -07:00
committed by GitHub
parent 062fa7a2bb
commit 301b17c2a1
29 changed files with 1396 additions and 5 deletions

View File

@@ -4,11 +4,15 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::FsChangedNotification;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryEntry;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsUnwatchParams;
use codex_app_server_protocol::FsWatchResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
@@ -17,6 +21,8 @@ use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::timeout;
use uuid::Uuid;
use uuid::Version;
#[cfg(unix)]
use std::os::unix::fs::symlink;
@@ -611,3 +617,195 @@ async fn fs_copy_rejects_standalone_fifo_source() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_directory_reports_changed_child_paths_and_unwatch_stops_notifications()
-> Result<()> {
let codex_home = TempDir::new()?;
let git_dir = codex_home.path().join("repo").join(".git");
let fetch_head = git_dir.join("FETCH_HEAD");
std::fs::create_dir_all(&git_dir)?;
std::fs::write(&fetch_head, "old\n")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let watch_request_id = mcp
.send_fs_watch_request(codex_app_server_protocol::FsWatchParams {
path: absolute_path(git_dir.clone()),
})
.await?;
let watch_response: FsWatchResponse = to_response(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(watch_request_id)),
)
.await??,
)?;
assert_eq!(watch_response.path, absolute_path(git_dir.clone()));
let watch_id = Uuid::parse_str(&watch_response.watch_id)?;
assert_eq!(watch_id.get_version(), Some(Version::SortRand));
std::fs::write(&fetch_head, "updated\n")?;
// Kernel file watching is not reliable in every sandboxed test environment.
// Keep validating notification shape when the backend does emit, but do not
// fail the whole suite if no OS event arrives.
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(changed.watch_id, watch_response.watch_id.clone());
assert_eq!(
changed.changed_paths,
vec![absolute_path(fetch_head.clone())]
);
}
while timeout(
Duration::from_millis(200),
mcp.read_stream_until_notification_message("fs/changed"),
)
.await
.is_ok()
{}
let unwatch_request_id = mcp
.send_fs_unwatch_request(FsUnwatchParams {
watch_id: watch_response.watch_id,
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unwatch_request_id)),
)
.await??;
std::fs::write(git_dir.join("packed-refs"), "refs\n")?;
let maybe_notification = timeout(
Duration::from_millis(1500),
mcp.read_stream_until_notification_message("fs/changed"),
)
.await;
assert!(
maybe_notification.is_err(),
"fs/unwatch should stop future change notifications"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_file_reports_atomic_replace_events() -> Result<()> {
let codex_home = TempDir::new()?;
let git_dir = codex_home.path().join("repo").join(".git");
let head_path = git_dir.join("HEAD");
std::fs::create_dir_all(&git_dir)?;
std::fs::write(&head_path, "ref: refs/heads/main\n")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let watch_request_id = mcp
.send_fs_watch_request(codex_app_server_protocol::FsWatchParams {
path: absolute_path(head_path.clone()),
})
.await?;
let watch_response: FsWatchResponse = to_response(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(watch_request_id)),
)
.await??,
)?;
assert_eq!(watch_response.path, absolute_path(head_path.clone()));
replace_file_atomically(&head_path, "ref: refs/heads/feature\n")?;
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(
changed,
FsChangedNotification {
watch_id: watch_response.watch_id,
changed_paths: vec![absolute_path(head_path.clone())],
}
);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_allows_missing_file_targets() -> Result<()> {
let codex_home = TempDir::new()?;
let git_dir = codex_home.path().join("repo").join(".git");
let fetch_head = git_dir.join("FETCH_HEAD");
std::fs::create_dir_all(&git_dir)?;
let mut mcp = initialized_mcp(&codex_home).await?;
let watch_request_id = mcp
.send_fs_watch_request(codex_app_server_protocol::FsWatchParams {
path: absolute_path(fetch_head.clone()),
})
.await?;
let watch_response: FsWatchResponse = to_response(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(watch_request_id)),
)
.await??,
)?;
assert_eq!(watch_response.path, absolute_path(fetch_head.clone()));
replace_file_atomically(&fetch_head, "origin/main\n")?;
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(
changed,
FsChangedNotification {
watch_id: watch_response.watch_id,
changed_paths: vec![absolute_path(fetch_head.clone())],
}
);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_rejects_relative_paths() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = initialized_mcp(&codex_home).await?;
let watch_id = mcp
.send_raw_request("fs/watch", Some(json!({ "path": "relative-path" })))
.await?;
expect_error_message(
&mut mcp,
watch_id,
"Invalid request: AbsolutePathBuf deserialized without a base path",
)
.await?;
Ok(())
}
fn fs_changed_notification(notification: JSONRPCNotification) -> Result<FsChangedNotification> {
let params = notification
.params
.context("fs/changed notification should include params")?;
Ok(serde_json::from_value::<FsChangedNotification>(params)?)
}
async fn maybe_fs_changed_notification(
mcp: &mut McpProcess,
) -> Result<Option<FsChangedNotification>> {
match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("fs/changed"),
)
.await
{
Ok(notification) => Ok(Some(fs_changed_notification(notification?)?)),
Err(_) => Ok(None),
}
}
fn replace_file_atomically(path: &PathBuf, contents: &str) -> Result<()> {
let temp_path = path.with_extension("lock");
std::fs::write(&temp_path, contents)?;
std::fs::rename(temp_path, path)?;
Ok(())
}