test(app-server): add findFilesStream integration scenarios

Cover ten scenarios: single match; empty query; empty roots; no matches; multi-root merge; same token updates the request ID and query; same token with different roots cancels the old stream; per-root limit; chunking when results exceed the chunk size; sorted, unique match indices.
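
For orientation, the request and notification shapes these tests exercise look roughly as follows. The params keys and the findFilesStream/chunk method name come straight from the diff below; the camelCase spellings of the chunk payload fields are an assumption for illustration (the tests only ever see the deserialized FindFilesStreamChunkNotification, and its serde renames are not part of this diff).

// Hypothetical wire shapes, sketched with serde_json for illustration only.
fn main() {
    use serde_json::json;

    // `findFilesStream` request params, as assembled by the test helper below.
    let _params = json!({
        "query": "alp",
        "roots": ["/tmp/workspace-a"],
        "cancellationToken": "shared-token" // optional; reusing it retargets the stream
    });

    // One `findFilesStream/chunk` notification payload (field casing assumed).
    let _chunk = json!({
        "requestId": 2,
        "query": "alp",
        "files": [
            { "root": "/tmp/workspace-a", "path": "alpha.rs", "fileName": "alpha.rs", "indices": [0, 1, 2] }
        ],
        "totalMatchCount": 1,
        "chunkIndex": 0,
        "chunkCount": 1,
        "running": false // false only on the final chunk of the current emission
    });
}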

Also fix a stream-completion race by tracking walker liveness in codex-file-search's DebouncedSearchManager/SearchManager: completion is now only reported once the walker thread has actually exited.
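
Concretely, Nucleo can report an idle snapshot (status.running == false) in the gap between walker injections, so the old loop could emit a final "not running" chunk and break while the walker thread was still feeding paths in. Below is a condensed, standalone sketch of the guard pattern the fix uses; the type and flag names match the diff below, but the sleeping walker body is a stand-in for the real filesystem walk.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Holds the shared liveness flag high for the walker's lifetime and clears it
// on drop, so the flag goes false even if the walker exits early or panics.
struct WalkerRunningGuard {
    flag: Arc<AtomicBool>,
}

impl WalkerRunningGuard {
    fn new(flag: Arc<AtomicBool>) -> Self {
        flag.store(true, Ordering::Relaxed);
        Self { flag }
    }
}

impl Drop for WalkerRunningGuard {
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Relaxed);
    }
}

fn main() {
    let walker_running = Arc::new(AtomicBool::new(true));
    let walker = thread::spawn({
        let flag = walker_running.clone();
        move || {
            let _guard = WalkerRunningGuard::new(flag);
            thread::sleep(Duration::from_millis(50)); // stand-in for the walk
        }
    });
    // The search loop must treat the stream as running while this holds.
    while walker_running.load(Ordering::Relaxed) {
        thread::yield_now();
    }
    walker.join().unwrap();
}
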
Dylan Hurd
2026-01-26 16:11:41 -07:00
parent 1d67af9f04
commit a91e974efb
4 changed files with 500 additions and 12 deletions

View File

@@ -573,6 +573,23 @@ impl McpProcess {
self.send_request("fuzzyFileSearch", Some(params)).await
}
/// Send a `findFilesStream` JSON-RPC request.
pub async fn send_find_files_stream_request(
&mut self,
query: &str,
roots: Vec<String>,
cancellation_token: Option<String>,
) -> anyhow::Result<i64> {
let mut params = serde_json::json!({
"query": query,
"roots": roots,
});
if let Some(token) = cancellation_token {
params["cancellationToken"] = serde_json::json!(token);
}
self.send_request("findFilesStream", Some(params)).await
}
async fn send_request(
&mut self,
method: &str,

View File

@@ -0,0 +1,441 @@
use std::collections::BTreeSet;
use std::time::Duration;

use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FindFilesStreamResponse;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
const CHUNK_METHOD: &str = "findFilesStream/chunk";

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_single_root_single_match() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let root_path = root.path().to_string_lossy().to_string();
    let request_id = mcp
        .send_find_files_stream_request("alp", vec![root_path.clone()], None)
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(files.len(), 1, "files={files:?}");
    assert_eq!(files[0].root, root_path);
    assert_eq!(files[0].path, "alpha.rs");
    assert_eq!(files[0].file_name, "alpha.rs");
    assert!(files[0].indices.is_some(), "expected indices for match");
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_empty_query_emits_single_empty_chunk() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request("", vec![root.path().to_string_lossy().to_string()], None)
        .await?;
    let response = read_response(&mut mcp, request_id).await?;
    let parsed: FindFilesStreamResponse = serde_json::from_value(response.result)?;
    assert_eq!(parsed, FindFilesStreamResponse {});

    let (chunks, mismatched_count) = collect_chunks_until_complete(&mut mcp, request_id).await?;
    assert_eq!(mismatched_count, 0, "unexpected mismatched notifications");
    assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
    let chunk = &chunks[0];
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert_eq!(chunk.chunk_index, 0);
    assert_eq!(chunk.chunk_count, 1);
    assert!(!chunk.running);
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_empty_roots_emits_single_empty_chunk() -> Result<()> {
    let codex_home = TempDir::new()?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request("alp", Vec::new(), None)
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
    let chunk = &chunks[0];
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert_eq!(chunk.chunk_count, 1);
    assert!(!chunk.running);
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_no_matches_returns_empty_files() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request(
            "zzz",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let chunk = chunks
        .iter()
        .find(|chunk| chunk.chunk_index == 0)
        .ok_or_else(|| anyhow!("missing chunk 0"))?;
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert!(!chunk.running);
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_merges_results_across_roots() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root_b.path().join("alpine.rs"), "fn alpine() {}")?;
    std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let root_a_path = root_a.path().to_string_lossy().to_string();
    let root_b_path = root_b.path().to_string_lossy().to_string();
    let request_id = mcp
        .send_find_files_stream_request("alp", vec![root_a_path.clone(), root_b_path.clone()], None)
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    let observed: BTreeSet<(String, String)> = files
        .into_iter()
        .map(|file| (file.root, file.path))
        .collect();
    let expected: BTreeSet<(String, String)> = BTreeSet::from([
        (root_a_path, "alpha.rs".to_string()),
        (root_b_path, "alpine.rs".to_string()),
    ]);
    assert_eq!(observed, expected);
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_same_token_updates_request_id_and_query() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;
    // Create enough extra files to keep the stream active while we issue a follow-up query.
    write_matching_files(root.path(), "alpha-extra", 150)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let root_path = root.path().to_string_lossy().to_string();
    let token = "shared-token".to_string();
    let first_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_path.clone()], Some(token.clone()))
        .await?;
    let _first_response = read_response(&mut mcp, first_request_id).await?;
    let second_request_id = mcp
        .send_find_files_stream_request("bet", vec![root_path.clone()], Some(token))
        .await?;

    let (chunks, _mismatched_count) =
        collect_chunks_until_complete(&mut mcp, second_request_id).await?;
    assert_eq!(
        chunks[0].request_id,
        RequestId::Integer(second_request_id),
        "expected notifications to adopt latest request id"
    );
    assert_eq!(chunks[0].query, "bet");
    let files = flatten_files(&chunks);
    assert!(files.iter().any(|file| file.path == "beta.rs"));
    assert!(
        chunks
            .iter()
            .all(|chunk| chunk.request_id == RequestId::Integer(second_request_id))
    );
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_same_token_with_different_roots_cancels_old_stream() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;
    write_matching_files(root_a.path(), "alpha-extra", 120)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let token = "root-swap-token".to_string();
    let root_a_path = root_a.path().to_string_lossy().to_string();
    let root_b_path = root_b.path().to_string_lossy().to_string();
    let first_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_a_path], Some(token.clone()))
        .await?;
    let _first_response = read_response(&mut mcp, first_request_id).await?;
    let second_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_b_path.clone()], Some(token))
        .await?;

    let (chunks, _mismatched_count) =
        collect_chunks_until_complete(&mut mcp, second_request_id).await?;
    let files = flatten_files(&chunks);
    assert!(files.iter().all(|file| file.root == root_b_path));
    assert!(files.iter().all(|file| file.path != "alpha.rs"));
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_enforces_limit_per_root() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    write_matching_files(root.path(), "limit-case", 60)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request(
            "limit-case",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(
        files.len(),
        50,
        "expected limit-per-root to cap emitted matches"
    );
    assert!(
        chunks[0].total_match_count >= 60,
        "expected total match count to reflect all matches"
    );
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_chunks_results_when_over_chunk_size() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    let root_c = TempDir::new()?;
    write_matching_files(root_a.path(), "chunk-case", 55)?;
    write_matching_files(root_b.path(), "chunk-case", 55)?;
    write_matching_files(root_c.path(), "chunk-case", 55)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request(
            "chunk-case",
            vec![
                root_a.path().to_string_lossy().to_string(),
                root_b.path().to_string_lossy().to_string(),
                root_c.path().to_string_lossy().to_string(),
            ],
            None,
        )
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let chunk_indices: BTreeSet<usize> = chunks.iter().map(|chunk| chunk.chunk_index).collect();
    assert_eq!(chunks[0].chunk_count, 2);
    assert_eq!(chunk_indices, BTreeSet::from([0, 1]));
    assert_eq!(flatten_files(&chunks).len(), 150);
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_emits_sorted_unique_indices() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("abcde.rs"), "fn main() {}")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let request_id = mcp
        .send_find_files_stream_request(
            "ace",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;

    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(files.len(), 1, "files={files:?}");
    let indices = files[0]
        .indices
        .clone()
        .ok_or_else(|| anyhow!("missing indices"))?;
    assert_eq!(indices, vec![0, 2, 4]);
    assert!(is_sorted_unique(&indices));
    Ok(())
}

async fn collect_final_chunks(
    mcp: &mut McpProcess,
    request_id: i64,
) -> anyhow::Result<Vec<FindFilesStreamChunkNotification>> {
    let _response = read_response(mcp, request_id).await?;
    let (chunks, mismatched_count) = collect_chunks_until_complete(mcp, request_id).await?;
    if mismatched_count != 0 {
        anyhow::bail!("saw {mismatched_count} notifications for other request ids");
    }
    Ok(chunks)
}

async fn read_response(mcp: &mut McpProcess, request_id: i64) -> anyhow::Result<JSONRPCResponse> {
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
    )
    .await?
}

async fn collect_chunks_until_complete(
    mcp: &mut McpProcess,
    request_id: i64,
) -> anyhow::Result<(Vec<FindFilesStreamChunkNotification>, usize)> {
    let mut latest_query = String::new();
    let mut latest_chunk_count = 0usize;
    let mut latest_chunks = std::collections::BTreeMap::new();
    let mut mismatched_count = 0usize;
    loop {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message(CHUNK_METHOD),
        )
        .await??;
        let chunk = parse_chunk(notification)?;
        if chunk.request_id != RequestId::Integer(request_id) {
            mismatched_count += 1;
            continue;
        }
        // The stream can restart with a new query or chunk layout (e.g. after a
        // debounced re-run), so drop chunks accumulated for the stale emission.
        if chunk.query != latest_query || chunk.chunk_count != latest_chunk_count {
            latest_query.clear();
            latest_query.push_str(&chunk.query);
            latest_chunk_count = chunk.chunk_count;
            latest_chunks.clear();
        }
        latest_chunks.insert(chunk.chunk_index, chunk.clone());
        // Done once the final (non-running) chunk arrives and every chunk index
        // of the current emission has been seen.
        if !chunk.running && latest_chunks.len() == latest_chunk_count {
            let chunks = latest_chunks.into_values().collect();
            return Ok((chunks, mismatched_count));
        }
    }
}

fn parse_chunk(
    notification: JSONRPCNotification,
) -> anyhow::Result<FindFilesStreamChunkNotification> {
    let params = notification
        .params
        .ok_or_else(|| anyhow!("notification missing params"))?;
    let chunk = serde_json::from_value::<FindFilesStreamChunkNotification>(params)?;
    Ok(chunk)
}

fn flatten_files(
    chunks: &[FindFilesStreamChunkNotification],
) -> Vec<codex_app_server_protocol::FuzzyFileSearchResult> {
    let mut files = Vec::new();
    for chunk in chunks {
        files.extend(chunk.files.clone());
    }
    files
}

fn write_matching_files(root: &std::path::Path, prefix: &str, count: usize) -> Result<()> {
    for index in 0..count {
        let file_name = format!("{prefix}-{index:03}.rs");
        std::fs::write(root.join(file_name), "fn main() {}")?;
    }
    Ok(())
}

fn is_sorted_unique(indices: &[u32]) -> bool {
    indices.windows(2).all(|pair| pair[0] < pair[1])
}

View File

@@ -3,6 +3,7 @@ mod auth;
mod codex_message_processor_flow;
mod config;
mod create_thread;
mod find_file_stream;
mod fork_thread;
mod fuzzy_file_search;
mod interrupt;

View File

@@ -35,6 +35,7 @@ impl SearchItem {
pub struct SearchManager {
    nucleo: Nucleo<SearchItem>,
    cancel_flag: Arc<AtomicBool>,
    walker_running: Arc<AtomicBool>,
    walk_handle: Option<JoinHandle<()>>,
    limit: NonZero<usize>,
    compute_indices: bool,
@@ -59,6 +60,7 @@ impl SearchManager {
        let search_directory_buf = search_directory.to_path_buf();
        let override_matcher = build_override_matcher(search_directory, exclude)?;
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let walker_running = Arc::new(AtomicBool::new(true));
        let mut nucleo = Nucleo::new(
            Config::DEFAULT,
            notify,
@@ -74,12 +76,14 @@ impl SearchManager {
            threads.get(),
            override_matcher,
            cancel_flag.clone(),
            walker_running.clone(),
            injector,
        )?);

        Ok(Self {
            nucleo,
            cancel_flag,
            walker_running,
            walk_handle,
            limit,
            compute_indices,
@@ -165,11 +169,32 @@ impl SearchManager {
        self.cancel_flag.store(true, Ordering::Relaxed);
    }

    pub fn walker_running(&self) -> bool {
        self.walker_running.load(Ordering::Relaxed)
    }

    pub fn search_directory(&self) -> &Path {
        &self.search_directory
    }
}

struct WalkerRunningGuard {
    flag: Arc<AtomicBool>,
}

impl WalkerRunningGuard {
    fn new(flag: Arc<AtomicBool>) -> Self {
        flag.store(true, Ordering::Relaxed);
        Self { flag }
    }
}

impl Drop for WalkerRunningGuard {
    fn drop(&mut self) {
        // Clearing the flag on drop guarantees walker_running() reports false
        // even if the walker thread returns early or panics.
        self.flag.store(false, Ordering::Relaxed);
    }
}

impl Drop for SearchManager {
    fn drop(&mut self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
@@ -419,6 +444,7 @@ where
    let mut last_sent_query: String = String::new();
    let mut current_query = query.clone();
    let mut sent_once = false;
    let mut last_sent_running = false;

    let start = std::time::Instant::now();
    let mut last_progress = start;
@@ -450,23 +476,23 @@ where
         let paths_changed = paths != last_sent_paths;
         let timeout_elapsed = start.elapsed() >= config.first_result_timeout;
+        let walker_running = manager.walker_running();
+        let ui_running = walker_running || status.running || flag_was_set || status.changed;
+        let running_changed = sent_once && last_sent_running && !ui_running;

         let should_emit = !cancellation_token.load(Ordering::Relaxed)
             && (paths_changed
                 || current_query != last_sent_query
-                || (!sent_once
-                    && (flag_was_set
-                        || status.changed
-                        || !status.running
-                        || timeout_elapsed)));
+                || running_changed
+                || (!sent_once && (flag_was_set || status.changed || timeout_elapsed)));

         if should_emit {
-            let ui_running = status.running || flag_was_set || status.changed;
             callback(current_query.clone(), results.clone(), ui_running);
             sent_once = true;
             last_sent_paths = paths;
             last_sent_query.clear();
             last_sent_query.push_str(&current_query);
+            last_sent_running = ui_running;
             last_progress = std::time::Instant::now();
         }
@@ -474,17 +500,18 @@ where
             break;
         }

-        if !status.running && !flag_was_set {
+        if !status.running && !flag_was_set && !walker_running {
             if sent_once {
                 if last_progress.elapsed() >= config.first_result_timeout {
                     break;
                 }
-            } else if timeout_elapsed {
-                if !cancellation_token.load(Ordering::Relaxed) {
-                    let ui_running = status.running || flag_was_set || status.changed;
-                    callback(current_query.clone(), results, ui_running);
-                }
-                break;
+            } else if timeout_elapsed && !cancellation_token.load(Ordering::Relaxed) {
+                let ui_running =
+                    walker_running || status.running || flag_was_set || status.changed;
+                callback(current_query.clone(), results, ui_running);
+                if !walker_running {
+                    break;
+                }
             }
         }
@@ -497,11 +524,13 @@ fn spawn_walker(
    threads: usize,
    override_matcher: Option<ignore::overrides::Override>,
    cancel_flag: Arc<AtomicBool>,
    walker_running: Arc<AtomicBool>,
    injector: nucleo::Injector<SearchItem>,
) -> anyhow::Result<JoinHandle<()>> {
    thread::Builder::new()
        .name("codex-file-search-walker".to_string())
        .spawn(move || {
            let _walker_running_guard = WalkerRunningGuard::new(walker_running);
            let search_directory = Arc::new(search_directory);
            let mut walk_builder = WalkBuilder::new(search_directory.as_path());
            walk_builder