feat(app-server): FileSearchStream

This commit is contained in:
Dylan Hurd
2026-01-26 01:31:19 -07:00
parent 65c7119fb7
commit 1d67af9f04
10 changed files with 1495 additions and 8 deletions

33
codex-rs/Cargo.lock generated
View File

@@ -1560,10 +1560,12 @@ dependencies = [
"anyhow",
"clap",
"ignore",
"nucleo",
"nucleo-matcher",
"pretty_assertions",
"serde",
"serde_json",
"tempfile",
"tokio",
]
@@ -4758,6 +4760,17 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "nucleo"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5262af4c94921c2646c5ac6ff7900c2af9cbb08dc26a797e18130a7019c039d4"
dependencies = [
"nucleo-matcher",
"parking_lot",
"rayon",
]
[[package]]
name = "nucleo-matcher"
version = "0.3.1"
@@ -6125,6 +6138,26 @@ dependencies = [
"ratatui",
]
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.15"

View File

@@ -331,6 +331,10 @@ client_request_definitions! {
params: FuzzyFileSearchParams,
response: FuzzyFileSearchResponse,
},
FindFilesStream => "findFilesStream" {
params: FindFilesStreamParams,
response: FindFilesStreamResponse,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
params: v1::ExecOneOffCommandParams,
@@ -570,6 +574,34 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
/// Parameters for the `findFilesStream` request.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamParams {
    /// Fuzzy query to match against file paths under `roots`.
    pub query: String,
    /// Directories to search; one streaming search runs per root.
    pub roots: Vec<String>,
    // If provided, a previous request that used the same token value is
    // cancelled (or reused when its roots match).
    pub cancellation_token: Option<String>,
}
/// Empty acknowledgement for `findFilesStream`; actual results arrive
/// asynchronously via `FindFilesStreamChunkNotification`s.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamResponse {}
/// One chunk of streamed search results (`findFilesStream/chunk`).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FindFilesStreamChunkNotification {
    /// Id of the `findFilesStream` request these results answer.
    pub request_id: RequestId,
    /// Query the results were computed for.
    pub query: String,
    /// Matches contained in this chunk.
    pub files: Vec<FuzzyFileSearchResult>,
    /// Total matches across all chunks (may exceed the files delivered).
    pub total_match_count: usize,
    /// Zero-based index of this chunk within the current update.
    pub chunk_index: usize,
    /// Number of chunks in the current update (always at least 1).
    pub chunk_count: usize,
    /// True while the underlying search is still producing results.
    pub running: bool,
}
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
@@ -597,6 +629,7 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
FindFilesStreamChunk => "findFilesStream/chunk" (FindFilesStreamChunkNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

View File

@@ -1,6 +1,7 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::find_files_stream::FindFilesStreamSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::models::supported_models;
use crate::outgoing_message::OutgoingMessageSender;
@@ -35,6 +36,9 @@ use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FindFilesStreamParams;
use codex_app_server_protocol::FindFilesStreamResponse;
use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
@@ -257,6 +261,7 @@ pub(crate) struct CodexMessageProcessor {
pending_rollbacks: PendingRollbacks,
turn_summary_store: TurnSummaryStore,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
pending_find_files_streams: Arc<Mutex<HashMap<FindFilesStreamKey, FindFilesStreamSession>>>,
feedback: CodexFeedback,
}
@@ -266,6 +271,12 @@ pub(crate) enum ApiVersion {
V2,
}
/// Key under which an in-flight find-files stream session is tracked:
/// the caller-supplied cancellation token when present, otherwise the
/// originating request id.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum FindFilesStreamKey {
    Token(String),
    Request(RequestId),
}
impl CodexMessageProcessor {
async fn load_thread(
&self,
@@ -313,6 +324,7 @@ impl CodexMessageProcessor {
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
pending_find_files_streams: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -560,6 +572,9 @@ impl CodexMessageProcessor {
ClientRequest::FuzzyFileSearch { request_id, params } => {
self.fuzzy_file_search(request_id, params).await;
}
ClientRequest::FindFilesStream { request_id, params } => {
self.find_files_stream(request_id, params).await;
}
ClientRequest::OneOffCommandExec { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
@@ -4232,6 +4247,80 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
/// Handle a `findFilesStream` request.
///
/// Starts (or reuses) a streaming fuzzy file search session keyed by the
/// caller's `cancellation_token` or, absent one, by the request id. Results
/// are delivered asynchronously via `FindFilesStreamChunk` notifications;
/// the RPC response itself is just an acknowledgement.
async fn find_files_stream(&mut self, request_id: RequestId, params: FindFilesStreamParams) {
    let FindFilesStreamParams {
        query,
        roots,
        cancellation_token,
    } = params;
    // Empty query or no roots means "nothing to search": cancel any session
    // sharing the token, emit one terminal empty chunk, and ack.
    if query.is_empty() || roots.is_empty() {
        if let Some(token) = &cancellation_token {
            let key = FindFilesStreamKey::Token(token.clone());
            let mut streams = self.pending_find_files_streams.lock().await;
            if let Some(stream) = streams.remove(&key) {
                stream.cancel();
            }
        }
        self.send_find_files_stream_empty(request_id.clone(), query)
            .await;
        self.outgoing
            .send_response(request_id, FindFilesStreamResponse {})
            .await;
        return;
    }
    let key = cancellation_token
        .clone()
        .map(FindFilesStreamKey::Token)
        .unwrap_or(FindFilesStreamKey::Request(request_id.clone()));
    let mut streams = self.pending_find_files_streams.lock().await;
    if let Some(stream) = streams.get(&key) {
        // Same key and same roots: treat this as a query refinement on the
        // existing session instead of spinning up new walkers.
        if stream.roots() == roots {
            stream.update_request_id(request_id.clone());
            stream.on_query(query);
            self.outgoing
                .send_response(request_id, FindFilesStreamResponse {})
                .await;
            return;
        }
        // Roots changed under the same key: tear down the old session.
        stream.cancel();
        streams.remove(&key);
    }
    let (stream, done_rx) =
        FindFilesStreamSession::new(roots, request_id.clone(), self.outgoing.clone());
    stream.on_query(query);
    streams.insert(key.clone(), stream);
    drop(streams);
    self.outgoing
        .send_response(request_id, FindFilesStreamResponse {})
        .await;
    // Reap the session entry once its background task signals completion.
    let streams = Arc::clone(&self.pending_find_files_streams);
    tokio::spawn(async move {
        let _ = done_rx.await;
        let mut streams = streams.lock().await;
        streams.remove(&key);
    });
}
/// Emit a single terminal, empty chunk for `query` so the client can clear
/// stale results (used when the query or roots are empty).
async fn send_find_files_stream_empty(&self, request_id: RequestId, query: String) {
    let notification = FindFilesStreamChunkNotification {
        request_id,
        query,
        files: Vec::new(),
        total_match_count: 0,
        chunk_index: 0,
        // chunk_count is always at least 1, even with no results.
        chunk_count: 1,
        running: false,
    };
    self.outgoing
        .send_server_notification(ServerNotification::FindFilesStreamChunk(notification))
        .await;
}
async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {

View File

@@ -0,0 +1,297 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::FindFilesStreamChunkNotification;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use codex_file_search::FileSearchResults;
use codex_file_search::search_manager::DebounceConfig;
use codex_file_search::search_manager::DebouncedSearchManager;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::warn;
use crate::outgoing_message::OutgoingMessageSender;
/// Maximum number of matches reported per search root.
const LIMIT_PER_ROOT: usize = 50;
/// Cap on total matcher threads shared across all roots.
const MAX_THREADS: usize = 12;
/// Number of results delivered per chunk notification.
const CHUNK_SIZE: usize = 100;
/// Whether per-match highlight indices are computed.
const COMPUTE_INDICES: bool = true;
/// Per-root search callback invoked as `(query, results, still_running)`.
type SearchCallback = dyn Fn(String, FileSearchResults, bool) + Send + Sync + 'static;
/// Messages sent from the per-root search callbacks to the aggregation task.
pub(crate) enum FindFilesStreamUpdate {
    /// The session was cancelled; the aggregation task should exit.
    Cancel,
    /// A fresh batch of results from one search root.
    Results {
        /// Root directory these results belong to.
        root: String,
        /// Query the results were computed for.
        query: String,
        /// Matches and total count reported for this root.
        results: FileSearchResults,
        /// Whether this root's search is still producing results.
        running: bool,
    },
}
/// Latest known search state for a single root, tracked by the aggregation task.
struct RootSearchState {
    /// Most recent results reported for this root.
    results: FileSearchResults,
    /// Whether this root's search reported itself as still running.
    running: bool,
    /// Whether this root has reported at least once for the current query.
    seen: bool,
}
/// Handle to one streaming find-files search spanning one or more roots.
///
/// Owns a `DebouncedSearchManager` per root; a spawned task aggregates their
/// updates and emits chunked notifications (see `run_stream_task`).
pub(crate) struct FindFilesStreamSession {
    /// Roots this session searches, in request order.
    roots: Vec<String>,
    /// One debounced search manager per root.
    managers: Vec<DebouncedSearchManager<Box<SearchCallback>>>,
    /// Request id stamped onto outgoing chunk notifications; can be
    /// swapped when the session is reused by a follow-up request.
    request_id: Arc<Mutex<RequestId>>,
    /// Set on cancellation; suppresses further per-root callbacks.
    cancel_flag: Arc<AtomicBool>,
    /// Channel to the aggregation task.
    update_tx: mpsc::UnboundedSender<FindFilesStreamUpdate>,
}
impl FindFilesStreamSession {
    /// Create a session that fans queries out to one `DebouncedSearchManager`
    /// per root and forwards their updates to a single aggregation task.
    ///
    /// Returns the session handle plus a receiver that resolves when the
    /// aggregation task finishes (all roots settled, or cancellation).
    pub(crate) fn new(
        roots: Vec<String>,
        request_id: RequestId,
        outgoing: Arc<OutgoingMessageSender>,
    ) -> (Self, oneshot::Receiver<()>) {
        let limit_per_root = NonZeroUsize::new(LIMIT_PER_ROOT).unwrap_or(NonZeroUsize::MIN);
        let threads_per_root = threads_per_root(roots.len());
        let request_id = Arc::new(Mutex::new(request_id));
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let (update_tx, update_rx) = mpsc::unbounded_channel();
        let (done_tx, done_rx) = oneshot::channel();
        let mut managers = Vec::with_capacity(roots.len());
        for root in &roots {
            let root_path = PathBuf::from(root);
            let root_name = root.clone();
            let update_tx = update_tx.clone();
            let cancel_flag = Arc::clone(&cancel_flag);
            // Per-root callback: forward each result batch to the aggregator
            // unless the session has been cancelled.
            let callback: Box<SearchCallback> = Box::new(move |query, results, running| {
                if cancel_flag.load(Ordering::Relaxed) {
                    return;
                }
                if update_tx
                    .send(FindFilesStreamUpdate::Results {
                        root: root_name.clone(),
                        query,
                        results,
                        running,
                    })
                    .is_err()
                {
                    warn!("find-files-stream update channel closed");
                }
            });
            let manager = DebouncedSearchManager::new(
                root_path,
                limit_per_root,
                threads_per_root,
                COMPUTE_INDICES,
                Vec::new(),
                Arc::new(callback),
                DebounceConfig::default(),
            );
            managers.push(manager);
        }
        let cancel_flag_for_task = Arc::clone(&cancel_flag);
        let request_id_for_task = Arc::clone(&request_id);
        let roots_for_task = roots.clone();
        tokio::spawn(async move {
            run_stream_task(
                roots_for_task,
                update_rx,
                request_id_for_task,
                outgoing,
                cancel_flag_for_task,
            )
            .await;
            // Signal completion; the receiver may already be dropped.
            let _ = done_tx.send(());
        });
        (
            Self {
                roots,
                managers,
                request_id,
                cancel_flag,
                update_tx,
            },
            done_rx,
        )
    }
    /// Roots this session searches; used to decide whether a session can be
    /// reused for a follow-up request with the same cancellation token.
    pub(crate) fn roots(&self) -> &[String] {
        &self.roots
    }
    /// Point subsequent chunk notifications at a newer request id.
    pub(crate) fn update_request_id(&self, request_id: RequestId) {
        #[expect(clippy::unwrap_used)]
        let mut locked = self.request_id.lock().unwrap();
        *locked = request_id;
    }
    /// Feed a new query string to every per-root search manager.
    pub(crate) fn on_query(&self, query: String) {
        for manager in &self.managers {
            manager.on_query(query.clone());
        }
    }
    /// Cancel the session: blocks further per-root callbacks and wakes the
    /// aggregation task so it can exit.
    pub(crate) fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        let _ = self.update_tx.send(FindFilesStreamUpdate::Cancel);
    }
}
/// Background task that aggregates per-root search updates into chunked
/// `findFilesStream/chunk` notifications.
///
/// Maintains the latest results per root, merges them into one globally
/// sorted list on every update, and exits once every root has reported and
/// no search is still running — or on cancellation.
async fn run_stream_task(
    roots: Vec<String>,
    mut update_rx: mpsc::UnboundedReceiver<FindFilesStreamUpdate>,
    request_id: Arc<Mutex<RequestId>>,
    outgoing: Arc<OutgoingMessageSender>,
    cancel_flag: Arc<AtomicBool>,
) {
    let mut root_states: HashMap<String, RootSearchState> = roots
        .iter()
        .map(|root| {
            (
                root.clone(),
                RootSearchState {
                    results: FileSearchResults {
                        matches: Vec::new(),
                        total_match_count: 0,
                    },
                    running: false,
                    seen: false,
                },
            )
        })
        .collect();
    let mut current_query = String::new();
    while let Some(update) = update_rx.recv().await {
        match update {
            FindFilesStreamUpdate::Cancel => break,
            FindFilesStreamUpdate::Results {
                root,
                query,
                results,
                running,
            } => {
                if cancel_flag.load(Ordering::Relaxed) {
                    break;
                }
                // A new query invalidates all previously accumulated state.
                if query != current_query {
                    current_query.clear();
                    current_query.push_str(&query);
                    for state in root_states.values_mut() {
                        state.results = FileSearchResults {
                            matches: Vec::new(),
                            total_match_count: 0,
                        };
                        state.running = false;
                        state.seen = false;
                    }
                }
                if let Some(state) = root_states.get_mut(&root) {
                    state.results = results;
                    state.running = running;
                    state.seen = true;
                } else {
                    warn!("find-files-stream received update for unexpected root: {root}");
                    continue;
                }
                // Merge the per-root result sets into a single flat list.
                let mut files = Vec::new();
                let mut total_match_count = 0usize;
                let mut any_running = false;
                let mut all_seen = true;
                for (root, state) in &root_states {
                    if !state.seen {
                        all_seen = false;
                    }
                    any_running |= state.running;
                    total_match_count += state.results.total_match_count;
                    for entry in &state.results.matches {
                        files.push(FuzzyFileSearchResult {
                            root: root.clone(),
                            path: entry.path.clone(),
                            file_name: file_search::file_name_from_path(&entry.path),
                            score: entry.score,
                            indices: entry.indices.clone(),
                        });
                    }
                }
                files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
                    FuzzyFileSearchResult,
                    _,
                    _,
                >(|f| f.score, |f| f.path.as_str()));
                // Always advertise at least one chunk so an empty result set
                // still produces a single (empty) notification.
                let chunk_count = if files.is_empty() {
                    1
                } else {
                    files.len().div_ceil(CHUNK_SIZE)
                };
                #[expect(clippy::unwrap_used)]
                let current_request_id = request_id.lock().unwrap().clone();
                if files.is_empty() {
                    let notification = FindFilesStreamChunkNotification {
                        request_id: current_request_id,
                        query: current_query.clone(),
                        files: Vec::new(),
                        total_match_count,
                        chunk_index: 0,
                        chunk_count,
                        running: any_running,
                    };
                    outgoing
                        .send_server_notification(ServerNotification::FindFilesStreamChunk(
                            notification,
                        ))
                        .await;
                } else {
                    for (index, chunk) in files.chunks(CHUNK_SIZE).enumerate() {
                        let notification = FindFilesStreamChunkNotification {
                            request_id: current_request_id.clone(),
                            query: current_query.clone(),
                            files: chunk.to_vec(),
                            total_match_count,
                            chunk_index: index,
                            chunk_count,
                            running: any_running,
                        };
                        outgoing
                            .send_server_notification(ServerNotification::FindFilesStreamChunk(
                                notification,
                            ))
                            .await;
                    }
                }
                // All roots reported and nothing still running: finished.
                if all_seen && !any_running {
                    cancel_flag.store(true, Ordering::Relaxed);
                    break;
                }
            }
        }
    }
}
/// Split the machine's available cores (capped at `MAX_THREADS`) evenly
/// across `roots_len` search roots, guaranteeing each root at least one.
fn threads_per_root(roots_len: usize) -> NonZeroUsize {
    let cores = std::thread::available_parallelism().map_or(1, std::num::NonZero::get);
    let budget = cores.min(MAX_THREADS);
    let share = budget / roots_len.max(1);
    NonZeroUsize::new(share.max(1)).unwrap_or(NonZeroUsize::MIN)
}

View File

@@ -42,6 +42,7 @@ mod codex_message_processor;
mod config_api;
mod dynamic_tools;
mod error_code;
mod find_files_stream;
mod fuzzy_file_search;
mod message_processor;
mod models;

View File

@@ -16,6 +16,7 @@ path = "src/lib.rs"
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
ignore = { workspace = true }
nucleo = "0.5.0"
nucleo-matcher = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -23,3 +24,4 @@ tokio = { workspace = true, features = ["full"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = "3.13.0"

View File

@@ -7,6 +7,7 @@ use nucleo_matcher::pattern::CaseMatching;
use nucleo_matcher::pattern::Normalization;
use nucleo_matcher::pattern::Pattern;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::UnsafeCell;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
@@ -19,6 +20,7 @@ use std::sync::atomic::Ordering;
use tokio::process::Command;
mod cli;
pub mod search_manager;
pub use cli::Cli;
@@ -48,7 +50,7 @@ pub fn file_name_from_path(path: &str) -> String {
.unwrap_or_else(|| path.to_string())
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FileSearchResults {
pub matches: Vec<FileMatch>,
pub total_match_count: usize,
@@ -209,7 +211,7 @@ pub fn run(
Box::new(move |entry| {
if let Some(path) = get_file_path(&entry, search_directory) {
best_list.insert(path);
best_list.insert(path.as_ref());
}
processed += 1;
@@ -224,18 +226,30 @@ pub fn run(
fn get_file_path<'a>(
entry_result: &'a Result<ignore::DirEntry, ignore::Error>,
search_directory: &std::path::Path,
) -> Option<&'a str> {
) -> Option<Cow<'a, str>> {
let entry = match entry_result {
Ok(e) => e,
Err(_) => return None,
};
if entry.file_type().is_some_and(|ft| ft.is_dir()) {
let path = entry.path();
let rel_path = match path.strip_prefix(search_directory) {
Ok(rel_path) => rel_path,
Err(_) => path,
};
if rel_path.as_os_str().is_empty() {
return None;
}
let path = entry.path();
match path.strip_prefix(search_directory) {
Ok(rel_path) => rel_path.to_str(),
Err(_) => None,
let path_str = rel_path.to_str()?;
let is_dir = entry.file_type().is_some_and(|ft| ft.is_dir());
if is_dir {
let mut owned = String::with_capacity(path_str.len() + 1);
owned.push_str(path_str);
if !owned.ends_with('/') {
owned.push('/');
}
Some(Cow::Owned(owned))
} else {
Some(Cow::Borrowed(path_str))
}
}
@@ -453,4 +467,46 @@ mod tests {
fn file_name_from_path_falls_back_to_full_path() {
assert_eq!(file_name_from_path(""), "");
}
/// Directories should appear in search results, reported with a trailing
/// slash (`docs/`), not only files.
#[test]
fn run_includes_directory_matches() {
    let temp_dir = tempfile::tempdir().unwrap();
    let docs_dir = temp_dir.path().join("docs");
    std::fs::create_dir_all(docs_dir.join("nested")).unwrap();
    let cancel_flag = Arc::new(AtomicBool::new(false));
    // NOTE(review): the trailing bool args appear to be flags such as
    // compute_indices / include-directories — confirm against `run`'s signature.
    let results = run(
        "docs",
        NonZero::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZero::new(1).unwrap(),
        cancel_flag,
        false,
        true,
    )
    .unwrap();
    assert!(
        results.matches.iter().any(|m| m.path == "docs/"),
        "expected directory match; matches={:?}",
        results
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
/// A query without a trailing slash ("docs") should still fuzzy-match a
/// directory entry stored with one ("docs/").
#[test]
fn pattern_matches_directory_without_trailing_slash_in_query() {
    let pattern = create_pattern("docs");
    let mut utf32buf = Vec::<char>::new();
    let haystack: Utf32Str<'_> = Utf32Str::new("docs/", &mut utf32buf);
    let mut matcher = Matcher::new(nucleo_matcher::Config::DEFAULT);
    assert!(
        pattern.score(haystack, &mut matcher).is_some(),
        "expected fuzzy pattern to match directory paths with trailing slash"
    );
}
}

View File

@@ -0,0 +1,570 @@
use ignore::WalkBuilder;
use ignore::overrides::OverrideBuilder;
use nucleo::Config;
use nucleo::Nucleo;
use nucleo::Snapshot;
use nucleo::Status;
use nucleo_matcher::Matcher;
use nucleo_matcher::pattern::CaseMatching;
use nucleo_matcher::pattern::Normalization;
use std::num::NonZero;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::thread::JoinHandle;
use std::thread::{self};
use std::time::Duration;
use crate::FileMatch;
use crate::FileSearchResults;
/// A single entry fed into the fuzzy matcher: one path relative to the
/// search root (directories carry a trailing `/` — see the walker).
#[derive(Debug, Clone)]
pub struct SearchItem {
    /// Relative path string shown to the matcher and returned in results.
    pub path: String,
}

impl SearchItem {
    /// Wrap an already-relative path string as a search item.
    fn new(path: String) -> Self {
        SearchItem { path }
    }
}
/// Drives one nucleo-backed fuzzy search over the files under a directory.
///
/// A background walker thread feeds paths into nucleo's injector while the
/// owner polls `tick`/`current_results` for progress.
pub struct SearchManager {
    /// The nucleo engine holding injected items and the active pattern.
    nucleo: Nucleo<SearchItem>,
    /// Set to stop the walker thread (also set on drop).
    cancel_flag: Arc<AtomicBool>,
    /// Handle to the walker thread; joined on drop.
    walk_handle: Option<JoinHandle<()>>,
    /// Maximum number of matches returned by `current_results`.
    limit: NonZero<usize>,
    /// Whether per-match highlight indices are computed.
    compute_indices: bool,
    /// Matcher used to re-score snapshot items when extracting results.
    matcher: Mutex<Matcher>,
    /// Directory being searched; result paths are relative to it.
    search_directory: PathBuf,
    /// Case-matching mode applied when reparsing patterns.
    case_matching: CaseMatching,
    /// Unicode normalization mode applied when reparsing patterns.
    normalization: Normalization,
    /// Last pattern passed to `new`/`update_pattern`.
    current_pattern: String,
}
impl SearchManager {
    /// Start a search: builds the nucleo engine with `pattern` and spawns a
    /// walker thread over `search_directory` (honoring `exclude` globs).
    ///
    /// `notify` is invoked by nucleo whenever new results are available.
    /// Fails if the exclusion overrides are invalid or the walker thread
    /// cannot be spawned.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pattern: &str,
        limit: NonZero<usize>,
        search_directory: &Path,
        exclude: Vec<String>,
        threads: NonZero<usize>,
        compute_indices: bool,
        notify: Arc<dyn Fn() + Sync + Send>,
    ) -> anyhow::Result<Self> {
        let search_directory_buf = search_directory.to_path_buf();
        let override_matcher = build_override_matcher(search_directory, exclude)?;
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let mut nucleo = Nucleo::new(
            Config::DEFAULT,
            notify,
            Some(threads.get()),
            1, // Single column containing the relative file path.
        );
        nucleo
            .pattern
            .reparse(0, pattern, CaseMatching::Smart, Normalization::Smart, false);
        let injector = nucleo.injector();
        let walk_handle = Some(spawn_walker(
            search_directory_buf.clone(),
            threads.get(),
            override_matcher,
            cancel_flag.clone(),
            injector,
        )?);
        Ok(Self {
            nucleo,
            cancel_flag,
            walk_handle,
            limit,
            compute_indices,
            matcher: Mutex::new(Matcher::new(nucleo_matcher::Config::DEFAULT)),
            search_directory: search_directory_buf,
            case_matching: CaseMatching::Smart,
            normalization: Normalization::Smart,
            current_pattern: pattern.to_string(),
        })
    }
    /// Replace the search pattern. When the new pattern extends the previous
    /// one (prefix match), nucleo is told it may reuse prior match state.
    pub fn update_pattern(&mut self, pattern: &str) {
        let append = pattern.starts_with(&self.current_pattern);
        self.nucleo
            .pattern
            .reparse(0, pattern, self.case_matching, self.normalization, append);
        self.current_pattern.clear();
        self.current_pattern.push_str(pattern);
    }
    /// Pump the matcher for up to `timeout`, returning nucleo's progress
    /// status (saturates the timeout at `u64::MAX` ms).
    pub fn tick(&mut self, timeout: Duration) -> Status {
        let millis = timeout.as_millis();
        let timeout_ms = millis.try_into().unwrap_or(u64::MAX);
        self.nucleo.tick(timeout_ms)
    }
    /// Injector for feeding additional items into the matcher.
    pub fn injector(&self) -> nucleo::Injector<SearchItem> {
        self.nucleo.injector()
    }
    /// Current matcher snapshot (items + pattern state).
    pub fn snapshot(&self) -> &Snapshot<SearchItem> {
        self.nucleo.snapshot()
    }
    /// Extract up to `limit` matches from the current snapshot.
    ///
    /// An empty pattern yields every item with score 0; otherwise each item
    /// is re-scored (and, when `compute_indices` is set, its highlight
    /// indices are computed, sorted, and deduped).
    pub fn current_results(&self) -> FileSearchResults {
        let snapshot = self.nucleo.snapshot();
        let matched = snapshot.matched_item_count();
        let max_results = u32::try_from(self.limit.get()).unwrap_or(u32::MAX);
        let take = std::cmp::min(max_results, matched);
        let mut matcher = self.matcher.lock().expect("matcher mutex poisoned");
        let pattern = snapshot.pattern().column_pattern(0);
        let pattern_empty = pattern.atoms.is_empty();
        let compute_indices = self.compute_indices;
        let matches = snapshot
            .matched_items(0..take)
            .filter_map(|item| {
                let haystack = item.matcher_columns[0].slice(..);
                if pattern_empty {
                    Some(FileMatch {
                        score: 0,
                        path: item.data.path.clone(),
                        indices: None,
                    })
                } else if compute_indices {
                    let mut indices = Vec::new();
                    let score = pattern.indices(haystack, &mut matcher, &mut indices)?;
                    indices.sort_unstable();
                    indices.dedup();
                    Some(FileMatch {
                        score,
                        path: item.data.path.clone(),
                        indices: Some(indices),
                    })
                } else {
                    let score = pattern.score(haystack, &mut matcher)?;
                    Some(FileMatch {
                        score,
                        path: item.data.path.clone(),
                        indices: None,
                    })
                }
            })
            .collect();
        FileSearchResults {
            matches,
            total_match_count: matched as usize,
        }
    }
    /// Ask the walker thread to stop (cooperative; checked per entry).
    pub fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
    }
    /// Directory this manager searches.
    pub fn search_directory(&self) -> &Path {
        &self.search_directory
    }
}
impl Drop for SearchManager {
    /// Stop the walker thread and join it so no thread outlives the manager.
    fn drop(&mut self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        if let Some(handle) = self.walk_handle.take() {
            let _ = handle.join();
        }
    }
}
/// Timing knobs for the debounced search wrapper.
#[derive(Clone, Copy, Debug)]
pub struct DebounceConfig {
    /// Delay applied after a pattern change before a new search starts.
    pub debounce_delay: Duration,
    /// Polling cadence used while waiting for the previous search to finish.
    pub active_search_complete_poll_interval: Duration,
    /// Budget each `tick` call spends waiting for fresh results.
    pub tick_timeout: Duration,
    /// Deadline for the first result before a fallback update is emitted.
    pub first_result_timeout: Duration,
}

impl Default for DebounceConfig {
    fn default() -> Self {
        DebounceConfig {
            debounce_delay: Duration::from_millis(100),
            active_search_complete_poll_interval: Duration::from_millis(20),
            tick_timeout: Duration::from_millis(16),
            first_result_timeout: Duration::from_millis(200),
        }
    }
}
/// State shared between the public handle and its worker threads.
struct DebouncedSearchState {
    /// Most recent query supplied via `on_query`.
    latest_query: String,
    /// True while a debounce worker is queued but has not started a search.
    is_search_scheduled: bool,
    /// The search currently in flight, if any.
    active_search: Option<ActiveDebouncedSearch>,
}
/// Bookkeeping for the search currently in flight.
struct ActiveDebouncedSearch {
    /// Query the active search was started (or last updated) with.
    query: String,
    /// Set to `true` to ask the active search loop to stop.
    cancellation_token: Arc<AtomicBool>,
}
struct ActiveDebouncedSearchGuard {
state: Arc<Mutex<DebouncedSearchState>>,
token: Arc<AtomicBool>,
}
impl ActiveDebouncedSearchGuard {
fn new(state: Arc<Mutex<DebouncedSearchState>>, token: Arc<AtomicBool>) -> Self {
Self { state, token }
}
}
impl Drop for ActiveDebouncedSearchGuard {
fn drop(&mut self) {
#[expect(clippy::unwrap_used)]
let mut state = self.state.lock().unwrap();
if let Some(active_search) = &state.active_search
&& Arc::ptr_eq(&active_search.cancellation_token, &self.token)
{
state.active_search = None;
}
}
}
/// Debounced wrapper over [`SearchManager`] suitable for UI-style incremental search.
///
/// This helper owns the debounce/cancellation logic for a stream of pattern
/// updates. Call [`DebouncedSearchManager::on_query`] for each new pattern; it
/// will start searches after a debounce delay and emit updates via the
/// provided callback whenever results change or progress is made.
pub struct DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    /// State shared with the debounce and search worker threads.
    state: Arc<Mutex<DebouncedSearchState>>,
    /// Directory every search runs over.
    search_dir: PathBuf,
    /// Maximum matches reported per update.
    limit: NonZero<usize>,
    /// Matcher thread count passed to each underlying `SearchManager`.
    threads: NonZero<usize>,
    /// Whether per-match highlight indices are computed.
    compute_indices: bool,
    /// Exclusion globs applied to every search.
    exclude: Vec<String>,
    /// Invoked with `(query, results, running)` on every emitted update.
    callback: Arc<C>,
    /// Timing configuration (debounce delay, tick timeout, ...).
    config: DebounceConfig,
}
impl<C> DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    /// Build a manager; no threads are spawned until [`Self::on_query`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        config: DebounceConfig,
    ) -> Self {
        Self {
            state: Arc::new(Mutex::new(DebouncedSearchState {
                latest_query: String::new(),
                is_search_scheduled: false,
                active_search: None,
            })),
            search_dir,
            limit,
            threads,
            compute_indices,
            exclude,
            callback,
            config,
        }
    }
    /// Call whenever the search pattern is updated.
    ///
    /// This method is cheap to call for each keystroke; a background worker
    /// will apply debouncing and cancellation.
    pub fn on_query(&self, query: String) {
        {
            #[expect(clippy::unwrap_used)]
            let mut state = self.state.lock().unwrap();
            // No-op if the query did not actually change.
            if query == state.latest_query {
                return;
            }
            state.latest_query.clear();
            state.latest_query.push_str(&query);
            // If the active search's pattern is not a prefix of the new one,
            // its work cannot be reused incrementally: ask it to stop.
            if let Some(active_search) = &state.active_search
                && !query.starts_with(&active_search.query)
            {
                active_search
                    .cancellation_token
                    .store(true, Ordering::Relaxed);
            }
            // A debounce worker is already queued; it will read
            // `latest_query` when it fires.
            if state.is_search_scheduled {
                return;
            }
            state.is_search_scheduled = true;
        }
        let state = Arc::clone(&self.state);
        let search_dir = self.search_dir.clone();
        let limit = self.limit;
        let threads = self.threads;
        let compute_indices = self.compute_indices;
        let exclude = self.exclude.clone();
        let callback = Arc::clone(&self.callback);
        let config = self.config;
        thread::spawn(move || {
            // Always do a minimum debounce, but then poll until the active
            // search is cleared.
            thread::sleep(config.debounce_delay);
            loop {
                #[expect(clippy::unwrap_used)]
                if state.lock().unwrap().active_search.is_none() {
                    break;
                }
                thread::sleep(config.active_search_complete_poll_interval);
            }
            let cancellation_token = Arc::new(AtomicBool::new(false));
            let token = Arc::clone(&cancellation_token);
            // Claim the active-search slot with the freshest query.
            let query = {
                #[expect(clippy::unwrap_used)]
                let mut locked_state = state.lock().unwrap();
                let query = locked_state.latest_query.clone();
                locked_state.is_search_scheduled = false;
                locked_state.active_search = Some(ActiveDebouncedSearch {
                    query: query.clone(),
                    cancellation_token: token,
                });
                query
            };
            DebouncedSearchManager::spawn_search(
                query,
                search_dir,
                limit,
                threads,
                compute_indices,
                exclude,
                callback,
                cancellation_token,
                state,
                config,
            );
        });
    }
    /// Run one search on a dedicated thread, emitting `callback` updates
    /// until the search settles, is cancelled, or is superseded.
    #[allow(clippy::too_many_arguments)]
    fn spawn_search(
        query: String,
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        cancellation_token: Arc<AtomicBool>,
        search_state: Arc<Mutex<DebouncedSearchState>>,
        config: DebounceConfig,
    ) {
        thread::spawn(move || {
            // Clears the active-search slot when this worker exits (if the
            // slot still belongs to this worker's token).
            let _guard = ActiveDebouncedSearchGuard::new(
                Arc::clone(&search_state),
                Arc::clone(&cancellation_token),
            );
            // Flag flipped by nucleo whenever new results are available.
            let notify_flag = Arc::new(AtomicBool::new(false));
            let notify = {
                let flag = Arc::clone(&notify_flag);
                Arc::new(move || {
                    flag.store(true, Ordering::Release);
                })
            };
            let mut manager = match SearchManager::new(
                &query,
                limit,
                &search_dir,
                exclude,
                threads,
                compute_indices,
                notify,
            ) {
                Ok(manager) => manager,
                Err(err) => {
                    // Emit an empty result set so the caller can clear any
                    // stale results and surface the failure if desired.
                    let empty_results = FileSearchResults {
                        matches: Vec::new(),
                        total_match_count: 0,
                    };
                    callback(query.clone(), empty_results, false);
                    eprintln!("debounced file search initialization failed: {err:?}");
                    return;
                }
            };
            let mut last_sent_paths: Vec<String> = Vec::new();
            let mut last_sent_query: String = String::new();
            let mut current_query = query.clone();
            let mut sent_once = false;
            let start = std::time::Instant::now();
            let mut last_progress = start;
            loop {
                if cancellation_token.load(Ordering::Relaxed) {
                    manager.cancel();
                }
                // Pick up query changes made while this search is running
                // (prefix-compatible changes reuse this search).
                let latest_query = {
                    #[expect(clippy::unwrap_used)]
                    let state = search_state.lock().unwrap();
                    state.latest_query.clone()
                };
                if latest_query != current_query {
                    manager.update_pattern(&latest_query);
                    current_query = latest_query;
                    #[expect(clippy::unwrap_used)]
                    if let Some(active_search) = &mut search_state.lock().unwrap().active_search {
                        active_search.query.clear();
                        active_search.query.push_str(&current_query);
                    }
                }
                let status = manager.tick(config.tick_timeout);
                let flag_was_set = notify_flag.swap(false, Ordering::AcqRel);
                let results = manager.current_results();
                let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();
                let paths_changed = paths != last_sent_paths;
                let timeout_elapsed = start.elapsed() >= config.first_result_timeout;
                // Emit when something visible changed, or (before the first
                // emission) on any progress signal / first-result timeout.
                let should_emit = !cancellation_token.load(Ordering::Relaxed)
                    && (paths_changed
                        || current_query != last_sent_query
                        || (!sent_once
                            && (flag_was_set
                                || status.changed
                                || !status.running
                                || timeout_elapsed)));
                if should_emit {
                    let ui_running = status.running || flag_was_set || status.changed;
                    callback(current_query.clone(), results.clone(), ui_running);
                    sent_once = true;
                    last_sent_paths = paths;
                    last_sent_query.clear();
                    last_sent_query.push_str(&current_query);
                    last_progress = std::time::Instant::now();
                }
                if cancellation_token.load(Ordering::Relaxed) && sent_once {
                    break;
                }
                // Idle (nothing running, no pending notification): either
                // wind down after a grace period, or flush a final fallback
                // update if nothing was ever emitted.
                if !status.running && !flag_was_set {
                    if sent_once {
                        if last_progress.elapsed() >= config.first_result_timeout {
                            break;
                        }
                    } else if timeout_elapsed {
                        if !cancellation_token.load(Ordering::Relaxed) {
                            let ui_running = status.running || flag_was_set || status.changed;
                            callback(current_query.clone(), results, ui_running);
                        }
                        break;
                    }
                }
            }
        });
    }
}
/// Spawn the background thread that walks `search_directory` in parallel
/// (respecting ignore rules and the optional exclusion overrides) and feeds
/// relative paths into the nucleo injector.
fn spawn_walker(
    search_directory: PathBuf,
    threads: usize,
    override_matcher: Option<ignore::overrides::Override>,
    cancel_flag: Arc<AtomicBool>,
    injector: nucleo::Injector<SearchItem>,
) -> anyhow::Result<JoinHandle<()>> {
    thread::Builder::new()
        .name("codex-file-search-walker".to_string())
        .spawn(move || {
            let search_directory = Arc::new(search_directory);
            let mut walk_builder = WalkBuilder::new(search_directory.as_path());
            walk_builder
                .threads(threads)
                .hidden(false)
                .follow_links(true)
                .require_git(false);
            if let Some(override_matcher) = override_matcher {
                walk_builder.overrides(override_matcher);
            }
            let walker = walk_builder.build_parallel();
            walker.run(|| {
                let injector = injector.clone();
                let cancel_flag = cancel_flag.clone();
                let search_directory = Arc::clone(&search_directory);
                Box::new(move |entry| {
                    // Cooperative cancellation: abort the whole walk.
                    if cancel_flag.load(Ordering::Relaxed) {
                        return ignore::WalkState::Quit;
                    }
                    let entry = match entry {
                        Ok(entry) => entry,
                        Err(_) => return ignore::WalkState::Continue,
                    };
                    let path = entry.path();
                    // Report paths relative to the search root; fall back to
                    // the full path if stripping fails.
                    let rel_path = match path.strip_prefix(search_directory.as_path()) {
                        Ok(rel) => rel,
                        Err(_) => path,
                    };
                    // The root itself strips to an empty path — skip it.
                    if rel_path.as_os_str().is_empty() {
                        return ignore::WalkState::Continue;
                    }
                    // Skip paths that are not valid UTF-8.
                    let Some(mut path_string) = rel_path.to_str().map(|s| s.to_string()) else {
                        return ignore::WalkState::Continue;
                    };
                    // Mark directories with a trailing slash so they are
                    // distinguishable from files in results.
                    if entry.file_type().is_some_and(|ft| ft.is_dir())
                        && !path_string.ends_with('/')
                    {
                        path_string.push('/');
                    }
                    injector.push(SearchItem::new(path_string), |item, columns| {
                        columns[0] = item.path.as_str().into();
                    });
                    ignore::WalkState::Continue
                })
            });
        })
        .map_err(anyhow::Error::new)
}
/// Translate exclusion globs into an `ignore` override matcher.
///
/// Returns `Ok(None)` when there is nothing to exclude. Each pattern is
/// negated (`!pattern`) so that matching paths are skipped by the walker.
fn build_override_matcher(
    search_directory: &Path,
    exclude: Vec<String>,
) -> anyhow::Result<Option<ignore::overrides::Override>> {
    if exclude.is_empty() {
        return Ok(None);
    }
    let mut builder = OverrideBuilder::new(search_directory);
    for pattern in &exclude {
        builder.add(&format!("!{pattern}"))?;
    }
    let matcher = builder.build()?;
    Ok(Some(matcher))
}

View File

@@ -0,0 +1,161 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use codex_file_search::FileSearchResults;
use codex_file_search::search_manager::DebounceConfig;
use codex_file_search::search_manager::DebouncedSearchManager;
use pretty_assertions::assert_eq;
#[test]
fn debounced_search_manager_emits_results() {
    // Workspace with a single nested file that the query should match.
    let temp_dir = tempfile::tempdir().unwrap();
    let src_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&src_dir).unwrap();
    std::fs::write(src_dir.join("gamma.rs"), "fn main() {}").unwrap();

    // Record every callback emission so the test can poll for results.
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            sink.lock()
                .expect("captured results mutex poisoned")
                .push((query, results, running));
        },
    );

    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        NonZeroUsize::new(10).unwrap(),
        NonZeroUsize::new(2).unwrap(),
        false,
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );
    manager.on_query("gam".to_string());

    // Poll for up to two seconds for any emission containing gamma.rs.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut saw_match = false;
    while Instant::now() < deadline {
        let hit = captured
            .lock()
            .expect("captured results mutex poisoned while reading")
            .iter()
            .any(|(_, results, _)| {
                results.matches.iter().any(|m| m.path.ends_with("gamma.rs"))
            });
        if hit {
            saw_match = true;
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }

    if !saw_match {
        // Dump everything we captured to aid debugging the timeout.
        let guard = captured
            .lock()
            .expect("captured results mutex poisoned at end of test");
        eprintln!("captured debounced results: {guard:?}");
    }
    assert!(saw_match, "debounced search did not emit expected result");
}
#[test]
fn debounced_search_manager_backspace_updates_results() {
    // Two files share the "al" prefix; beta.rs should never match.
    let temp_dir = tempfile::tempdir().unwrap();
    let src_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&src_dir).unwrap();
    std::fs::write(src_dir.join("alpha.rs"), "fn alpha() {}").unwrap();
    std::fs::write(src_dir.join("alpine.rs"), "fn alpine() {}").unwrap();
    std::fs::write(src_dir.join("beta.rs"), "fn beta() {}").unwrap();

    // Record every callback emission for later inspection.
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            sink.lock()
                .expect("captured results mutex poisoned")
                .push((query, results, running));
        },
    );

    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        NonZeroUsize::new(10).unwrap(),
        NonZeroUsize::new(2).unwrap(),
        false,
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );

    // A narrow query should surface only alpha.rs.
    manager.on_query("alph".to_string());
    let alpha_paths =
        wait_for_query_paths_matching(&captured, "alph", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
        })
        .expect("timed out waiting for alph results");
    assert_eq!(
        alpha_paths,
        vec!["src/alpha.rs".to_string()],
        "expected only alpha for query 'alph'"
    );

    // Simulated backspace widens the query; both alp* files should appear.
    manager.on_query("al".to_string());
    let mut after_backspace =
        wait_for_query_paths_matching(&captured, "al", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
                && paths.iter().any(|path| path.ends_with("alpine.rs"))
        })
        .expect("timed out waiting for backspace results");
    after_backspace.sort();
    assert_eq!(
        after_backspace,
        vec!["src/alpha.rs".to_string(), "src/alpine.rs".to_string()],
        "expected backspace to include both alpha and alpine"
    );
}
fn wait_for_query_paths_matching(
captured: &Arc<Mutex<Vec<(String, FileSearchResults, bool)>>>,
query: &str,
timeout: Duration,
predicate: impl Fn(&[String]) -> bool,
) -> Option<Vec<String>> {
let start = Instant::now();
while start.elapsed() < timeout {
{
let guard = captured
.lock()
.expect("captured results mutex poisoned while reading");
if let Some((_, results, _)) = guard.iter().rev().find(|(q, _, _)| q == query) {
let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();
if predicate(&paths) {
return Some(paths);
}
}
}
thread::sleep(Duration::from_millis(10));
}
None
}

View File

@@ -0,0 +1,245 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_file_search::search_manager::SearchItem;
use codex_file_search::search_manager::SearchManager;
/// Feeds one relative path into the nucleo injector, mirroring how the
/// walker thread publishes entries (the path fills match column 0).
fn push(injector: &nucleo::Injector<SearchItem>, path: &str) {
    let item = SearchItem {
        path: path.to_string(),
    };
    injector.push(item, |item, columns| {
        columns[0] = item.path.as_str().into();
    });
}
#[test]
fn search_manager_streams_results() {
    let temp_dir = tempfile::tempdir().unwrap();

    // Count notify invocations so we can assert the manager signaled progress.
    let tick_counter = Arc::new(AtomicUsize::new(0));
    let counter_for_notify = Arc::clone(&tick_counter);
    let notify = Arc::new(move || {
        counter_for_notify.fetch_add(1, Ordering::Relaxed);
    });

    // Walking disabled (`false`): items are injected manually below.
    let mut manager = SearchManager::new(
        "g",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(2).unwrap(),
        false,
        notify,
    )
    .unwrap();
    let injector = manager.injector();

    // A path without the query letter must produce no matches.
    push(&injector, "alpha.txt");
    manager.tick(Duration::from_millis(10));
    assert!(manager.current_results().matches.is_empty());

    // Inject a matching path, then tick until the matcher settles.
    push(&injector, "subdir/gamma.rs");
    for _ in 0..50 {
        if !manager.tick(Duration::from_millis(10)).running {
            break;
        }
    }

    let paths: Vec<_> = manager
        .current_results()
        .matches
        .iter()
        .map(|m| m.path.clone())
        .collect();
    assert!(
        paths.iter().any(|p| p.ends_with("gamma.rs")),
        "expected to find gamma.rs; results={:?}",
        paths
    );
    assert!(tick_counter.load(Ordering::Relaxed) >= 2);
}
#[test]
fn search_manager_walk_finds_files() {
    // Real files on disk: the walker thread must discover them itself.
    let temp_dir = tempfile::tempdir().unwrap();
    let nested_dir = temp_dir.path().join("subdir");
    std::fs::create_dir_all(&nested_dir).unwrap();
    std::fs::write(nested_dir.join("gamma.rs"), "fn main() {}").unwrap();
    std::fs::write(temp_dir.path().join("alpha.txt"), "alpha").unwrap();

    let notify_flag = Arc::new(AtomicUsize::new(0));
    let counter_for_notify = Arc::clone(&notify_flag);
    let notify = Arc::new(move || {
        counter_for_notify.fetch_add(1, Ordering::Relaxed);
    });

    // `true` enables the filesystem walk.
    let mut manager = SearchManager::new(
        "gam",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        true,
        notify,
    )
    .unwrap();

    let start = std::time::Instant::now();
    let mut found = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain pending notifications; only the reset side effect matters.
        let _ = notify_flag.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path.ends_with("gamma.rs"))
        {
            found = true;
            break;
        }
        // Grant a one-second grace period after the matcher idles (the
        // walker may still be injecting), then hard-stop at five seconds.
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found,
        "expected walker to find gamma.rs; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
#[test]
fn search_manager_walk_includes_directories() {
    // Only directories exist here; the walker should still surface "docs/".
    let temp_dir = tempfile::tempdir().unwrap();
    let docs_dir = temp_dir.path().join("docs");
    let nested_dir = docs_dir.join("nested");
    std::fs::create_dir_all(&nested_dir).unwrap();

    let notify_flag = Arc::new(AtomicUsize::new(0));
    let counter_for_notify = Arc::clone(&notify_flag);
    let notify = Arc::new(move || {
        counter_for_notify.fetch_add(1, Ordering::Relaxed);
    });

    let mut manager = SearchManager::new(
        "docs",
        NonZeroUsize::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        false,
        notify,
    )
    .unwrap();

    let start = std::time::Instant::now();
    let mut found_directory = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain pending notifications; only the reset side effect matters.
        let _ = notify_flag.swap(0, Ordering::AcqRel);
        // Directory entries carry a trailing slash.
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "docs/")
        {
            found_directory = true;
            break;
        }
        // One-second grace period after the matcher idles, five-second cap.
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found_directory,
        "expected walker to find directory matches; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
#[test]
#[cfg(unix)]
fn search_manager_walk_follows_symlinked_directories() {
    // root/link -> target; the walker should traverse through the symlink.
    let temp_dir = tempfile::tempdir().unwrap();
    let root_dir = temp_dir.path().join("root");
    std::fs::create_dir_all(&root_dir).unwrap();
    let target_dir = temp_dir.path().join("target");
    std::fs::create_dir_all(&target_dir).unwrap();
    std::fs::write(target_dir.join("gamma.rs"), "fn main() {}").unwrap();
    std::os::unix::fs::symlink(&target_dir, root_dir.join("link")).unwrap();

    let notify_flag = Arc::new(AtomicUsize::new(0));
    let counter_for_notify = Arc::clone(&notify_flag);
    let notify = Arc::new(move || {
        counter_for_notify.fetch_add(1, Ordering::Relaxed);
    });

    let mut manager = SearchManager::new(
        "gamma",
        NonZeroUsize::new(10).unwrap(),
        &root_dir,
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        true,
        notify,
    )
    .unwrap();

    let start = std::time::Instant::now();
    let mut found = false;
    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain pending notifications; only the reset side effect matters.
        let _ = notify_flag.swap(0, Ordering::AcqRel);
        // The match is reported relative to root, through the symlink.
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "link/gamma.rs")
        {
            found = true;
            break;
        }
        // One-second grace period after the matcher idles, five-second cap.
        if !status.running && start.elapsed() > Duration::from_secs(1) {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found,
        "expected walker to find symlinked file; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}