Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
186a149e6f Deflake app-server fs watch integration tests
Reduce the app-server fs/watch subprocess tests to deterministic request/response coverage so CI no longer blocks on optional kernel file watcher events. Add deterministic manager-level coverage for converting watcher events into fs/changed notifications, delivering them through the app-server outgoing channel, and stopping the forwarding task before fs/unwatch returns.

Co-authored-by: Codex <noreply@openai.com>
2026-04-20 09:55:50 -07:00
2 changed files with 350 additions and 133 deletions

View File

@@ -14,6 +14,7 @@ use codex_core::file_watcher::FileWatcherSubscriber;
use codex_core::file_watcher::Receiver;
use codex_core::file_watcher::WatchPath;
use codex_core::file_watcher::WatchRegistration;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
@@ -25,11 +26,17 @@ use tokio::sync::Mutex as AsyncMutex;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::warn;
const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200);
// FileWatcher can emit multiple low-level events for one user-visible edit
// (for example, write + rename + metadata updates). App-server clients only
// need a coarse "something under this watch changed" signal, so this receiver
// batches paths until the debounce window closes and then emits one sorted
// notification payload.
struct DebouncedReceiver {
rx: Receiver,
interval: Duration,
@@ -82,8 +89,36 @@ struct FsWatchState {
struct WatchEntry {
terminate_tx: oneshot::Sender<oneshot::Sender<()>>,
_subscriber: FileWatcherSubscriber,
_registration: WatchRegistration,
_registration: WatchRegistrationGuard,
}
// A watch entry owns both halves of the core watcher subscription:
// - `_subscriber` keeps the per-client receiver alive.
// - `_registration` keeps the path registered with the shared FileWatcher.
// Dropping the entry unregisters the path and closes the receiver.
enum WatchRegistrationGuard {
Core {
_subscriber: FileWatcherSubscriber,
_registration: WatchRegistration,
},
#[cfg(test)]
Synthetic,
}
enum FsWatchEventReceiver {
Core(DebouncedReceiver),
#[cfg(test)]
Synthetic(mpsc::Receiver<FileWatcherEvent>),
}
impl FsWatchEventReceiver {
async fn recv(&mut self) -> Option<FileWatcherEvent> {
match self {
Self::Core(rx) => rx.recv().await,
#[cfg(test)]
Self::Synthetic(rx) => rx.recv().await,
}
}
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
@@ -119,21 +154,47 @@ impl FsWatchManager {
&self,
connection_id: ConnectionId,
params: FsWatchParams,
) -> Result<FsWatchResponse, JSONRPCErrorError> {
let outgoing = self.outgoing.clone();
let (subscriber, rx) = self.file_watcher.add_subscriber();
// fs/watch registers the requested path as non-recursive. The core
// watcher owns the platform-specific matching rules for which backend
// event paths are forwarded to this app-server watch.
let registration = subscriber.register_paths(vec![WatchPath {
path: params.path.to_path_buf(),
recursive: false,
}]);
self.watch_with_event_receiver(
connection_id,
params,
FsWatchEventReceiver::Core(DebouncedReceiver::new(
rx,
FS_CHANGED_NOTIFICATION_DEBOUNCE,
)),
WatchRegistrationGuard::Core {
_subscriber: subscriber,
_registration: registration,
},
outgoing,
)
.await
}
async fn watch_with_event_receiver(
&self,
connection_id: ConnectionId,
params: FsWatchParams,
rx: FsWatchEventReceiver,
registration: WatchRegistrationGuard,
outgoing: Arc<OutgoingMessageSender>,
) -> Result<FsWatchResponse, JSONRPCErrorError> {
let watch_id = params.watch_id;
let watch_key = WatchKey {
connection_id,
watch_id: watch_id.clone(),
};
let outgoing = self.outgoing.clone();
let (subscriber, rx) = self.file_watcher.add_subscriber();
let watch_root = params.path.clone();
let registration = subscriber.register_paths(vec![WatchPath {
path: params.path.to_path_buf(),
recursive: false,
}]);
let (terminate_tx, terminate_rx) = oneshot::channel();
match self.state.lock().await.entries.entry(watch_key) {
Entry::Occupied(_) => {
return Err(invalid_request(format!(
@@ -143,44 +204,20 @@ impl FsWatchManager {
Entry::Vacant(entry) => {
entry.insert(WatchEntry {
terminate_tx,
_subscriber: subscriber,
_registration: registration,
});
}
}
let task_watch_id = watch_id.clone();
tokio::spawn(async move {
let mut rx = DebouncedReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE);
tokio::pin!(terminate_rx);
loop {
let event = tokio::select! {
biased;
_ = &mut terminate_rx => break,
event = rx.recv() => match event {
Some(event) => event,
None => break,
},
};
let mut changed_paths = event
.paths
.into_iter()
.map(|path| watch_root.join(path))
.collect::<Vec<_>>();
changed_paths.sort_by(|left, right| left.as_path().cmp(right.as_path()));
if !changed_paths.is_empty() {
outgoing
.send_server_notification_to_connection_and_wait(
connection_id,
ServerNotification::FsChanged(FsChangedNotification {
watch_id: task_watch_id.clone(),
changed_paths,
}),
)
.await;
}
}
});
spawn_watch_forwarding_task(
outgoing,
connection_id,
task_watch_id,
watch_root,
rx,
terminate_rx,
);
Ok(FsWatchResponse { path: params.path })
}
@@ -197,7 +234,7 @@ impl FsWatchManager {
let entry = self.state.lock().await.entries.remove(&watch_key);
if let Some(entry) = entry {
// Wait for the oneshot to be destroyed by the task to ensure that no notifications
// are send after the unwatch response.
// are sent after the unwatch response.
let (done_tx, done_rx) = oneshot::channel();
let _ = entry.terminate_tx.send(done_tx);
let _ = done_rx.await;
@@ -214,12 +251,70 @@ impl FsWatchManager {
}
}
fn spawn_watch_forwarding_task(
outgoing: Arc<OutgoingMessageSender>,
connection_id: ConnectionId,
watch_id: String,
watch_root: AbsolutePathBuf,
mut rx: FsWatchEventReceiver,
terminate_rx: oneshot::Receiver<oneshot::Sender<()>>,
) -> JoinHandle<()> {
tokio::spawn(async move {
// Each watch owns one forwarding task. It exits when the client
// unwatches, the connection closes and drops the entry, or the
// underlying FileWatcher subscriber closes.
tokio::pin!(terminate_rx);
loop {
let event = tokio::select! {
biased;
_ = &mut terminate_rx => break,
event = rx.recv() => match event {
Some(event) => event,
None => break,
},
};
if let Some(notification) = fs_changed_notification(&watch_id, &watch_root, event) {
outgoing
.send_server_notification_to_connection_and_wait(
connection_id,
ServerNotification::FsChanged(notification),
)
.await;
}
}
})
}
fn fs_changed_notification(
watch_id: &str,
watch_root: &AbsolutePathBuf,
event: FileWatcherEvent,
) -> Option<FsChangedNotification> {
// Absolute backend paths are preserved by AbsolutePathBuf::join, while
// relative synthetic/test paths are resolved against the logical watch
// root. Sort so repeated event batches are stable.
let mut changed_paths = event
.paths
.into_iter()
.map(|path| watch_root.join(path))
.collect::<Vec<_>>();
changed_paths.sort_by(|left, right| left.as_path().cmp(right.as_path()));
(!changed_paths.is_empty()).then(|| FsChangedNotification {
watch_id: watch_id.to_string(),
changed_paths,
})
}
#[cfg(test)]
mod tests {
use super::*;
use codex_utils_absolute_path::AbsolutePathBuf;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use pretty_assertions::assert_eq;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::time::timeout;
fn absolute_path(path: PathBuf) -> AbsolutePathBuf {
assert!(
@@ -239,6 +334,67 @@ mod tests {
)
}
fn manager_with_outgoing_rx() -> (FsWatchManager, mpsc::Receiver<OutgoingEnvelope>) {
const OUTGOING_BUFFER: usize = 4;
let (tx, rx) = mpsc::channel(OUTGOING_BUFFER);
(
FsWatchManager::new_with_file_watcher(
Arc::new(OutgoingMessageSender::new(tx)),
Arc::new(FileWatcher::noop()),
),
rx,
)
}
async fn watch_with_synthetic_events(
manager: &FsWatchManager,
connection_id: ConnectionId,
params: FsWatchParams,
) -> Result<(FsWatchResponse, mpsc::Sender<FileWatcherEvent>), JSONRPCErrorError> {
let outgoing = manager.outgoing.clone();
let (event_tx, event_rx) = mpsc::channel(4);
let response = manager
.watch_with_event_receiver(
connection_id,
params,
FsWatchEventReceiver::Synthetic(event_rx),
WatchRegistrationGuard::Synthetic,
outgoing,
)
.await?;
Ok((response, event_tx))
}
async fn recv_fs_changed_notification(
rx: &mut mpsc::Receiver<OutgoingEnvelope>,
expected_connection_id: ConnectionId,
) -> FsChangedNotification {
let envelope = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("should receive outgoing envelope before timeout")
.expect("outgoing channel should be open");
let OutgoingEnvelope::ToConnection {
connection_id,
message,
write_complete_tx,
} = envelope
else {
panic!("expected targeted outgoing envelope");
};
assert_eq!(connection_id, expected_connection_id);
let OutgoingMessage::AppServerNotification(ServerNotification::FsChanged(notification)) =
message
else {
panic!("expected fs/changed app-server notification");
};
write_complete_tx
.expect("write completion sender should be attached")
.send(())
.expect("forwarding task should still be waiting for write completion");
notification
}
#[tokio::test]
async fn watch_uses_client_id_and_tracks_the_owner_scoped_entry() {
let temp_dir = TempDir::new().expect("temp dir");
@@ -271,6 +427,155 @@ mod tests {
);
}
#[tokio::test]
async fn synthetic_watch_forwards_fs_changed_notification_to_connection() {
let temp_dir = TempDir::new().expect("temp dir");
let git_dir = temp_dir.path().join(".git");
std::fs::create_dir(&git_dir).expect("create .git dir");
let fetch_head_path = git_dir.join("FETCH_HEAD");
let (manager, mut rx) = manager_with_outgoing_rx();
let watch_root = absolute_path(git_dir);
let (response, event_tx) = watch_with_synthetic_events(
&manager,
ConnectionId(7),
FsWatchParams {
watch_id: "watch-git-dir".to_string(),
path: watch_root.clone(),
},
)
.await
.expect("synthetic watch should succeed");
assert_eq!(response.path, watch_root);
event_tx
.send(FileWatcherEvent {
paths: vec![fetch_head_path.clone()],
})
.await
.expect("watch forwarding task should receive synthetic event");
let notification = recv_fs_changed_notification(&mut rx, ConnectionId(7)).await;
assert_eq!(
notification,
FsChangedNotification {
watch_id: "watch-git-dir".to_string(),
changed_paths: vec![absolute_path(fetch_head_path)],
}
);
}
#[tokio::test]
async fn synthetic_unwatch_stops_forwarding_before_response_returns() {
let temp_dir = TempDir::new().expect("temp dir");
let git_dir = temp_dir.path().join(".git");
std::fs::create_dir(&git_dir).expect("create .git dir");
let fetch_head_path = git_dir.join("FETCH_HEAD");
let packed_refs_path = git_dir.join("packed-refs");
let (manager, mut rx) = manager_with_outgoing_rx();
let (_response, event_tx) = watch_with_synthetic_events(
&manager,
ConnectionId(7),
FsWatchParams {
watch_id: "watch-git-dir".to_string(),
path: absolute_path(git_dir),
},
)
.await
.expect("synthetic watch should succeed");
event_tx
.send(FileWatcherEvent {
paths: vec![fetch_head_path],
})
.await
.expect("watch forwarding task should receive synthetic event");
let _ = recv_fs_changed_notification(&mut rx, ConnectionId(7)).await;
manager
.unwatch(
ConnectionId(7),
FsUnwatchParams {
watch_id: "watch-git-dir".to_string(),
},
)
.await
.expect("unwatch should succeed");
let send_result = event_tx
.send(FileWatcherEvent {
paths: vec![packed_refs_path],
})
.await;
assert!(send_result.is_err());
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}
#[test]
fn fs_changed_notification_reports_file_events_for_the_watch_id() {
let temp_dir = TempDir::new().expect("temp dir");
let head_path = temp_dir.path().join("HEAD");
let watch_root = absolute_path(head_path.clone());
let notification = fs_changed_notification(
"watch-head",
&watch_root,
FileWatcherEvent {
paths: vec![head_path.clone()],
},
)
.expect("file event should produce fs/changed notification");
assert_eq!(
notification,
FsChangedNotification {
watch_id: "watch-head".to_string(),
changed_paths: vec![absolute_path(head_path)],
}
);
}
#[test]
fn fs_changed_notification_reports_directory_child_events() {
let temp_dir = TempDir::new().expect("temp dir");
let git_dir = temp_dir.path().join(".git");
let fetch_head = git_dir.join("FETCH_HEAD");
let watch_root = absolute_path(git_dir);
let notification = fs_changed_notification(
"watch-git-dir",
&watch_root,
FileWatcherEvent {
paths: vec![fetch_head.clone()],
},
)
.expect("child event should produce fs/changed notification");
assert_eq!(
notification,
FsChangedNotification {
watch_id: "watch-git-dir".to_string(),
changed_paths: vec![absolute_path(fetch_head)],
}
);
}
#[test]
fn fs_changed_notification_ignores_empty_events() {
let temp_dir = TempDir::new().expect("temp dir");
let watch_root = absolute_path(temp_dir.path().join(".git"));
assert_eq!(
fs_changed_notification(
"watch-git-dir",
&watch_root,
FileWatcherEvent { paths: Vec::new() },
),
None
);
}
#[tokio::test]
async fn unwatch_is_scoped_to_the_connection_that_created_the_watch() {
let temp_dir = TempDir::new().expect("temp dir");

View File

@@ -4,7 +4,6 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::FsChangedNotification;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryEntry;
@@ -12,7 +11,6 @@ use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsUnwatchParams;
use codex_app_server_protocol::FsWatchResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
@@ -653,13 +651,10 @@ async fn fs_copy_rejects_standalone_fifo_source() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_directory_reports_changed_child_paths_and_unwatch_stops_notifications()
-> Result<()> {
async fn fs_watch_directory_and_unwatch_succeed() -> Result<()> {
let codex_home = TempDir::new()?;
let git_dir = codex_home.path().join("repo").join(".git");
let fetch_head = git_dir.join("FETCH_HEAD");
std::fs::create_dir_all(&git_dir)?;
std::fs::write(&fetch_head, "old\n")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let watch_id = "watch-git-dir".to_string();
@@ -678,26 +673,6 @@ async fn fs_watch_directory_reports_changed_child_paths_and_unwatch_stops_notifi
)?;
assert_eq!(watch_response.path, absolute_path(git_dir.clone()));
std::fs::write(&fetch_head, "updated\n")?;
// Kernel file watching is not reliable in every sandboxed test environment.
// Keep validating notification shape when the backend does emit, but do not
// fail the whole suite if no OS event arrives.
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(changed.watch_id, watch_id.clone());
assert_eq!(
changed.changed_paths,
vec![absolute_path(fetch_head.clone())]
);
}
while timeout(
Duration::from_millis(200),
mcp.read_stream_until_notification_message("fs/changed"),
)
.await
.is_ok()
{}
let unwatch_request_id = mcp
.send_fs_unwatch_request(FsUnwatchParams { watch_id })
.await?;
@@ -707,22 +682,11 @@ async fn fs_watch_directory_reports_changed_child_paths_and_unwatch_stops_notifi
)
.await??;
std::fs::write(git_dir.join("packed-refs"), "refs\n")?;
let maybe_notification = timeout(
Duration::from_millis(1500),
mcp.read_stream_until_notification_message("fs/changed"),
)
.await;
assert!(
maybe_notification.is_err(),
"fs/unwatch should stop future change notifications"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fs_watch_file_reports_atomic_replace_events() -> Result<()> {
async fn fs_watch_allows_file_targets() -> Result<()> {
let codex_home = TempDir::new()?;
let git_dir = codex_home.path().join("repo").join(".git");
let head_path = git_dir.join("HEAD");
@@ -746,18 +710,6 @@ async fn fs_watch_file_reports_atomic_replace_events() -> Result<()> {
)?;
assert_eq!(watch_response.path, absolute_path(head_path.clone()));
replace_file_atomically(&head_path, "ref: refs/heads/feature\n")?;
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(
changed,
FsChangedNotification {
watch_id,
changed_paths: vec![absolute_path(head_path.clone())],
}
);
}
Ok(())
}
@@ -785,18 +737,6 @@ async fn fs_watch_allows_missing_file_targets() -> Result<()> {
)?;
assert_eq!(watch_response.path, absolute_path(fetch_head.clone()));
replace_file_atomically(&fetch_head, "origin/main\n")?;
if let Some(changed) = maybe_fs_changed_notification(&mut mcp).await? {
assert_eq!(
changed,
FsChangedNotification {
watch_id,
changed_paths: vec![absolute_path(fetch_head.clone())],
}
);
}
Ok(())
}
@@ -820,31 +760,3 @@ async fn fs_watch_rejects_relative_paths() -> Result<()> {
Ok(())
}
fn fs_changed_notification(notification: JSONRPCNotification) -> Result<FsChangedNotification> {
let params = notification
.params
.context("fs/changed notification should include params")?;
Ok(serde_json::from_value::<FsChangedNotification>(params)?)
}
async fn maybe_fs_changed_notification(
mcp: &mut McpProcess,
) -> Result<Option<FsChangedNotification>> {
match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("fs/changed"),
)
.await
{
Ok(notification) => Ok(Some(fs_changed_notification(notification?)?)),
Err(_) => Ok(None),
}
}
fn replace_file_atomically(path: &PathBuf, contents: &str) -> Result<()> {
let temp_path = path.with_extension("lock");
std::fs::write(&temp_path, contents)?;
std::fs::rename(temp_path, path)?;
Ok(())
}