Compare commits

...

2 Commits

Author SHA1 Message Date
Dylan Hurd
a91e974efb test(app-server): add findFilesStream integration scenarios
Cover 10 scenarios: single match; empty query; empty roots; no matches; multi-root merge; same token updates request id/query; same token + different roots cancels old stream; per-root limit; chunking over chunk size; sorted unique indices.

Also fix a stream completion race by tracking walker liveness in codex-file-search’s DebouncedSearchManager/SearchManager.
2026-01-26 16:12:03 -07:00
Dylan Hurd
1d67af9f04 feat(app-server) FileSearchStream 2026-01-26 16:12:03 -07:00
13 changed files with 1983 additions and 8 deletions

33
codex-rs/Cargo.lock generated
View File

@@ -1560,10 +1560,12 @@ dependencies = [
"anyhow",
"clap",
"ignore",
"nucleo",
"nucleo-matcher",
"pretty_assertions",
"serde",
"serde_json",
"tempfile",
"tokio",
]
@@ -4758,6 +4760,17 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "nucleo"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5262af4c94921c2646c5ac6ff7900c2af9cbb08dc26a797e18130a7019c039d4"
dependencies = [
"nucleo-matcher",
"parking_lot",
"rayon",
]
[[package]]
name = "nucleo-matcher"
version = "0.3.1"
@@ -6125,6 +6138,26 @@ dependencies = [
"ratatui",
]
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.15"

View File

@@ -331,6 +331,10 @@ client_request_definitions! {
params: FuzzyFileSearchParams,
response: FuzzyFileSearchResponse,
},
FindFilesStream => "findFilesStream" {
params: FindFilesStreamParams,
response: FindFilesStreamResponse,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
params: v1::ExecOneOffCommandParams,
@@ -570,6 +574,34 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
/// Parameters for the `findFilesStream` request.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamParams {
    /// Fuzzy query matched against file paths under each root.
    pub query: String,
    /// Directories to search; results are merged across all roots.
    pub roots: Vec<String>,
    // If provided, identifies the stream: a later request with the same value
    // cancels the previous stream (or reuses it when the roots are unchanged).
    pub cancellation_token: Option<String>,
}
/// Empty response for `findFilesStream`; results are delivered incrementally
/// via `findFilesStream/chunk` notifications instead of in the response body.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamResponse {}
/// One chunk of streamed fuzzy-search results (`findFilesStream/chunk`).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamChunkNotification {
    /// Request this chunk belongs to (the latest id when a token was reused).
    pub request_id: RequestId,
    /// Query these results correspond to.
    pub query: String,
    /// Matches contained in this chunk.
    pub files: Vec<FuzzyFileSearchResult>,
    /// Total matches across all roots, which may exceed the emitted files.
    pub total_match_count: usize,
    /// Zero-based index of this chunk within the current result set.
    pub chunk_index: usize,
    /// Number of chunks in the current result set (at least 1).
    pub chunk_count: usize,
    /// True while the search is still producing results.
    pub running: bool,
}
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
@@ -597,6 +629,7 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
FindFilesStreamChunk => "findFilesStream/chunk" (FindFilesStreamChunkNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

View File

@@ -1,6 +1,7 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::find_files_stream::FindFilesStreamSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::models::supported_models;
use crate::outgoing_message::OutgoingMessageSender;
@@ -35,6 +36,9 @@ use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FindFilesStreamParams;
use codex_app_server_protocol::FindFilesStreamResponse;
use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
@@ -257,6 +261,7 @@ pub(crate) struct CodexMessageProcessor {
pending_rollbacks: PendingRollbacks,
turn_summary_store: TurnSummaryStore,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
pending_find_files_streams: Arc<Mutex<HashMap<FindFilesStreamKey, FindFilesStreamSession>>>,
feedback: CodexFeedback,
}
@@ -266,6 +271,12 @@ pub(crate) enum ApiVersion {
V2,
}
/// Key under which an active stream session is tracked: the client-supplied
/// cancellation token when present, otherwise the originating request id.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum FindFilesStreamKey {
    Token(String),
    Request(RequestId),
}
impl CodexMessageProcessor {
async fn load_thread(
&self,
@@ -313,6 +324,7 @@ impl CodexMessageProcessor {
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
pending_find_files_streams: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -560,6 +572,9 @@ impl CodexMessageProcessor {
ClientRequest::FuzzyFileSearch { request_id, params } => {
self.fuzzy_file_search(request_id, params).await;
}
ClientRequest::FindFilesStream { request_id, params } => {
self.find_files_stream(request_id, params).await;
}
ClientRequest::OneOffCommandExec { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
@@ -4232,6 +4247,80 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
/// Handles a `findFilesStream` request: starts (or re-targets) a streaming
/// fuzzy file search whose results arrive via `findFilesStream/chunk`
/// notifications rather than in the response body.
async fn find_files_stream(&mut self, request_id: RequestId, params: FindFilesStreamParams) {
    let FindFilesStreamParams {
        query,
        roots,
        cancellation_token,
    } = params;
    // Degenerate request: nothing to search. Cancel any stream registered
    // under the same token and immediately emit a terminal empty chunk.
    if query.is_empty() || roots.is_empty() {
        if let Some(token) = &cancellation_token {
            let key = FindFilesStreamKey::Token(token.clone());
            let mut streams = self.pending_find_files_streams.lock().await;
            if let Some(stream) = streams.remove(&key) {
                stream.cancel();
            }
        }
        self.send_find_files_stream_empty(request_id.clone(), query)
            .await;
        self.outgoing
            .send_response(request_id, FindFilesStreamResponse {})
            .await;
        return;
    }
    // Sessions are keyed by the client token when supplied, else by request id.
    let key = cancellation_token
        .clone()
        .map(FindFilesStreamKey::Token)
        .unwrap_or(FindFilesStreamKey::Request(request_id.clone()));
    let mut streams = self.pending_find_files_streams.lock().await;
    if let Some(stream) = streams.get(&key) {
        if stream.roots() == roots {
            // Same token + same roots: reuse the running walkers, just adopt
            // the new request id and re-run the query.
            stream.update_request_id(request_id.clone());
            stream.on_query(query);
            self.outgoing
                .send_response(request_id, FindFilesStreamResponse {})
                .await;
            return;
        }
        // Same token but different roots: tear down the old stream first.
        stream.cancel();
        streams.remove(&key);
    }
    let (stream, done_rx) =
        FindFilesStreamSession::new(roots, request_id.clone(), self.outgoing.clone());
    stream.on_query(query);
    streams.insert(key.clone(), stream);
    drop(streams);
    self.outgoing
        .send_response(request_id, FindFilesStreamResponse {})
        .await;
    // Reap the map entry once the session's stream task signals completion.
    // NOTE(review): this removes by key without checking which session the key
    // currently maps to — if an old session's completion races with a newer
    // session registered under the same token, the newer entry could be
    // evicted while its stream keeps running. Confirm whether that
    // interleaving is possible in practice.
    let streams = Arc::clone(&self.pending_find_files_streams);
    tokio::spawn(async move {
        let _ = done_rx.await;
        let mut streams = streams.lock().await;
        streams.remove(&key);
    });
}
/// Emits a single terminal empty chunk for `request_id` so clients can treat a
/// degenerate request (empty query or no roots) as an already-finished stream.
async fn send_find_files_stream_empty(&self, request_id: RequestId, query: String) {
    self.outgoing
        .send_server_notification(ServerNotification::FindFilesStreamChunk(
            FindFilesStreamChunkNotification {
                request_id,
                query,
                files: Vec::new(),
                total_match_count: 0,
                chunk_index: 0,
                chunk_count: 1,
                running: false,
            },
        ))
        .await;
}
async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {

View File

@@ -0,0 +1,297 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use codex_file_search::FileSearchResults;
use codex_file_search::search_manager::DebounceConfig;
use codex_file_search::search_manager::DebouncedSearchManager;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::warn;
use crate::outgoing_message::OutgoingMessageSender;
const LIMIT_PER_ROOT: usize = 50;
const MAX_THREADS: usize = 12;
const CHUNK_SIZE: usize = 100;
const COMPUTE_INDICES: bool = true;
type SearchCallback = dyn Fn(String, FileSearchResults, bool) + Send + Sync + 'static;
/// Messages sent from the per-root search callbacks (and from `cancel`) to
/// the merge task.
pub(crate) enum FindFilesStreamUpdate {
    /// Ask the merge task to stop immediately.
    Cancel,
    /// Fresh results for one root.
    Results {
        root: String,
        query: String,
        results: FileSearchResults,
        // True while this root is still producing results.
        running: bool,
    },
}
/// Latest known state for a single search root, tracked by the merge task.
struct RootSearchState {
    // Most recent result set reported by this root.
    results: FileSearchResults,
    // Whether the root reported it is still searching.
    running: bool,
    // Whether this root has reported at all for the current query.
    seen: bool,
}
/// A live streaming search: one debounced search manager per root plus a
/// background merge task that emits chunk notifications.
pub(crate) struct FindFilesStreamSession {
    // Roots being searched; compared against new requests to decide reuse.
    roots: Vec<String>,
    managers: Vec<DebouncedSearchManager<Box<SearchCallback>>>,
    // Request id stamped on outgoing notifications; swapped when the session
    // is reused by a follow-up request with the same token.
    request_id: Arc<Mutex<RequestId>>,
    // Set on cancel (or completion) so stale callbacks are dropped.
    cancel_flag: Arc<AtomicBool>,
    update_tx: mpsc::UnboundedSender<FindFilesStreamUpdate>,
}
impl FindFilesStreamSession {
    /// Spawns one debounced search manager per root plus a merge task; returns
    /// the session and a receiver that fires when the merge task finishes.
    pub(crate) fn new(
        roots: Vec<String>,
        request_id: RequestId,
        outgoing: Arc<OutgoingMessageSender>,
    ) -> (Self, oneshot::Receiver<()>) {
        // LIMIT_PER_ROOT is a non-zero constant, so the fallback is effectively
        // unreachable; it just avoids an unwrap.
        let limit_per_root = NonZeroUsize::new(LIMIT_PER_ROOT).unwrap_or(NonZeroUsize::MIN);
        let threads_per_root = threads_per_root(roots.len());
        let request_id = Arc::new(Mutex::new(request_id));
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let (update_tx, update_rx) = mpsc::unbounded_channel();
        let (done_tx, done_rx) = oneshot::channel();
        let mut managers = Vec::with_capacity(roots.len());
        for root in &roots {
            let root_path = PathBuf::from(root);
            let root_name = root.clone();
            let update_tx = update_tx.clone();
            let cancel_flag = Arc::clone(&cancel_flag);
            // Callback runs on the search manager's side; it forwards raw
            // per-root results to the merge task unless the session was
            // cancelled in the meantime.
            let callback: Box<SearchCallback> = Box::new(move |query, results, running| {
                if cancel_flag.load(Ordering::Relaxed) {
                    return;
                }
                if update_tx
                    .send(FindFilesStreamUpdate::Results {
                        root: root_name.clone(),
                        query,
                        results,
                        running,
                    })
                    .is_err()
                {
                    warn!("find-files-stream update channel closed");
                }
            });
            let manager = DebouncedSearchManager::new(
                root_path,
                limit_per_root,
                threads_per_root,
                COMPUTE_INDICES,
                Vec::new(),
                Arc::new(callback),
                DebounceConfig::default(),
            );
            managers.push(manager);
        }
        let cancel_flag_for_task = Arc::clone(&cancel_flag);
        let request_id_for_task = Arc::clone(&request_id);
        let roots_for_task = roots.clone();
        // Merge task: aggregates per-root updates and emits chunk
        // notifications; signals `done_tx` when it exits.
        tokio::spawn(async move {
            run_stream_task(
                roots_for_task,
                update_rx,
                request_id_for_task,
                outgoing,
                cancel_flag_for_task,
            )
            .await;
            let _ = done_tx.send(());
        });
        (
            Self {
                roots,
                managers,
                request_id,
                cancel_flag,
                update_tx,
            },
            done_rx,
        )
    }

    /// Roots this session is searching; used to decide whether the session can
    /// be reused for a follow-up request with the same token.
    pub(crate) fn roots(&self) -> &[String] {
        &self.roots
    }

    /// Re-targets outgoing chunk notifications at a newer request id.
    pub(crate) fn update_request_id(&self, request_id: RequestId) {
        #[expect(clippy::unwrap_used)]
        let mut locked = self.request_id.lock().unwrap();
        *locked = request_id;
    }

    /// Forwards a (possibly updated) query to every per-root search manager.
    pub(crate) fn on_query(&self, query: String) {
        for manager in &self.managers {
            manager.on_query(query.clone());
        }
    }

    /// Stops the session: future callbacks are dropped and the merge task is
    /// told to exit.
    pub(crate) fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        let _ = self.update_tx.send(FindFilesStreamUpdate::Cancel);
    }
}
/// Drains per-root search updates and fans merged, chunked results out to the
/// client as `findFilesStream/chunk` notifications.
///
/// The task exits when a `Cancel` update arrives, the cancel flag is set, the
/// update channel closes, or every root has reported and none is still
/// running (stream complete).
async fn run_stream_task(
    roots: Vec<String>,
    mut update_rx: mpsc::UnboundedReceiver<FindFilesStreamUpdate>,
    request_id: Arc<Mutex<RequestId>>,
    outgoing: Arc<OutgoingMessageSender>,
    cancel_flag: Arc<AtomicBool>,
) {
    // One accumulator per root; `seen` tracks whether the root has reported
    // at all for the current query.
    let mut root_states: HashMap<String, RootSearchState> = roots
        .iter()
        .map(|root| {
            (
                root.clone(),
                RootSearchState {
                    results: FileSearchResults {
                        matches: Vec::new(),
                        total_match_count: 0,
                    },
                    running: false,
                    seen: false,
                },
            )
        })
        .collect();
    let mut current_query = String::new();
    while let Some(update) = update_rx.recv().await {
        match update {
            FindFilesStreamUpdate::Cancel => break,
            FindFilesStreamUpdate::Results {
                root,
                query,
                results,
                running,
            } => {
                if cancel_flag.load(Ordering::Relaxed) {
                    break;
                }
                // A new query invalidates every per-root accumulator.
                if query != current_query {
                    current_query = query;
                    for state in root_states.values_mut() {
                        state.results = FileSearchResults {
                            matches: Vec::new(),
                            total_match_count: 0,
                        };
                        state.running = false;
                        state.seen = false;
                    }
                }
                if let Some(state) = root_states.get_mut(&root) {
                    state.results = results;
                    state.running = running;
                    state.seen = true;
                } else {
                    warn!("find-files-stream received update for unexpected root: {root}");
                    continue;
                }
                // Merge all roots into one globally sorted result list.
                let mut files = Vec::new();
                let mut total_match_count = 0usize;
                let mut any_running = false;
                let mut all_seen = true;
                for (root, state) in &root_states {
                    if !state.seen {
                        all_seen = false;
                    }
                    any_running |= state.running;
                    total_match_count += state.results.total_match_count;
                    for entry in &state.results.matches {
                        files.push(FuzzyFileSearchResult {
                            root: root.clone(),
                            path: entry.path.clone(),
                            file_name: file_search::file_name_from_path(&entry.path),
                            score: entry.score,
                            indices: entry.indices.clone(),
                        });
                    }
                }
                files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
                    FuzzyFileSearchResult,
                    _,
                    _,
                >(|f| f.score, |f| f.path.as_str()));
                // An empty result set still counts as one (empty) chunk so the
                // client always receives at least one notification per update.
                let chunk_count = if files.is_empty() {
                    1
                } else {
                    files.len().div_ceil(CHUNK_SIZE)
                };
                #[expect(clippy::unwrap_used)]
                let current_request_id = request_id.lock().unwrap().clone();
                if files.is_empty() {
                    let notification = FindFilesStreamChunkNotification {
                        request_id: current_request_id,
                        query: current_query.clone(),
                        files: Vec::new(),
                        total_match_count,
                        chunk_index: 0,
                        chunk_count,
                        running: any_running,
                    };
                    outgoing
                        .send_server_notification(ServerNotification::FindFilesStreamChunk(
                            notification,
                        ))
                        .await;
                } else {
                    for (index, chunk) in files.chunks(CHUNK_SIZE).enumerate() {
                        let notification = FindFilesStreamChunkNotification {
                            request_id: current_request_id.clone(),
                            query: current_query.clone(),
                            files: chunk.to_vec(),
                            total_match_count,
                            chunk_index: index,
                            chunk_count,
                            running: any_running,
                        };
                        outgoing
                            .send_server_notification(ServerNotification::FindFilesStreamChunk(
                                notification,
                            ))
                            .await;
                    }
                }
                if all_seen && !any_running {
                    // Every root finished: flag the session cancelled so late
                    // callbacks are dropped, then end the task.
                    cancel_flag.store(true, Ordering::Relaxed);
                    break;
                }
            }
        }
    }
}
/// Splits the machine's CPU budget (capped at `MAX_THREADS`) evenly across
/// `roots_len` search roots, always granting each root at least one thread.
fn threads_per_root(roots_len: usize) -> NonZeroUsize {
    let available = std::thread::available_parallelism()
        .map(std::num::NonZero::get)
        .unwrap_or(1);
    let budget = available.min(MAX_THREADS);
    let per_root = std::cmp::max(budget / std::cmp::max(roots_len, 1), 1);
    NonZeroUsize::new(per_root).unwrap_or(NonZeroUsize::MIN)
}

View File

@@ -42,6 +42,7 @@ mod codex_message_processor;
mod config_api;
mod dynamic_tools;
mod error_code;
mod find_files_stream;
mod fuzzy_file_search;
mod message_processor;
mod models;

View File

@@ -573,6 +573,23 @@ impl McpProcess {
self.send_request("fuzzyFileSearch", Some(params)).await
}
/// Send a `findFilesStream` JSON-RPC request.
///
/// The `cancellationToken` field is included in the params only when a token
/// is supplied.
pub async fn send_find_files_stream_request(
    &mut self,
    query: &str,
    roots: Vec<String>,
    cancellation_token: Option<String>,
) -> anyhow::Result<i64> {
    let params = match cancellation_token {
        Some(token) => serde_json::json!({
            "query": query,
            "roots": roots,
            "cancellationToken": token,
        }),
        None => serde_json::json!({
            "query": query,
            "roots": roots,
        }),
    };
    self.send_request("findFilesStream", Some(params)).await
}
async fn send_request(
&mut self,
method: &str,

View File

@@ -0,0 +1,441 @@
use std::collections::BTreeSet;
use std::time::Duration;
use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FindFilesStreamResponse;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
const CHUNK_METHOD: &str = "findFilesStream/chunk";
/// A query matching exactly one of two files yields a single result with its
/// root, relative path, file name, and match indices populated.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_single_root_single_match() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let root_path = root.path().to_string_lossy().to_string();
    let request_id = mcp
        .send_find_files_stream_request("alp", vec![root_path.clone()], None)
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(files.len(), 1, "files={files:?}");
    assert_eq!(files[0].root, root_path);
    assert_eq!(files[0].path, "alpha.rs");
    assert_eq!(files[0].file_name, "alpha.rs");
    assert!(files[0].indices.is_some(), "expected indices for match");
    Ok(())
}
/// An empty query short-circuits: the server responds with an empty response
/// object and emits exactly one terminal empty chunk.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_empty_query_emits_single_empty_chunk() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request("", vec![root.path().to_string_lossy().to_string()], None)
        .await?;
    let response = read_response(&mut mcp, request_id).await?;
    let parsed: FindFilesStreamResponse = serde_json::from_value(response.result)?;
    assert_eq!(parsed, FindFilesStreamResponse {});
    let (chunks, mismatched_count) = collect_chunks_until_complete(&mut mcp, request_id).await?;
    assert_eq!(mismatched_count, 0, "unexpected mismatched notifications");
    assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
    let chunk = &chunks[0];
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert_eq!(chunk.chunk_index, 0);
    assert_eq!(chunk.chunk_count, 1);
    assert!(!chunk.running);
    Ok(())
}
/// An empty roots list short-circuits the same way as an empty query: one
/// terminal empty chunk, no search started.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_empty_roots_emits_single_empty_chunk() -> Result<()> {
    let codex_home = TempDir::new()?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request("alp", Vec::new(), None)
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
    let chunk = &chunks[0];
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert_eq!(chunk.chunk_count, 1);
    assert!(!chunk.running);
    Ok(())
}
/// A query matching nothing still completes the stream, ending with an empty
/// chunk 0 and `running == false`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_no_matches_returns_empty_files() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request(
            "zzz",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let chunk = chunks
        .iter()
        .find(|chunk| chunk.chunk_index == 0)
        .ok_or_else(|| anyhow!("missing chunk 0"))?;
    assert_eq!(chunk.files.len(), 0);
    assert_eq!(chunk.total_match_count, 0);
    assert!(!chunk.running);
    Ok(())
}
/// Matches found under different roots are merged into one result stream,
/// each tagged with the root it came from.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_merges_results_across_roots() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root_b.path().join("alpine.rs"), "fn alpine() {}")?;
    std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let root_a_path = root_a.path().to_string_lossy().to_string();
    let root_b_path = root_b.path().to_string_lossy().to_string();
    let request_id = mcp
        .send_find_files_stream_request("alp", vec![root_a_path.clone(), root_b_path.clone()], None)
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    // Compare as a set: merge order across roots is not part of the contract
    // being tested here.
    let observed: BTreeSet<(String, String)> = files
        .into_iter()
        .map(|file| (file.root, file.path))
        .collect();
    let expected: BTreeSet<(String, String)> = BTreeSet::from([
        (root_a_path, "alpha.rs".to_string()),
        (root_b_path, "alpine.rs".to_string()),
    ]);
    assert_eq!(observed, expected);
    Ok(())
}
/// Reusing a cancellation token with the same roots must re-target the live
/// stream: notifications adopt the latest request id and the new query.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_same_token_updates_request_id_and_query() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;
    // Create enough extra files to keep the stream active while we issue a follow-up query.
    write_matching_files(root.path(), "alpha-extra", 150)?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let root_path = root.path().to_string_lossy().to_string();
    let token = "shared-token".to_string();
    let first_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_path.clone()], Some(token.clone()))
        .await?;
    let _first_response = read_response(&mut mcp, first_request_id).await?;
    let second_request_id = mcp
        .send_find_files_stream_request("bet", vec![root_path.clone()], Some(token))
        .await?;
    let (chunks, _mismatched_count) =
        collect_chunks_until_complete(&mut mcp, second_request_id).await?;
    assert_eq!(
        chunks[0].request_id,
        RequestId::Integer(second_request_id),
        "expected notifications to adopt latest request id"
    );
    assert_eq!(chunks[0].query, "bet");
    let files = flatten_files(&chunks);
    assert!(files.iter().any(|file| file.path == "beta.rs"));
    assert!(
        chunks
            .iter()
            .all(|chunk| chunk.request_id == RequestId::Integer(second_request_id))
    );
    Ok(())
}
/// Reusing a cancellation token with DIFFERENT roots must cancel the old
/// stream: final results come only from the new root.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_same_token_with_different_roots_cancels_old_stream() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
    std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;
    // Extra files keep the first stream busy long enough to be superseded.
    write_matching_files(root_a.path(), "alpha-extra", 120)?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let token = "root-swap-token".to_string();
    let root_a_path = root_a.path().to_string_lossy().to_string();
    let root_b_path = root_b.path().to_string_lossy().to_string();
    let first_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_a_path], Some(token.clone()))
        .await?;
    let _first_response = read_response(&mut mcp, first_request_id).await?;
    let second_request_id = mcp
        .send_find_files_stream_request("alp", vec![root_b_path.clone()], Some(token))
        .await?;
    let (chunks, _mismatched_count) =
        collect_chunks_until_complete(&mut mcp, second_request_id).await?;
    let files = flatten_files(&chunks);
    assert!(files.iter().all(|file| file.root == root_b_path));
    assert!(files.iter().all(|file| file.path != "alpha.rs"));
    Ok(())
}
/// With 60 matching files in one root, emitted matches are capped at the
/// per-root limit (50) while `total_match_count` still reports all matches.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_enforces_limit_per_root() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    write_matching_files(root.path(), "limit-case", 60)?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request(
            "limit-case",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(
        files.len(),
        50,
        "expected limit-per-root to cap emitted matches"
    );
    assert!(
        chunks[0].total_match_count >= 60,
        "expected total match count to reflect all matches"
    );
    Ok(())
}
/// 3 roots x 55 matches = 165 candidates, capped to 50 per root = 150 files,
/// which must be split into 2 chunks of at most 100 (the chunk size).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_chunks_results_when_over_chunk_size() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    let root_c = TempDir::new()?;
    write_matching_files(root_a.path(), "chunk-case", 55)?;
    write_matching_files(root_b.path(), "chunk-case", 55)?;
    write_matching_files(root_c.path(), "chunk-case", 55)?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request(
            "chunk-case",
            vec![
                root_a.path().to_string_lossy().to_string(),
                root_b.path().to_string_lossy().to_string(),
                root_c.path().to_string_lossy().to_string(),
            ],
            None,
        )
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let chunk_indices: BTreeSet<usize> = chunks.iter().map(|chunk| chunk.chunk_index).collect();
    assert_eq!(chunks[0].chunk_count, 2);
    assert_eq!(chunk_indices, BTreeSet::from([0, 1]));
    assert_eq!(flatten_files(&chunks).len(), 150);
    Ok(())
}
/// Match indices for query "ace" against "abcde.rs" must be the character
/// positions [0, 2, 4], strictly increasing with no duplicates.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn find_files_stream_emits_sorted_unique_indices() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("abcde.rs"), "fn main() {}")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_find_files_stream_request(
            "ace",
            vec![root.path().to_string_lossy().to_string()],
            None,
        )
        .await?;
    let chunks = collect_final_chunks(&mut mcp, request_id).await?;
    let files = flatten_files(&chunks);
    assert_eq!(files.len(), 1, "files={files:?}");
    let indices = files[0]
        .indices
        .clone()
        .ok_or_else(|| anyhow!("missing indices"))?;
    assert_eq!(indices, vec![0, 2, 4]);
    assert!(is_sorted_unique(&indices));
    Ok(())
}
/// Reads the request's response, then gathers chunk notifications until the
/// stream completes; fails if any notification targeted another request id.
async fn collect_final_chunks(
    mcp: &mut McpProcess,
    request_id: i64,
) -> anyhow::Result<Vec<FindFilesStreamChunkNotification>> {
    read_response(mcp, request_id).await?;
    let (chunks, mismatched_count) = collect_chunks_until_complete(mcp, request_id).await?;
    anyhow::ensure!(
        mismatched_count == 0,
        "saw {mismatched_count} notifications for other request ids"
    );
    Ok(chunks)
}
/// Awaits the JSON-RPC response for `request_id`, bounded by
/// `DEFAULT_READ_TIMEOUT`.
async fn read_response(mcp: &mut McpProcess, request_id: i64) -> anyhow::Result<JSONRPCResponse> {
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
    )
    .await?
}
/// Reads chunk notifications until a complete, final set for `request_id` is
/// observed: the latest query/chunk_count "generation" has all of its chunks
/// and the last one reports `running == false`.
///
/// Returns the final chunks in `chunk_index` order plus the number of
/// notifications that belonged to other request ids.
async fn collect_chunks_until_complete(
    mcp: &mut McpProcess,
    request_id: i64,
) -> anyhow::Result<(Vec<FindFilesStreamChunkNotification>, usize)> {
    let mut latest_query = String::new();
    let mut latest_chunk_count = 0usize;
    let mut latest_chunks = std::collections::BTreeMap::new();
    let mut mismatched_count = 0usize;
    loop {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message(CHUNK_METHOD),
        )
        .await??;
        let chunk = parse_chunk(notification)?;
        if chunk.request_id != RequestId::Integer(request_id) {
            mismatched_count += 1;
            continue;
        }
        // A new query or changed chunk_count starts a new generation; drop
        // partial chunks accumulated from the previous one.
        if chunk.query != latest_query || chunk.chunk_count != latest_chunk_count {
            latest_query.clear();
            latest_query.push_str(&chunk.query);
            latest_chunk_count = chunk.chunk_count;
            latest_chunks.clear();
        }
        latest_chunks.insert(chunk.chunk_index, chunk.clone());
        // Done once the stream stopped running and every chunk index arrived.
        if !chunk.running && latest_chunks.len() == latest_chunk_count {
            let chunks = latest_chunks.into_values().collect();
            return Ok((chunks, mismatched_count));
        }
    }
}
/// Deserializes a `findFilesStream/chunk` notification payload.
fn parse_chunk(
    notification: JSONRPCNotification,
) -> anyhow::Result<FindFilesStreamChunkNotification> {
    let params = notification
        .params
        .ok_or_else(|| anyhow!("notification missing params"))?;
    Ok(serde_json::from_value(params)?)
}
/// Concatenates the `files` payloads of all chunks, preserving chunk order.
fn flatten_files(
    chunks: &[FindFilesStreamChunkNotification],
) -> Vec<codex_app_server_protocol::FuzzyFileSearchResult> {
    chunks
        .iter()
        .flat_map(|chunk| chunk.files.iter().cloned())
        .collect()
}
/// Creates `count` files named `{prefix}-NNN.rs` (zero-padded to 3 digits)
/// under `root`, each containing a trivial main function.
fn write_matching_files(root: &std::path::Path, prefix: &str, count: usize) -> Result<()> {
    (0..count).try_for_each(|index| {
        std::fs::write(root.join(format!("{prefix}-{index:03}.rs")), "fn main() {}")
    })?;
    Ok(())
}
/// Returns true when `indices` is strictly increasing (sorted, no duplicates).
/// Empty and single-element slices are trivially sorted-unique.
fn is_sorted_unique(indices: &[u32]) -> bool {
    indices
        .iter()
        .zip(indices.iter().skip(1))
        .all(|(prev, next)| prev < next)
}

View File

@@ -3,6 +3,7 @@ mod auth;
mod codex_message_processor_flow;
mod config;
mod create_thread;
mod find_file_stream;
mod fork_thread;
mod fuzzy_file_search;
mod interrupt;

View File

@@ -16,6 +16,7 @@ path = "src/lib.rs"
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
ignore = { workspace = true }
nucleo = "0.5.0"
nucleo-matcher = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -23,3 +24,4 @@ tokio = { workspace = true, features = ["full"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = "3.13.0"

View File

@@ -7,6 +7,7 @@ use nucleo_matcher::pattern::CaseMatching;
use nucleo_matcher::pattern::Normalization;
use nucleo_matcher::pattern::Pattern;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::UnsafeCell;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
@@ -19,6 +20,7 @@ use std::sync::atomic::Ordering;
use tokio::process::Command;
mod cli;
pub mod search_manager;
pub use cli::Cli;
@@ -48,7 +50,7 @@ pub fn file_name_from_path(path: &str) -> String {
.unwrap_or_else(|| path.to_string())
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FileSearchResults {
pub matches: Vec<FileMatch>,
pub total_match_count: usize,
@@ -209,7 +211,7 @@ pub fn run(
Box::new(move |entry| {
if let Some(path) = get_file_path(&entry, search_directory) {
best_list.insert(path);
best_list.insert(path.as_ref());
}
processed += 1;
@@ -224,18 +226,30 @@ pub fn run(
fn get_file_path<'a>(
entry_result: &'a Result<ignore::DirEntry, ignore::Error>,
search_directory: &std::path::Path,
) -> Option<&'a str> {
) -> Option<Cow<'a, str>> {
let entry = match entry_result {
Ok(e) => e,
Err(_) => return None,
};
if entry.file_type().is_some_and(|ft| ft.is_dir()) {
let path = entry.path();
let rel_path = match path.strip_prefix(search_directory) {
Ok(rel_path) => rel_path,
Err(_) => path,
};
if rel_path.as_os_str().is_empty() {
return None;
}
let path = entry.path();
match path.strip_prefix(search_directory) {
Ok(rel_path) => rel_path.to_str(),
Err(_) => None,
let path_str = rel_path.to_str()?;
let is_dir = entry.file_type().is_some_and(|ft| ft.is_dir());
if is_dir {
let mut owned = String::with_capacity(path_str.len() + 1);
owned.push_str(path_str);
if !owned.ends_with('/') {
owned.push('/');
}
Some(Cow::Owned(owned))
} else {
Some(Cow::Borrowed(path_str))
}
}
@@ -453,4 +467,46 @@ mod tests {
fn file_name_from_path_falls_back_to_full_path() {
assert_eq!(file_name_from_path(""), "");
}
/// `run` should surface directory entries as matches (reported with a
/// trailing slash, e.g. "docs/"), not just plain files.
#[test]
fn run_includes_directory_matches() {
    let temp_dir = tempfile::tempdir().unwrap();
    let docs_dir = temp_dir.path().join("docs");
    std::fs::create_dir_all(docs_dir.join("nested")).unwrap();
    let cancel_flag = Arc::new(AtomicBool::new(false));
    let results = run(
        "docs",
        NonZero::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZero::new(1).unwrap(),
        cancel_flag,
        false,
        true,
    )
    .unwrap();
    assert!(
        results.matches.iter().any(|m| m.path == "docs/"),
        "expected directory match; matches={:?}",
        results
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
/// The fuzzy pattern built from "docs" must still score against "docs/" —
/// the trailing slash appended to directory paths must not break matching.
#[test]
fn pattern_matches_directory_without_trailing_slash_in_query() {
    let pattern = create_pattern("docs");
    let mut utf32buf = Vec::<char>::new();
    let haystack: Utf32Str<'_> = Utf32Str::new("docs/", &mut utf32buf);
    let mut matcher = Matcher::new(nucleo_matcher::Config::DEFAULT);
    assert!(
        pattern.score(haystack, &mut matcher).is_some(),
        "expected fuzzy pattern to match directory paths with trailing slash"
    );
}
}

View File

@@ -0,0 +1,599 @@
use ignore::WalkBuilder;
use ignore::overrides::OverrideBuilder;
use nucleo::Config;
use nucleo::Nucleo;
use nucleo::Snapshot;
use nucleo::Status;
use nucleo_matcher::Matcher;
use nucleo_matcher::pattern::CaseMatching;
use nucleo_matcher::pattern::Normalization;
use std::num::NonZero;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::thread::JoinHandle;
use std::thread::{self};
use std::time::Duration;
use crate::FileMatch;
use crate::FileSearchResults;
/// A single candidate row fed to the matcher: one path string.
#[derive(Debug, Clone)]
pub struct SearchItem {
    // Path relative to the search root; directory entries carry a
    // trailing '/' (added by the walker).
    pub path: String,
}
impl SearchItem {
    /// Wrap an already-relativized path string.
    fn new(path: String) -> Self {
        Self { path }
    }
}
/// Incremental fuzzy file search: a nucleo matcher fed by a parallel
/// directory walker running on a background thread.
pub struct SearchManager {
    // Matcher engine holding injected items and the active pattern.
    nucleo: Nucleo<SearchItem>,
    // Set to ask the walker thread to stop early.
    cancel_flag: Arc<AtomicBool>,
    // True while the walker thread is still injecting entries.
    walker_running: Arc<AtomicBool>,
    // Walker join handle; joined in Drop so the thread never outlives us.
    walk_handle: Option<JoinHandle<()>>,
    // Maximum number of matches returned by `current_results`.
    limit: NonZero<usize>,
    // Whether `current_results` also computes per-match character indices.
    compute_indices: bool,
    // Scratch matcher used to rescore snapshot items for reporting.
    matcher: Mutex<Matcher>,
    // Root of the walk; reported paths are relative to it.
    search_directory: PathBuf,
    // Case-matching mode used whenever the pattern is reparsed.
    case_matching: CaseMatching,
    // Unicode normalization mode used whenever the pattern is reparsed.
    normalization: Normalization,
    // Last pattern handed to `new`/`update_pattern`.
    current_pattern: String,
}
impl SearchManager {
    /// Start a search: primes the matcher with the initial `pattern` and
    /// spawns the walker thread over `search_directory`.
    ///
    /// # Errors
    /// Fails when the exclude globs are invalid or the walker thread cannot
    /// be spawned.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pattern: &str,
        limit: NonZero<usize>,
        search_directory: &Path,
        exclude: Vec<String>,
        threads: NonZero<usize>,
        compute_indices: bool,
        notify: Arc<dyn Fn() + Sync + Send>,
    ) -> anyhow::Result<Self> {
        let root = search_directory.to_path_buf();
        let override_matcher = build_override_matcher(search_directory, exclude)?;
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let walker_running = Arc::new(AtomicBool::new(true));
        // One matcher column: the relative path string.
        let mut nucleo = Nucleo::new(Config::DEFAULT, notify, Some(threads.get()), 1);
        nucleo
            .pattern
            .reparse(0, pattern, CaseMatching::Smart, Normalization::Smart, false);
        let walk_handle = spawn_walker(
            root.clone(),
            threads.get(),
            override_matcher,
            Arc::clone(&cancel_flag),
            Arc::clone(&walker_running),
            nucleo.injector(),
        )?;
        Ok(Self {
            nucleo,
            cancel_flag,
            walker_running,
            walk_handle: Some(walk_handle),
            limit,
            compute_indices,
            matcher: Mutex::new(Matcher::new(nucleo_matcher::Config::DEFAULT)),
            search_directory: root,
            case_matching: CaseMatching::Smart,
            normalization: Normalization::Smart,
            current_pattern: pattern.to_string(),
        })
    }
    /// Replace the search pattern, letting nucleo reuse previous match state
    /// when the new pattern merely appends to the old one.
    pub fn update_pattern(&mut self, pattern: &str) {
        let is_extension = pattern.starts_with(&self.current_pattern);
        self.nucleo.pattern.reparse(
            0,
            pattern,
            self.case_matching,
            self.normalization,
            is_extension,
        );
        self.current_pattern = pattern.to_string();
    }
    /// Let the matcher make progress, waiting at most `timeout` for results.
    pub fn tick(&mut self, timeout: Duration) -> Status {
        let timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
        self.nucleo.tick(timeout_ms)
    }
    /// Handle for pushing items into the matcher directly.
    pub fn injector(&self) -> nucleo::Injector<SearchItem> {
        self.nucleo.injector()
    }
    /// Current matcher snapshot.
    pub fn snapshot(&self) -> &Snapshot<SearchItem> {
        self.nucleo.snapshot()
    }
    /// Top matches (up to `limit`) from the current snapshot, rescored so
    /// callers receive scores and, optionally, per-character match indices.
    pub fn current_results(&self) -> FileSearchResults {
        let snapshot = self.nucleo.snapshot();
        let total = snapshot.matched_item_count();
        let take = u32::try_from(self.limit.get()).unwrap_or(u32::MAX).min(total);
        let pattern = snapshot.pattern().column_pattern(0);
        let have_pattern = !pattern.atoms.is_empty();
        let want_indices = self.compute_indices;
        let mut matcher = self.matcher.lock().expect("matcher mutex poisoned");
        let matches = snapshot
            .matched_items(0..take)
            .filter_map(|item| {
                let haystack = item.matcher_columns[0].slice(..);
                let path = item.data.path.clone();
                if !have_pattern {
                    // Empty pattern: every item matches with a neutral score.
                    return Some(FileMatch {
                        score: 0,
                        path,
                        indices: None,
                    });
                }
                if want_indices {
                    let mut indices = Vec::new();
                    let score = pattern.indices(haystack, &mut matcher, &mut indices)?;
                    indices.sort_unstable();
                    indices.dedup();
                    Some(FileMatch {
                        score,
                        path,
                        indices: Some(indices),
                    })
                } else {
                    let score = pattern.score(haystack, &mut matcher)?;
                    Some(FileMatch {
                        score,
                        path,
                        indices: None,
                    })
                }
            })
            .collect();
        FileSearchResults {
            matches,
            total_match_count: total as usize,
        }
    }
    /// Ask the walker thread to stop early.
    pub fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
    }
    /// True while the walker thread is still injecting entries.
    pub fn walker_running(&self) -> bool {
        self.walker_running.load(Ordering::Relaxed)
    }
    /// Root directory this search walks; result paths are relative to it.
    pub fn search_directory(&self) -> &Path {
        &self.search_directory
    }
}
/// RAII flag: true while the walker thread is alive, cleared when the guard
/// drops (any exit path, including unwinding), so readers can tell when the
/// walk has actually finished.
struct WalkerRunningGuard {
    flag: Arc<AtomicBool>,
}
impl WalkerRunningGuard {
    /// Set the flag and return a guard that clears it on drop.
    fn new(flag: Arc<AtomicBool>) -> Self {
        flag.store(true, Ordering::Relaxed);
        Self { flag }
    }
}
impl Drop for WalkerRunningGuard {
    fn drop(&mut self) {
        // Runs on every exit path of the walker thread.
        self.flag.store(false, Ordering::Relaxed);
    }
}
impl Drop for SearchManager {
    /// Signal the walker to stop and wait for its thread so it never
    /// outlives the manager.
    fn drop(&mut self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        let Some(handle) = self.walk_handle.take() else {
            return;
        };
        // A walker panic is deliberately swallowed; teardown must not
        // propagate it.
        let _ = handle.join();
    }
}
/// Timing knobs for [`DebouncedSearchManager`].
#[derive(Clone, Copy, Debug)]
pub struct DebounceConfig {
    /// How long to wait after a pattern change before starting a new search.
    pub debounce_delay: Duration,
    /// How often to poll for the previous search to finish before starting a new one.
    pub active_search_complete_poll_interval: Duration,
    /// How long each `tick` call should wait for new results.
    pub tick_timeout: Duration,
    /// Maximum time to wait for the first result before emitting a fallback update.
    pub first_result_timeout: Duration,
}
impl Default for DebounceConfig {
    /// Default timings: 100ms debounce, 20ms completion poll, 16ms tick,
    /// 200ms first-result fallback.
    fn default() -> Self {
        let ms = Duration::from_millis;
        Self {
            debounce_delay: ms(100),
            active_search_complete_poll_interval: ms(20),
            tick_timeout: ms(16),
            first_result_timeout: ms(200),
        }
    }
}
/// Scheduling state shared between `on_query` callers, debounce threads, and
/// the running search thread.
struct DebouncedSearchState {
    // Most recent query passed to `on_query`.
    latest_query: String,
    // True while a debounce thread is pending; prevents spawning another.
    is_search_scheduled: bool,
    // The currently running search, if any.
    active_search: Option<ActiveDebouncedSearch>,
}
/// Bookkeeping for the search that is currently executing.
struct ActiveDebouncedSearch {
    // Query the active search is currently matching against.
    query: String,
    // Set to true to ask the active search to stop.
    cancellation_token: Arc<AtomicBool>,
}
/// RAII helper that clears `active_search` when its search thread exits.
struct ActiveDebouncedSearchGuard {
    state: Arc<Mutex<DebouncedSearchState>>,
    // Token identifying which search this guard belongs to.
    token: Arc<AtomicBool>,
}
impl ActiveDebouncedSearchGuard {
    /// Tie a guard to the search identified by `token`.
    fn new(state: Arc<Mutex<DebouncedSearchState>>, token: Arc<AtomicBool>) -> Self {
        Self { state, token }
    }
}
impl Drop for ActiveDebouncedSearchGuard {
fn drop(&mut self) {
#[expect(clippy::unwrap_used)]
let mut state = self.state.lock().unwrap();
if let Some(active_search) = &state.active_search
&& Arc::ptr_eq(&active_search.cancellation_token, &self.token)
{
state.active_search = None;
}
}
}
/// Debounced wrapper over [`SearchManager`] suitable for UI-style incremental search.
///
/// This helper owns the debounce/cancellation logic for a stream of pattern
/// updates. Call [`DebouncedSearchManager::on_query`] for each new pattern; it
/// will start searches after a debounce delay and emit updates via the
/// provided callback whenever results change or progress is made.
pub struct DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    // Shared scheduling state (latest query, pending/active search).
    state: Arc<Mutex<DebouncedSearchState>>,
    // Root directory every spawned search walks.
    search_dir: PathBuf,
    // Maximum number of matches reported per update.
    limit: NonZero<usize>,
    // Worker threads handed to each underlying SearchManager.
    threads: NonZero<usize>,
    // Whether per-match character indices are included in results.
    compute_indices: bool,
    // Glob patterns excluded from every walk.
    exclude: Vec<String>,
    // Invoked with (query, results, running) for each emitted update.
    callback: Arc<C>,
    // Debounce/polling timing knobs.
    config: DebounceConfig,
}
impl<C> DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    /// Create a manager that stays idle until the first [`Self::on_query`]
    /// call. `callback` receives `(query, results, running)` for every
    /// emitted update.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        config: DebounceConfig,
    ) -> Self {
        Self {
            state: Arc::new(Mutex::new(DebouncedSearchState {
                latest_query: String::new(),
                is_search_scheduled: false,
                active_search: None,
            })),
            search_dir,
            limit,
            threads,
            compute_indices,
            exclude,
            callback,
            config,
        }
    }
    /// Call whenever the search pattern is updated.
    ///
    /// This method is cheap to call for each keystroke; a background worker
    /// will apply debouncing and cancellation.
    pub fn on_query(&self, query: String) {
        {
            #[expect(clippy::unwrap_used)]
            let mut state = self.state.lock().unwrap();
            // Ignore no-op updates.
            if query == state.latest_query {
                return;
            }
            state.latest_query.clear();
            state.latest_query.push_str(&query);
            // A non-prefix edit makes the in-flight search's results stale:
            // ask it to stop. Prefix extensions are handled in place by the
            // active search itself (see the loop in `spawn_search`).
            if let Some(active_search) = &state.active_search
                && !query.starts_with(&active_search.query)
            {
                active_search
                    .cancellation_token
                    .store(true, Ordering::Relaxed);
            }
            // A debounce thread is already pending; it will read
            // `latest_query` when it fires.
            if state.is_search_scheduled {
                return;
            }
            state.is_search_scheduled = true;
        }
        let state = Arc::clone(&self.state);
        let search_dir = self.search_dir.clone();
        let limit = self.limit;
        let threads = self.threads;
        let compute_indices = self.compute_indices;
        let exclude = self.exclude.clone();
        let callback = Arc::clone(&self.callback);
        let config = self.config;
        thread::spawn(move || {
            // Always do a minimum debounce, but then poll until the active
            // search is cleared.
            thread::sleep(config.debounce_delay);
            loop {
                #[expect(clippy::unwrap_used)]
                if state.lock().unwrap().active_search.is_none() {
                    break;
                }
                thread::sleep(config.active_search_complete_poll_interval);
            }
            let cancellation_token = Arc::new(AtomicBool::new(false));
            let token = Arc::clone(&cancellation_token);
            // Read the freshest query and register this search as active in
            // one critical section.
            let query = {
                #[expect(clippy::unwrap_used)]
                let mut locked_state = state.lock().unwrap();
                let query = locked_state.latest_query.clone();
                locked_state.is_search_scheduled = false;
                locked_state.active_search = Some(ActiveDebouncedSearch {
                    query: query.clone(),
                    cancellation_token: token,
                });
                query
            };
            DebouncedSearchManager::spawn_search(
                query,
                search_dir,
                limit,
                threads,
                compute_indices,
                exclude,
                callback,
                cancellation_token,
                state,
                config,
            );
        });
    }
    /// Run one search to completion on a dedicated thread, emitting updates
    /// through `callback` as results stream in.
    #[allow(clippy::too_many_arguments)]
    fn spawn_search(
        query: String,
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        cancellation_token: Arc<AtomicBool>,
        search_state: Arc<Mutex<DebouncedSearchState>>,
        config: DebounceConfig,
    ) {
        thread::spawn(move || {
            // Clears `active_search` when this thread exits, allowing the
            // next scheduled search to proceed.
            let _guard = ActiveDebouncedSearchGuard::new(
                Arc::clone(&search_state),
                Arc::clone(&cancellation_token),
            );
            // Set by nucleo's notify hook whenever new results may exist.
            let notify_flag = Arc::new(AtomicBool::new(false));
            let notify = {
                let flag = Arc::clone(&notify_flag);
                Arc::new(move || {
                    flag.store(true, Ordering::Release);
                })
            };
            let mut manager = match SearchManager::new(
                &query,
                limit,
                &search_dir,
                exclude,
                threads,
                compute_indices,
                notify,
            ) {
                Ok(manager) => manager,
                Err(err) => {
                    // Emit an empty result set so the caller can clear any
                    // stale results and surface the failure if desired.
                    let empty_results = FileSearchResults {
                        matches: Vec::new(),
                        total_match_count: 0,
                    };
                    callback(query.clone(), empty_results, false);
                    eprintln!("debounced file search initialization failed: {err:?}");
                    return;
                }
            };
            let mut last_sent_paths: Vec<String> = Vec::new();
            let mut last_sent_query: String = String::new();
            let mut current_query = query.clone();
            let mut sent_once = false;
            let mut last_sent_running = false;
            let start = std::time::Instant::now();
            let mut last_progress = start;
            loop {
                if cancellation_token.load(Ordering::Relaxed) {
                    manager.cancel();
                }
                // Pick up query edits that arrived while this search runs;
                // the pattern is updated in place without a restart.
                let latest_query = {
                    #[expect(clippy::unwrap_used)]
                    let state = search_state.lock().unwrap();
                    state.latest_query.clone()
                };
                if latest_query != current_query {
                    manager.update_pattern(&latest_query);
                    current_query = latest_query;
                    #[expect(clippy::unwrap_used)]
                    if let Some(active_search) = &mut search_state.lock().unwrap().active_search {
                        active_search.query.clear();
                        active_search.query.push_str(&current_query);
                    }
                }
                let status = manager.tick(config.tick_timeout);
                let flag_was_set = notify_flag.swap(false, Ordering::AcqRel);
                let results = manager.current_results();
                let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();
                let paths_changed = paths != last_sent_paths;
                let timeout_elapsed = start.elapsed() >= config.first_result_timeout;
                let walker_running = manager.walker_running();
                // "Running" from the caller's perspective: the walker or
                // nucleo still has work, or results just changed.
                let ui_running = walker_running || status.running || flag_was_set || status.changed;
                let running_changed = sent_once && last_sent_running && !ui_running;
                // Emit when visible results/query changed, when the search
                // transitioned running -> idle, or (once) as a first-result
                // fallback after the timeout.
                let should_emit = !cancellation_token.load(Ordering::Relaxed)
                    && (paths_changed
                        || current_query != last_sent_query
                        || running_changed
                        || (!sent_once && (flag_was_set || status.changed || timeout_elapsed)));
                if should_emit {
                    callback(current_query.clone(), results.clone(), ui_running);
                    sent_once = true;
                    last_sent_paths = paths;
                    last_sent_query.clear();
                    last_sent_query.push_str(&current_query);
                    last_sent_running = ui_running;
                    last_progress = std::time::Instant::now();
                }
                if cancellation_token.load(Ordering::Relaxed) && sent_once {
                    break;
                }
                // Fully quiescent: walker finished, nucleo idle, no pending
                // notification.
                if !status.running && !flag_was_set && !walker_running {
                    if sent_once {
                        // Linger briefly after the last emission so late
                        // query edits can still reuse this search.
                        if last_progress.elapsed() >= config.first_result_timeout {
                            break;
                        }
                    } else if timeout_elapsed && !cancellation_token.load(Ordering::Relaxed) {
                        // Nothing was ever emitted; send a fallback update so
                        // the caller is not left waiting forever.
                        let ui_running =
                            walker_running || status.running || flag_was_set || status.changed;
                        callback(current_query.clone(), results, ui_running);
                        if !walker_running {
                            break;
                        }
                    }
                }
            }
        });
    }
}
fn spawn_walker(
search_directory: PathBuf,
threads: usize,
override_matcher: Option<ignore::overrides::Override>,
cancel_flag: Arc<AtomicBool>,
walker_running: Arc<AtomicBool>,
injector: nucleo::Injector<SearchItem>,
) -> anyhow::Result<JoinHandle<()>> {
thread::Builder::new()
.name("codex-file-search-walker".to_string())
.spawn(move || {
let _walker_running_guard = WalkerRunningGuard::new(walker_running);
let search_directory = Arc::new(search_directory);
let mut walk_builder = WalkBuilder::new(search_directory.as_path());
walk_builder
.threads(threads)
.hidden(false)
.follow_links(true)
.require_git(false);
if let Some(override_matcher) = override_matcher {
walk_builder.overrides(override_matcher);
}
let walker = walk_builder.build_parallel();
walker.run(|| {
let injector = injector.clone();
let cancel_flag = cancel_flag.clone();
let search_directory = Arc::clone(&search_directory);
Box::new(move |entry| {
if cancel_flag.load(Ordering::Relaxed) {
return ignore::WalkState::Quit;
}
let entry = match entry {
Ok(entry) => entry,
Err(_) => return ignore::WalkState::Continue,
};
let path = entry.path();
let rel_path = match path.strip_prefix(search_directory.as_path()) {
Ok(rel) => rel,
Err(_) => path,
};
if rel_path.as_os_str().is_empty() {
return ignore::WalkState::Continue;
}
let Some(mut path_string) = rel_path.to_str().map(|s| s.to_string()) else {
return ignore::WalkState::Continue;
};
if entry.file_type().is_some_and(|ft| ft.is_dir())
&& !path_string.ends_with('/')
{
path_string.push('/');
}
injector.push(SearchItem::new(path_string), |item, columns| {
columns[0] = item.path.as_str().into();
});
ignore::WalkState::Continue
})
});
})
.map_err(anyhow::Error::new)
}
/// Turn `exclude` globs into an `ignore` override matcher rooted at
/// `search_directory`; `None` when there is nothing to exclude.
///
/// Errors if any glob is invalid.
fn build_override_matcher(
    search_directory: &Path,
    exclude: Vec<String>,
) -> anyhow::Result<Option<ignore::overrides::Override>> {
    if exclude.is_empty() {
        return Ok(None);
    }
    let mut builder = OverrideBuilder::new(search_directory);
    for pattern in &exclude {
        // A leading '!' marks the glob as an exclusion for OverrideBuilder.
        builder.add(&format!("!{pattern}"))?;
    }
    Ok(Some(builder.build()?))
}

View File

@@ -0,0 +1,161 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use codex_file_search::FileSearchResults;
use codex_file_search::search_manager::DebounceConfig;
use codex_file_search::search_manager::DebouncedSearchManager;
use pretty_assertions::assert_eq;
#[test]
fn debounced_search_manager_emits_results() {
    // Single nested file that should match the query "gam".
    let temp_dir = tempfile::tempdir().unwrap();
    let src_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&src_dir).unwrap();
    std::fs::write(src_dir.join("gamma.rs"), "fn main() {}").unwrap();
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            sink.lock()
                .expect("captured results mutex poisoned")
                .push((query, results, running));
        },
    );
    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        NonZeroUsize::new(10).unwrap(),
        NonZeroUsize::new(2).unwrap(),
        false,
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );
    manager.on_query("gam".to_string());
    // Poll until some update containing gamma.rs arrives or we time out.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut saw_match = false;
    while Instant::now() < deadline {
        let found = captured
            .lock()
            .expect("captured results mutex poisoned while reading")
            .iter()
            .any(|(_, results, _)| {
                results.matches.iter().any(|m| m.path.ends_with("gamma.rs"))
            });
        if found {
            saw_match = true;
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }
    if !saw_match {
        let guard = captured
            .lock()
            .expect("captured results mutex poisoned at end of test");
        eprintln!("captured debounced results: {guard:?}");
    }
    assert!(saw_match, "debounced search did not emit expected result");
}
#[test]
fn debounced_search_manager_backspace_updates_results() {
    // alpha/alpine both match "al"; only alpha matches "alph".
    let temp_dir = tempfile::tempdir().unwrap();
    let src_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&src_dir).unwrap();
    for (name, body) in [
        ("alpha.rs", "fn alpha() {}"),
        ("alpine.rs", "fn alpine() {}"),
        ("beta.rs", "fn beta() {}"),
    ] {
        std::fs::write(src_dir.join(name), body).unwrap();
    }
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            sink.lock()
                .expect("captured results mutex poisoned")
                .push((query, results, running));
        },
    );
    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        NonZeroUsize::new(10).unwrap(),
        NonZeroUsize::new(2).unwrap(),
        false,
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );
    manager.on_query("alph".to_string());
    let alpha_paths =
        wait_for_query_paths_matching(&captured, "alph", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
        })
        .expect("timed out waiting for alph results");
    assert_eq!(
        alpha_paths,
        vec!["src/alpha.rs".to_string()],
        "expected only alpha for query 'alph'"
    );
    // Simulate a backspace: the broader query must bring alpine back.
    manager.on_query("al".to_string());
    let mut backspace_paths =
        wait_for_query_paths_matching(&captured, "al", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
                && paths.iter().any(|path| path.ends_with("alpine.rs"))
        })
        .expect("timed out waiting for backspace results");
    backspace_paths.sort();
    assert_eq!(
        backspace_paths,
        vec!["src/alpha.rs".to_string(), "src/alpine.rs".to_string()],
        "expected backspace to include both alpha and alpine"
    );
}
fn wait_for_query_paths_matching(
captured: &Arc<Mutex<Vec<(String, FileSearchResults, bool)>>>,
query: &str,
timeout: Duration,
predicate: impl Fn(&[String]) -> bool,
) -> Option<Vec<String>> {
let start = Instant::now();
while start.elapsed() < timeout {
{
let guard = captured
.lock()
.expect("captured results mutex poisoned while reading");
if let Some((_, results, _)) = guard.iter().rev().find(|(q, _, _)| q == query) {
let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();
if predicate(&paths) {
return Some(paths);
}
}
}
thread::sleep(Duration::from_millis(10));
}
None
}

View File

@@ -0,0 +1,245 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_file_search::search_manager::SearchItem;
use codex_file_search::search_manager::SearchManager;
/// Inject a single path into the matcher, mirroring the walker's column
/// layout (column 0 = the path).
fn push(injector: &nucleo::Injector<SearchItem>, path: &str) {
    let item = SearchItem {
        path: path.to_string(),
    };
    injector.push(item, |item, columns| {
        columns[0] = item.path.as_str().into();
    });
}
#[test]
fn search_manager_streams_results() {
    let temp_dir = tempfile::tempdir().unwrap();
    let notify_count = Arc::new(AtomicUsize::new(0));
    let notify = {
        let counter = Arc::clone(&notify_count);
        Arc::new(move || {
            counter.fetch_add(1, Ordering::Relaxed);
        })
    };
    let mut manager = SearchManager::new(
        "g",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(2).unwrap(),
        false,
        notify,
    )
    .unwrap();
    let injector = manager.injector();
    // A path without 'g' must not match the pattern.
    push(&injector, "alpha.txt");
    manager.tick(Duration::from_millis(10));
    assert!(manager.current_results().matches.is_empty());
    // A matching path should surface once nucleo settles.
    push(&injector, "subdir/gamma.rs");
    for _ in 0..50 {
        if !manager.tick(Duration::from_millis(10)).running {
            break;
        }
    }
    let paths: Vec<String> = manager
        .current_results()
        .matches
        .iter()
        .map(|m| m.path.clone())
        .collect();
    assert!(
        paths.iter().any(|p| p.ends_with("gamma.rs")),
        "expected to find gamma.rs; results={paths:?}"
    );
    // Both injections should have triggered notify callbacks.
    assert!(notify_count.load(Ordering::Relaxed) >= 2);
}
#[test]
fn search_manager_walk_finds_files() {
    // One matching file in a subdirectory plus a non-matching sibling.
    let temp_dir = tempfile::tempdir().unwrap();
    let nested_dir = temp_dir.path().join("subdir");
    std::fs::create_dir_all(&nested_dir).unwrap();
    std::fs::write(nested_dir.join("gamma.rs"), "fn main() {}").unwrap();
    std::fs::write(temp_dir.path().join("alpha.txt"), "alpha").unwrap();
    let notify_count = Arc::new(AtomicUsize::new(0));
    let notify = {
        let counter = Arc::clone(&notify_count);
        Arc::new(move || {
            counter.fetch_add(1, Ordering::Relaxed);
        })
    };
    let mut manager = SearchManager::new(
        "gam",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        true,
        notify,
    )
    .unwrap();
    let start = std::time::Instant::now();
    let mut found = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        let _ = notify_count.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path.ends_with("gamma.rs"))
        {
            found = true;
            break;
        }
        // Allow the walker at least a second even after nucleo reports idle.
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }
    assert!(
        found,
        "expected walker to find gamma.rs; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
#[test]
fn search_manager_walk_includes_directories() {
    // Only directories exist here; the walker must still inject them
    // (reported with a trailing '/').
    let temp_dir = tempfile::tempdir().unwrap();
    std::fs::create_dir_all(temp_dir.path().join("docs").join("nested")).unwrap();
    let notify_count = Arc::new(AtomicUsize::new(0));
    let notify = {
        let counter = Arc::clone(&notify_count);
        Arc::new(move || {
            counter.fetch_add(1, Ordering::Relaxed);
        })
    };
    let mut manager = SearchManager::new(
        "docs",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        false,
        notify,
    )
    .unwrap();
    let start = std::time::Instant::now();
    let mut found_directory = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        let _ = notify_count.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "docs/")
        {
            found_directory = true;
            break;
        }
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }
    assert!(
        found_directory,
        "expected walker to find directory matches; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
#[test]
#[cfg(unix)]
fn search_manager_walk_follows_symlinked_directories() {
    // Layout: root/link -> target, target/gamma.rs. The walk starts at
    // `root` and must traverse through the symlink.
    let temp_dir = tempfile::tempdir().unwrap();
    let root_dir = temp_dir.path().join("root");
    let target_dir = temp_dir.path().join("target");
    std::fs::create_dir_all(&root_dir).unwrap();
    std::fs::create_dir_all(&target_dir).unwrap();
    std::fs::write(target_dir.join("gamma.rs"), "fn main() {}").unwrap();
    std::os::unix::fs::symlink(&target_dir, root_dir.join("link")).unwrap();
    let notify_count = Arc::new(AtomicUsize::new(0));
    let notify = {
        let counter = Arc::clone(&notify_count);
        Arc::new(move || {
            counter.fetch_add(1, Ordering::Relaxed);
        })
    };
    let mut manager = SearchManager::new(
        "gamma",
        NonZeroUsize::new(10).unwrap(),
        &root_dir,
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        true,
        notify,
    )
    .unwrap();
    let start = std::time::Instant::now();
    let mut found = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        let _ = notify_count.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "link/gamma.rs")
        {
            found = true;
            break;
        }
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }
    assert!(
        found,
        "expected walker to find symlinked file; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}