mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Compare commits
2 Commits
codex/viya
...
dh--file-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a91e974efb | ||
|
|
1d67af9f04 |
33
codex-rs/Cargo.lock
generated
33
codex-rs/Cargo.lock
generated
@@ -1560,10 +1560,12 @@ dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"ignore",
|
||||
"nucleo",
|
||||
"nucleo-matcher",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -4758,6 +4760,17 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nucleo"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5262af4c94921c2646c5ac6ff7900c2af9cbb08dc26a797e18130a7019c039d4"
|
||||
dependencies = [
|
||||
"nucleo-matcher",
|
||||
"parking_lot",
|
||||
"rayon",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nucleo-matcher"
|
||||
version = "0.3.1"
|
||||
@@ -6125,6 +6138,26 @@ dependencies = [
|
||||
"ratatui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.15"
|
||||
|
||||
@@ -331,6 +331,10 @@ client_request_definitions! {
|
||||
params: FuzzyFileSearchParams,
|
||||
response: FuzzyFileSearchResponse,
|
||||
},
|
||||
FindFilesStream => "findFilesStream" {
|
||||
params: FindFilesStreamParams,
|
||||
response: FindFilesStreamResponse,
|
||||
},
|
||||
/// Execute a command (argv vector) under the server's sandbox.
|
||||
ExecOneOffCommand {
|
||||
params: v1::ExecOneOffCommandParams,
|
||||
@@ -570,6 +574,34 @@ pub struct FuzzyFileSearchResponse {
|
||||
pub files: Vec<FuzzyFileSearchResult>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
pub struct FindFilesStreamParams {
|
||||
pub query: String,
|
||||
pub roots: Vec<String>,
|
||||
// if provided, will cancel any previous request that used the same value
|
||||
pub cancellation_token: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
pub struct FindFilesStreamResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
pub struct FindFilesStreamChunkNotification {
|
||||
pub request_id: RequestId,
|
||||
pub query: String,
|
||||
pub files: Vec<FuzzyFileSearchResult>,
|
||||
pub total_match_count: usize,
|
||||
pub chunk_index: usize,
|
||||
pub chunk_count: usize,
|
||||
pub running: bool,
|
||||
}
|
||||
|
||||
server_notification_definitions! {
|
||||
/// NEW NOTIFICATIONS
|
||||
Error => "error" (v2::ErrorNotification),
|
||||
@@ -597,6 +629,7 @@ server_notification_definitions! {
|
||||
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
|
||||
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
|
||||
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
|
||||
FindFilesStreamChunk => "findFilesStream/chunk" (FindFilesStreamChunkNotification),
|
||||
|
||||
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
|
||||
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::bespoke_event_handling::apply_bespoke_event_handling;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::find_files_stream::FindFilesStreamSession;
|
||||
use crate::fuzzy_file_search::run_fuzzy_file_search;
|
||||
use crate::models::supported_models;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
@@ -35,6 +36,9 @@ use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
|
||||
use codex_app_server_protocol::ExecOneOffCommandResponse;
|
||||
use codex_app_server_protocol::FeedbackUploadParams;
|
||||
use codex_app_server_protocol::FeedbackUploadResponse;
|
||||
use codex_app_server_protocol::FindFilesStreamChunkNotification;
|
||||
use codex_app_server_protocol::FindFilesStreamParams;
|
||||
use codex_app_server_protocol::FindFilesStreamResponse;
|
||||
use codex_app_server_protocol::ForkConversationParams;
|
||||
use codex_app_server_protocol::ForkConversationResponse;
|
||||
use codex_app_server_protocol::FuzzyFileSearchParams;
|
||||
@@ -257,6 +261,7 @@ pub(crate) struct CodexMessageProcessor {
|
||||
pending_rollbacks: PendingRollbacks,
|
||||
turn_summary_store: TurnSummaryStore,
|
||||
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
|
||||
pending_find_files_streams: Arc<Mutex<HashMap<FindFilesStreamKey, FindFilesStreamSession>>>,
|
||||
feedback: CodexFeedback,
|
||||
}
|
||||
|
||||
@@ -266,6 +271,12 @@ pub(crate) enum ApiVersion {
|
||||
V2,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||
enum FindFilesStreamKey {
|
||||
Token(String),
|
||||
Request(RequestId),
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
async fn load_thread(
|
||||
&self,
|
||||
@@ -313,6 +324,7 @@ impl CodexMessageProcessor {
|
||||
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
|
||||
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
|
||||
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
|
||||
pending_find_files_streams: Arc::new(Mutex::new(HashMap::new())),
|
||||
feedback,
|
||||
}
|
||||
}
|
||||
@@ -560,6 +572,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::FuzzyFileSearch { request_id, params } => {
|
||||
self.fuzzy_file_search(request_id, params).await;
|
||||
}
|
||||
ClientRequest::FindFilesStream { request_id, params } => {
|
||||
self.find_files_stream(request_id, params).await;
|
||||
}
|
||||
ClientRequest::OneOffCommandExec { request_id, params } => {
|
||||
self.exec_one_off_command(request_id, params).await;
|
||||
}
|
||||
@@ -4232,6 +4247,80 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn find_files_stream(&mut self, request_id: RequestId, params: FindFilesStreamParams) {
|
||||
let FindFilesStreamParams {
|
||||
query,
|
||||
roots,
|
||||
cancellation_token,
|
||||
} = params;
|
||||
|
||||
if query.is_empty() || roots.is_empty() {
|
||||
if let Some(token) = &cancellation_token {
|
||||
let key = FindFilesStreamKey::Token(token.clone());
|
||||
let mut streams = self.pending_find_files_streams.lock().await;
|
||||
if let Some(stream) = streams.remove(&key) {
|
||||
stream.cancel();
|
||||
}
|
||||
}
|
||||
self.send_find_files_stream_empty(request_id.clone(), query)
|
||||
.await;
|
||||
self.outgoing
|
||||
.send_response(request_id, FindFilesStreamResponse {})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let key = cancellation_token
|
||||
.clone()
|
||||
.map(FindFilesStreamKey::Token)
|
||||
.unwrap_or(FindFilesStreamKey::Request(request_id.clone()));
|
||||
let mut streams = self.pending_find_files_streams.lock().await;
|
||||
if let Some(stream) = streams.get(&key) {
|
||||
if stream.roots() == roots {
|
||||
stream.update_request_id(request_id.clone());
|
||||
stream.on_query(query);
|
||||
self.outgoing
|
||||
.send_response(request_id, FindFilesStreamResponse {})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
stream.cancel();
|
||||
streams.remove(&key);
|
||||
}
|
||||
|
||||
let (stream, done_rx) =
|
||||
FindFilesStreamSession::new(roots, request_id.clone(), self.outgoing.clone());
|
||||
stream.on_query(query);
|
||||
streams.insert(key.clone(), stream);
|
||||
drop(streams);
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, FindFilesStreamResponse {})
|
||||
.await;
|
||||
|
||||
let streams = Arc::clone(&self.pending_find_files_streams);
|
||||
tokio::spawn(async move {
|
||||
let _ = done_rx.await;
|
||||
let mut streams = streams.lock().await;
|
||||
streams.remove(&key);
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_find_files_stream_empty(&self, request_id: RequestId, query: String) {
|
||||
let notification = FindFilesStreamChunkNotification {
|
||||
request_id,
|
||||
query,
|
||||
files: Vec::new(),
|
||||
total_match_count: 0,
|
||||
chunk_index: 0,
|
||||
chunk_count: 1,
|
||||
running: false,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::FindFilesStreamChunk(notification))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
|
||||
if !self.config.feedback_enabled {
|
||||
let error = JSONRPCErrorError {
|
||||
|
||||
297
codex-rs/app-server/src/find_files_stream.rs
Normal file
297
codex-rs/app-server/src/find_files_stream.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_app_server_protocol::FindFilesStreamChunkNotification;
|
||||
use codex_app_server_protocol::FuzzyFileSearchResult;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_file_search as file_search;
|
||||
use codex_file_search::FileSearchResults;
|
||||
use codex_file_search::search_manager::DebounceConfig;
|
||||
use codex_file_search::search_manager::DebouncedSearchManager;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
|
||||
const LIMIT_PER_ROOT: usize = 50;
|
||||
const MAX_THREADS: usize = 12;
|
||||
const CHUNK_SIZE: usize = 100;
|
||||
const COMPUTE_INDICES: bool = true;
|
||||
|
||||
type SearchCallback = dyn Fn(String, FileSearchResults, bool) + Send + Sync + 'static;
|
||||
|
||||
pub(crate) enum FindFilesStreamUpdate {
|
||||
Cancel,
|
||||
Results {
|
||||
root: String,
|
||||
query: String,
|
||||
results: FileSearchResults,
|
||||
running: bool,
|
||||
},
|
||||
}
|
||||
|
||||
struct RootSearchState {
|
||||
results: FileSearchResults,
|
||||
running: bool,
|
||||
seen: bool,
|
||||
}
|
||||
|
||||
pub(crate) struct FindFilesStreamSession {
|
||||
roots: Vec<String>,
|
||||
managers: Vec<DebouncedSearchManager<Box<SearchCallback>>>,
|
||||
request_id: Arc<Mutex<RequestId>>,
|
||||
cancel_flag: Arc<AtomicBool>,
|
||||
update_tx: mpsc::UnboundedSender<FindFilesStreamUpdate>,
|
||||
}
|
||||
|
||||
impl FindFilesStreamSession {
|
||||
pub(crate) fn new(
|
||||
roots: Vec<String>,
|
||||
request_id: RequestId,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
) -> (Self, oneshot::Receiver<()>) {
|
||||
let limit_per_root = NonZeroUsize::new(LIMIT_PER_ROOT).unwrap_or(NonZeroUsize::MIN);
|
||||
let threads_per_root = threads_per_root(roots.len());
|
||||
let request_id = Arc::new(Mutex::new(request_id));
|
||||
let cancel_flag = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let (update_tx, update_rx) = mpsc::unbounded_channel();
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
|
||||
let mut managers = Vec::with_capacity(roots.len());
|
||||
for root in &roots {
|
||||
let root_path = PathBuf::from(root);
|
||||
let root_name = root.clone();
|
||||
let update_tx = update_tx.clone();
|
||||
let cancel_flag = Arc::clone(&cancel_flag);
|
||||
let callback: Box<SearchCallback> = Box::new(move |query, results, running| {
|
||||
if cancel_flag.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
if update_tx
|
||||
.send(FindFilesStreamUpdate::Results {
|
||||
root: root_name.clone(),
|
||||
query,
|
||||
results,
|
||||
running,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
warn!("find-files-stream update channel closed");
|
||||
}
|
||||
});
|
||||
|
||||
let manager = DebouncedSearchManager::new(
|
||||
root_path,
|
||||
limit_per_root,
|
||||
threads_per_root,
|
||||
COMPUTE_INDICES,
|
||||
Vec::new(),
|
||||
Arc::new(callback),
|
||||
DebounceConfig::default(),
|
||||
);
|
||||
managers.push(manager);
|
||||
}
|
||||
|
||||
let cancel_flag_for_task = Arc::clone(&cancel_flag);
|
||||
let request_id_for_task = Arc::clone(&request_id);
|
||||
let roots_for_task = roots.clone();
|
||||
tokio::spawn(async move {
|
||||
run_stream_task(
|
||||
roots_for_task,
|
||||
update_rx,
|
||||
request_id_for_task,
|
||||
outgoing,
|
||||
cancel_flag_for_task,
|
||||
)
|
||||
.await;
|
||||
let _ = done_tx.send(());
|
||||
});
|
||||
|
||||
(
|
||||
Self {
|
||||
roots,
|
||||
managers,
|
||||
request_id,
|
||||
cancel_flag,
|
||||
update_tx,
|
||||
},
|
||||
done_rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn roots(&self) -> &[String] {
|
||||
&self.roots
|
||||
}
|
||||
|
||||
pub(crate) fn update_request_id(&self, request_id: RequestId) {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let mut locked = self.request_id.lock().unwrap();
|
||||
*locked = request_id;
|
||||
}
|
||||
|
||||
pub(crate) fn on_query(&self, query: String) {
|
||||
for manager in &self.managers {
|
||||
manager.on_query(query.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cancel(&self) {
|
||||
self.cancel_flag.store(true, Ordering::Relaxed);
|
||||
let _ = self.update_tx.send(FindFilesStreamUpdate::Cancel);
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_stream_task(
|
||||
roots: Vec<String>,
|
||||
mut update_rx: mpsc::UnboundedReceiver<FindFilesStreamUpdate>,
|
||||
request_id: Arc<Mutex<RequestId>>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
cancel_flag: Arc<AtomicBool>,
|
||||
) {
|
||||
let mut root_states: HashMap<String, RootSearchState> = roots
|
||||
.iter()
|
||||
.map(|root| {
|
||||
(
|
||||
root.clone(),
|
||||
RootSearchState {
|
||||
results: FileSearchResults {
|
||||
matches: Vec::new(),
|
||||
total_match_count: 0,
|
||||
},
|
||||
running: false,
|
||||
seen: false,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let mut current_query = String::new();
|
||||
|
||||
while let Some(update) = update_rx.recv().await {
|
||||
match update {
|
||||
FindFilesStreamUpdate::Cancel => break,
|
||||
FindFilesStreamUpdate::Results {
|
||||
root,
|
||||
query,
|
||||
results,
|
||||
running,
|
||||
} => {
|
||||
if cancel_flag.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
if query != current_query {
|
||||
current_query.clear();
|
||||
current_query.push_str(&query);
|
||||
for state in root_states.values_mut() {
|
||||
state.results = FileSearchResults {
|
||||
matches: Vec::new(),
|
||||
total_match_count: 0,
|
||||
};
|
||||
state.running = false;
|
||||
state.seen = false;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(state) = root_states.get_mut(&root) {
|
||||
state.results = results;
|
||||
state.running = running;
|
||||
state.seen = true;
|
||||
} else {
|
||||
warn!("find-files-stream received update for unexpected root: {root}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut files = Vec::new();
|
||||
let mut total_match_count = 0usize;
|
||||
let mut any_running = false;
|
||||
let mut all_seen = true;
|
||||
for (root, state) in &root_states {
|
||||
if !state.seen {
|
||||
all_seen = false;
|
||||
}
|
||||
any_running |= state.running;
|
||||
total_match_count += state.results.total_match_count;
|
||||
for entry in &state.results.matches {
|
||||
files.push(FuzzyFileSearchResult {
|
||||
root: root.clone(),
|
||||
path: entry.path.clone(),
|
||||
file_name: file_search::file_name_from_path(&entry.path),
|
||||
score: entry.score,
|
||||
indices: entry.indices.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
|
||||
FuzzyFileSearchResult,
|
||||
_,
|
||||
_,
|
||||
>(|f| f.score, |f| f.path.as_str()));
|
||||
|
||||
let chunk_count = if files.is_empty() {
|
||||
1
|
||||
} else {
|
||||
(files.len() + CHUNK_SIZE - 1) / CHUNK_SIZE
|
||||
};
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let current_request_id = request_id.lock().unwrap().clone();
|
||||
if files.is_empty() {
|
||||
let notification = FindFilesStreamChunkNotification {
|
||||
request_id: current_request_id,
|
||||
query: current_query.clone(),
|
||||
files: Vec::new(),
|
||||
total_match_count,
|
||||
chunk_index: 0,
|
||||
chunk_count,
|
||||
running: any_running,
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::FindFilesStreamChunk(
|
||||
notification,
|
||||
))
|
||||
.await;
|
||||
} else {
|
||||
for (index, chunk) in files.chunks(CHUNK_SIZE).enumerate() {
|
||||
let notification = FindFilesStreamChunkNotification {
|
||||
request_id: current_request_id.clone(),
|
||||
query: current_query.clone(),
|
||||
files: chunk.to_vec(),
|
||||
total_match_count,
|
||||
chunk_index: index,
|
||||
chunk_count,
|
||||
running: any_running,
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::FindFilesStreamChunk(
|
||||
notification,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if all_seen && !any_running {
|
||||
cancel_flag.store(true, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn threads_per_root(roots_len: usize) -> NonZeroUsize {
|
||||
let cores = std::thread::available_parallelism()
|
||||
.map(std::num::NonZero::get)
|
||||
.unwrap_or(1);
|
||||
let threads = cores.min(MAX_THREADS);
|
||||
let threads_per_root = (threads / roots_len.max(1)).max(1);
|
||||
NonZeroUsize::new(threads_per_root).unwrap_or(NonZeroUsize::MIN)
|
||||
}
|
||||
@@ -42,6 +42,7 @@ mod codex_message_processor;
|
||||
mod config_api;
|
||||
mod dynamic_tools;
|
||||
mod error_code;
|
||||
mod find_files_stream;
|
||||
mod fuzzy_file_search;
|
||||
mod message_processor;
|
||||
mod models;
|
||||
|
||||
@@ -573,6 +573,23 @@ impl McpProcess {
|
||||
self.send_request("fuzzyFileSearch", Some(params)).await
|
||||
}
|
||||
|
||||
/// Send a `findFilesStream` JSON-RPC request.
|
||||
pub async fn send_find_files_stream_request(
|
||||
&mut self,
|
||||
query: &str,
|
||||
roots: Vec<String>,
|
||||
cancellation_token: Option<String>,
|
||||
) -> anyhow::Result<i64> {
|
||||
let mut params = serde_json::json!({
|
||||
"query": query,
|
||||
"roots": roots,
|
||||
});
|
||||
if let Some(token) = cancellation_token {
|
||||
params["cancellationToken"] = serde_json::json!(token);
|
||||
}
|
||||
self.send_request("findFilesStream", Some(params)).await
|
||||
}
|
||||
|
||||
async fn send_request(
|
||||
&mut self,
|
||||
method: &str,
|
||||
|
||||
441
codex-rs/app-server/tests/suite/find_file_stream.rs
Normal file
441
codex-rs/app-server/tests/suite/find_file_stream.rs
Normal file
@@ -0,0 +1,441 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use app_test_support::McpProcess;
|
||||
use codex_app_server_protocol::FindFilesStreamChunkNotification;
|
||||
use codex_app_server_protocol::FindFilesStreamResponse;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const CHUNK_METHOD: &str = "findFilesStream/chunk";
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_single_root_single_match() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let root_path = root.path().to_string_lossy().to_string();
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request("alp", vec![root_path.clone()], None)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let files = flatten_files(&chunks);
|
||||
|
||||
assert_eq!(files.len(), 1, "files={files:?}");
|
||||
assert_eq!(files[0].root, root_path);
|
||||
assert_eq!(files[0].path, "alpha.rs");
|
||||
assert_eq!(files[0].file_name, "alpha.rs");
|
||||
assert!(files[0].indices.is_some(), "expected indices for match");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_empty_query_emits_single_empty_chunk() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request("", vec![root.path().to_string_lossy().to_string()], None)
|
||||
.await?;
|
||||
|
||||
let response = read_response(&mut mcp, request_id).await?;
|
||||
let parsed: FindFilesStreamResponse = serde_json::from_value(response.result)?;
|
||||
assert_eq!(parsed, FindFilesStreamResponse {});
|
||||
|
||||
let (chunks, mismatched_count) = collect_chunks_until_complete(&mut mcp, request_id).await?;
|
||||
assert_eq!(mismatched_count, 0, "unexpected mismatched notifications");
|
||||
assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
|
||||
let chunk = &chunks[0];
|
||||
assert_eq!(chunk.files.len(), 0);
|
||||
assert_eq!(chunk.total_match_count, 0);
|
||||
assert_eq!(chunk.chunk_index, 0);
|
||||
assert_eq!(chunk.chunk_count, 1);
|
||||
assert!(!chunk.running);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_empty_roots_emits_single_empty_chunk() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request("alp", Vec::new(), None)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
assert_eq!(chunks.len(), 1, "chunks={chunks:?}");
|
||||
let chunk = &chunks[0];
|
||||
assert_eq!(chunk.files.len(), 0);
|
||||
assert_eq!(chunk.total_match_count, 0);
|
||||
assert_eq!(chunk.chunk_count, 1);
|
||||
assert!(!chunk.running);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_no_matches_returns_empty_files() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request(
|
||||
"zzz",
|
||||
vec![root.path().to_string_lossy().to_string()],
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let chunk = chunks
|
||||
.iter()
|
||||
.find(|chunk| chunk.chunk_index == 0)
|
||||
.ok_or_else(|| anyhow!("missing chunk 0"))?;
|
||||
|
||||
assert_eq!(chunk.files.len(), 0);
|
||||
assert_eq!(chunk.total_match_count, 0);
|
||||
assert!(!chunk.running);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_merges_results_across_roots() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root_a = TempDir::new()?;
|
||||
let root_b = TempDir::new()?;
|
||||
|
||||
std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
std::fs::write(root_b.path().join("alpine.rs"), "fn alpine() {}")?;
|
||||
std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let root_a_path = root_a.path().to_string_lossy().to_string();
|
||||
let root_b_path = root_b.path().to_string_lossy().to_string();
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request("alp", vec![root_a_path.clone(), root_b_path.clone()], None)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let files = flatten_files(&chunks);
|
||||
|
||||
let observed: BTreeSet<(String, String)> = files
|
||||
.into_iter()
|
||||
.map(|file| (file.root, file.path))
|
||||
.collect();
|
||||
let expected: BTreeSet<(String, String)> = BTreeSet::from([
|
||||
(root_a_path, "alpha.rs".to_string()),
|
||||
(root_b_path, "alpine.rs".to_string()),
|
||||
]);
|
||||
|
||||
assert_eq!(observed, expected);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_same_token_updates_request_id_and_query() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
std::fs::write(root.path().join("beta.rs"), "fn beta() {}")?;
|
||||
|
||||
// Create enough extra files to keep the stream active while we issue a follow-up query.
|
||||
write_matching_files(root.path(), "alpha-extra", 150)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let root_path = root.path().to_string_lossy().to_string();
|
||||
let token = "shared-token".to_string();
|
||||
|
||||
let first_request_id = mcp
|
||||
.send_find_files_stream_request("alp", vec![root_path.clone()], Some(token.clone()))
|
||||
.await?;
|
||||
let _first_response = read_response(&mut mcp, first_request_id).await?;
|
||||
|
||||
let second_request_id = mcp
|
||||
.send_find_files_stream_request("bet", vec![root_path.clone()], Some(token))
|
||||
.await?;
|
||||
|
||||
let (chunks, _mismatched_count) =
|
||||
collect_chunks_until_complete(&mut mcp, second_request_id).await?;
|
||||
assert_eq!(
|
||||
chunks[0].request_id,
|
||||
RequestId::Integer(second_request_id),
|
||||
"expected notifications to adopt latest request id"
|
||||
);
|
||||
assert_eq!(chunks[0].query, "bet");
|
||||
|
||||
let files = flatten_files(&chunks);
|
||||
assert!(files.iter().any(|file| file.path == "beta.rs"));
|
||||
assert!(
|
||||
chunks
|
||||
.iter()
|
||||
.all(|chunk| chunk.request_id == RequestId::Integer(second_request_id))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_same_token_with_different_roots_cancels_old_stream() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root_a = TempDir::new()?;
|
||||
let root_b = TempDir::new()?;
|
||||
|
||||
std::fs::write(root_a.path().join("alpha.rs"), "fn alpha() {}")?;
|
||||
std::fs::write(root_b.path().join("beta.rs"), "fn beta() {}")?;
|
||||
|
||||
write_matching_files(root_a.path(), "alpha-extra", 120)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let token = "root-swap-token".to_string();
|
||||
let root_a_path = root_a.path().to_string_lossy().to_string();
|
||||
let root_b_path = root_b.path().to_string_lossy().to_string();
|
||||
|
||||
let first_request_id = mcp
|
||||
.send_find_files_stream_request("alp", vec![root_a_path], Some(token.clone()))
|
||||
.await?;
|
||||
let _first_response = read_response(&mut mcp, first_request_id).await?;
|
||||
|
||||
let second_request_id = mcp
|
||||
.send_find_files_stream_request("alp", vec![root_b_path.clone()], Some(token))
|
||||
.await?;
|
||||
|
||||
let (chunks, _mismatched_count) =
|
||||
collect_chunks_until_complete(&mut mcp, second_request_id).await?;
|
||||
|
||||
let files = flatten_files(&chunks);
|
||||
assert!(files.iter().all(|file| file.root == root_b_path));
|
||||
assert!(files.iter().all(|file| file.path != "alpha.rs"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_enforces_limit_per_root() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
write_matching_files(root.path(), "limit-case", 60)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request(
|
||||
"limit-case",
|
||||
vec![root.path().to_string_lossy().to_string()],
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let files = flatten_files(&chunks);
|
||||
|
||||
assert_eq!(
|
||||
files.len(),
|
||||
50,
|
||||
"expected limit-per-root to cap emitted matches"
|
||||
);
|
||||
assert!(
|
||||
chunks[0].total_match_count >= 60,
|
||||
"expected total match count to reflect all matches"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_chunks_results_when_over_chunk_size() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root_a = TempDir::new()?;
|
||||
let root_b = TempDir::new()?;
|
||||
let root_c = TempDir::new()?;
|
||||
|
||||
write_matching_files(root_a.path(), "chunk-case", 55)?;
|
||||
write_matching_files(root_b.path(), "chunk-case", 55)?;
|
||||
write_matching_files(root_c.path(), "chunk-case", 55)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request(
|
||||
"chunk-case",
|
||||
vec![
|
||||
root_a.path().to_string_lossy().to_string(),
|
||||
root_b.path().to_string_lossy().to_string(),
|
||||
root_c.path().to_string_lossy().to_string(),
|
||||
],
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let chunk_indices: BTreeSet<usize> = chunks.iter().map(|chunk| chunk.chunk_index).collect();
|
||||
|
||||
assert_eq!(chunks[0].chunk_count, 2);
|
||||
assert_eq!(chunk_indices, BTreeSet::from([0, 1]));
|
||||
assert_eq!(flatten_files(&chunks).len(), 150);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn find_files_stream_emits_sorted_unique_indices() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("abcde.rs"), "fn main() {}")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_find_files_stream_request(
|
||||
"ace",
|
||||
vec![root.path().to_string_lossy().to_string()],
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chunks = collect_final_chunks(&mut mcp, request_id).await?;
|
||||
let files = flatten_files(&chunks);
|
||||
|
||||
assert_eq!(files.len(), 1, "files={files:?}");
|
||||
let indices = files[0]
|
||||
.indices
|
||||
.clone()
|
||||
.ok_or_else(|| anyhow!("missing indices"))?;
|
||||
assert_eq!(indices, vec![0, 2, 4]);
|
||||
assert!(is_sorted_unique(&indices));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn collect_final_chunks(
|
||||
mcp: &mut McpProcess,
|
||||
request_id: i64,
|
||||
) -> anyhow::Result<Vec<FindFilesStreamChunkNotification>> {
|
||||
let _response = read_response(mcp, request_id).await?;
|
||||
let (chunks, mismatched_count) = collect_chunks_until_complete(mcp, request_id).await?;
|
||||
if mismatched_count != 0 {
|
||||
anyhow::bail!("saw {mismatched_count} notifications for other request ids");
|
||||
}
|
||||
Ok(chunks)
|
||||
}
|
||||
|
||||
async fn read_response(mcp: &mut McpProcess, request_id: i64) -> anyhow::Result<JSONRPCResponse> {
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
||||
/// Reads chunk notifications for `request_id` until a complete, final set has
/// arrived.
///
/// The stream may restart mid-flight (a new query string or a new chunk
/// count); when that happens the partially collected set is discarded and
/// accumulation starts over. Notifications for other request ids are counted
/// and skipped rather than treated as errors.
///
/// Returns the final chunks ordered by `chunk_index` together with the number
/// of mismatched-request-id notifications observed along the way.
async fn collect_chunks_until_complete(
    mcp: &mut McpProcess,
    request_id: i64,
) -> anyhow::Result<(Vec<FindFilesStreamChunkNotification>, usize)> {
    let mut latest_query = String::new();
    let mut latest_chunk_count = 0usize;
    // Keyed by chunk_index so into_values() yields chunks in index order.
    let mut latest_chunks = std::collections::BTreeMap::new();
    let mut mismatched_count = 0usize;

    loop {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message(CHUNK_METHOD),
        )
        .await??;
        let chunk = parse_chunk(notification)?;

        // Chunks that belong to a different in-flight request are skipped.
        if chunk.request_id != RequestId::Integer(request_id) {
            mismatched_count += 1;
            continue;
        }

        // A changed query or chunk count marks a new result generation; drop
        // everything accumulated for the previous one.
        if chunk.query != latest_query || chunk.chunk_count != latest_chunk_count {
            latest_query.clear();
            latest_query.push_str(&chunk.query);
            latest_chunk_count = chunk.chunk_count;
            latest_chunks.clear();
        }

        latest_chunks.insert(chunk.chunk_index, chunk.clone());

        // Done once the stream reports not-running and every expected chunk
        // of the current generation has been received.
        if !chunk.running && latest_chunks.len() == latest_chunk_count {
            let chunks = latest_chunks.into_values().collect();
            return Ok((chunks, mismatched_count));
        }
    }
}
|
||||
|
||||
fn parse_chunk(
|
||||
notification: JSONRPCNotification,
|
||||
) -> anyhow::Result<FindFilesStreamChunkNotification> {
|
||||
let params = notification
|
||||
.params
|
||||
.ok_or_else(|| anyhow!("notification missing params"))?;
|
||||
let chunk = serde_json::from_value::<FindFilesStreamChunkNotification>(params)?;
|
||||
Ok(chunk)
|
||||
}
|
||||
|
||||
fn flatten_files(
|
||||
chunks: &[FindFilesStreamChunkNotification],
|
||||
) -> Vec<codex_app_server_protocol::FuzzyFileSearchResult> {
|
||||
let mut files = Vec::new();
|
||||
for chunk in chunks {
|
||||
files.extend(chunk.files.clone());
|
||||
}
|
||||
files
|
||||
}
|
||||
|
||||
fn write_matching_files(root: &std::path::Path, prefix: &str, count: usize) -> Result<()> {
|
||||
for index in 0..count {
|
||||
let file_name = format!("{prefix}-{index:03}.rs");
|
||||
std::fs::write(root.join(file_name), "fn main() {}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true when `indices` is strictly increasing — i.e. sorted with no
/// duplicates. Empty and single-element slices trivially qualify.
fn is_sorted_unique(indices: &[u32]) -> bool {
    indices
        .iter()
        .zip(indices.iter().skip(1))
        .all(|(prev, next)| prev < next)
}
|
||||
@@ -3,6 +3,7 @@ mod auth;
|
||||
mod codex_message_processor_flow;
|
||||
mod config;
|
||||
mod create_thread;
|
||||
mod find_file_stream;
|
||||
mod fork_thread;
|
||||
mod fuzzy_file_search;
|
||||
mod interrupt;
|
||||
|
||||
@@ -16,6 +16,7 @@ path = "src/lib.rs"
|
||||
anyhow = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
ignore = { workspace = true }
|
||||
nucleo = "0.5.0"
|
||||
nucleo-matcher = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
@@ -23,3 +24,4 @@ tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = "3.13.0"
|
||||
|
||||
@@ -7,6 +7,7 @@ use nucleo_matcher::pattern::CaseMatching;
|
||||
use nucleo_matcher::pattern::Normalization;
|
||||
use nucleo_matcher::pattern::Pattern;
|
||||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::BinaryHeap;
|
||||
@@ -19,6 +20,7 @@ use std::sync::atomic::Ordering;
|
||||
use tokio::process::Command;
|
||||
|
||||
mod cli;
|
||||
pub mod search_manager;
|
||||
|
||||
pub use cli::Cli;
|
||||
|
||||
@@ -48,7 +50,7 @@ pub fn file_name_from_path(path: &str) -> String {
|
||||
.unwrap_or_else(|| path.to_string())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileSearchResults {
|
||||
pub matches: Vec<FileMatch>,
|
||||
pub total_match_count: usize,
|
||||
@@ -209,7 +211,7 @@ pub fn run(
|
||||
|
||||
Box::new(move |entry| {
|
||||
if let Some(path) = get_file_path(&entry, search_directory) {
|
||||
best_list.insert(path);
|
||||
best_list.insert(path.as_ref());
|
||||
}
|
||||
|
||||
processed += 1;
|
||||
@@ -224,18 +226,30 @@ pub fn run(
|
||||
fn get_file_path<'a>(
|
||||
entry_result: &'a Result<ignore::DirEntry, ignore::Error>,
|
||||
search_directory: &std::path::Path,
|
||||
) -> Option<&'a str> {
|
||||
) -> Option<Cow<'a, str>> {
|
||||
let entry = match entry_result {
|
||||
Ok(e) => e,
|
||||
Err(_) => return None,
|
||||
};
|
||||
if entry.file_type().is_some_and(|ft| ft.is_dir()) {
|
||||
let path = entry.path();
|
||||
let rel_path = match path.strip_prefix(search_directory) {
|
||||
Ok(rel_path) => rel_path,
|
||||
Err(_) => path,
|
||||
};
|
||||
if rel_path.as_os_str().is_empty() {
|
||||
return None;
|
||||
}
|
||||
let path = entry.path();
|
||||
match path.strip_prefix(search_directory) {
|
||||
Ok(rel_path) => rel_path.to_str(),
|
||||
Err(_) => None,
|
||||
let path_str = rel_path.to_str()?;
|
||||
let is_dir = entry.file_type().is_some_and(|ft| ft.is_dir());
|
||||
if is_dir {
|
||||
let mut owned = String::with_capacity(path_str.len() + 1);
|
||||
owned.push_str(path_str);
|
||||
if !owned.ends_with('/') {
|
||||
owned.push('/');
|
||||
}
|
||||
Some(Cow::Owned(owned))
|
||||
} else {
|
||||
Some(Cow::Borrowed(path_str))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -453,4 +467,46 @@ mod tests {
|
||||
fn file_name_from_path_falls_back_to_full_path() {
|
||||
assert_eq!(file_name_from_path(""), "");
|
||||
}
|
||||
|
||||
/// Verifies that `run` surfaces directory entries — reported with a trailing
/// slash — alongside ordinary file matches.
#[test]
fn run_includes_directory_matches() {
    let temp_dir = tempfile::tempdir().unwrap();
    let docs_dir = temp_dir.path().join("docs");
    // A nested child ensures "docs" exists as a real directory to be walked.
    std::fs::create_dir_all(docs_dir.join("nested")).unwrap();

    let cancel_flag = Arc::new(AtomicBool::new(false));
    let results = run(
        "docs",
        NonZero::new(10).unwrap(),
        temp_dir.path(),
        Vec::new(),
        NonZero::new(1).unwrap(),
        cancel_flag,
        // NOTE(review): the two trailing positional flags are presumably
        // compute-indices and include-directories — confirm against `run`'s
        // signature before relying on this reading.
        false,
        true,
    )
    .unwrap();

    // Directory matches are expected to carry a trailing '/'.
    assert!(
        results.matches.iter().any(|m| m.path == "docs/"),
        "expected directory match; matches={:?}",
        results
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
|
||||
|
||||
/// Directory entries are injected with a trailing '/', so a plain "docs"
/// query (no slash) must still fuzzy-match the "docs/" haystack.
#[test]
fn pattern_matches_directory_without_trailing_slash_in_query() {
    let pattern = create_pattern("docs");
    // Utf32Str borrows a scratch buffer for its UTF-32 representation.
    let mut utf32buf = Vec::<char>::new();
    let haystack: Utf32Str<'_> = Utf32Str::new("docs/", &mut utf32buf);
    let mut matcher = Matcher::new(nucleo_matcher::Config::DEFAULT);
    assert!(
        pattern.score(haystack, &mut matcher).is_some(),
        "expected fuzzy pattern to match directory paths with trailing slash"
    );
}
|
||||
}
|
||||
|
||||
599
codex-rs/file-search/src/search_manager.rs
Normal file
599
codex-rs/file-search/src/search_manager.rs
Normal file
@@ -0,0 +1,599 @@
|
||||
use ignore::WalkBuilder;
|
||||
use ignore::overrides::OverrideBuilder;
|
||||
use nucleo::Config;
|
||||
use nucleo::Nucleo;
|
||||
use nucleo::Snapshot;
|
||||
use nucleo::Status;
|
||||
use nucleo_matcher::Matcher;
|
||||
use nucleo_matcher::pattern::CaseMatching;
|
||||
use nucleo_matcher::pattern::Normalization;
|
||||
use std::num::NonZero;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::thread::JoinHandle;
|
||||
use std::thread::{self};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::FileMatch;
|
||||
use crate::FileSearchResults;
|
||||
|
||||
/// A single candidate path fed into the fuzzy matcher.
#[derive(Debug, Clone)]
pub struct SearchItem {
    /// Path relative to the search root; directories carry a trailing '/'.
    pub path: String,
}

impl SearchItem {
    /// Wraps an already-relativized path string.
    fn new(relative_path: String) -> Self {
        SearchItem {
            path: relative_path,
        }
    }
}
|
||||
|
||||
/// Owns a live nucleo fuzzy-match session together with the background
/// directory walker that feeds it.
pub struct SearchManager {
    /// Incremental fuzzy matcher holding every injected path item.
    nucleo: Nucleo<SearchItem>,
    /// Set to true (via `cancel`) to ask the walker thread to stop early.
    cancel_flag: Arc<AtomicBool>,
    /// True while the walker thread is still injecting paths.
    walker_running: Arc<AtomicBool>,
    /// Join handle for the walker thread; taken and joined on drop.
    walk_handle: Option<JoinHandle<()>>,
    /// Maximum number of matches returned by `current_results`.
    limit: NonZero<usize>,
    /// Whether `current_results` also computes per-character match indices.
    compute_indices: bool,
    /// Scratch matcher used for scoring/indices in `current_results`.
    matcher: Mutex<Matcher>,
    /// Root directory being scanned; emitted paths are relative to it.
    search_directory: PathBuf,
    /// Case-matching mode applied on every pattern reparse.
    case_matching: CaseMatching,
    /// Unicode normalization mode applied on every pattern reparse.
    normalization: Normalization,
    /// Last pattern passed in; used to detect append-only pattern edits.
    current_pattern: String,
}
|
||||
|
||||
impl SearchManager {
    /// Starts a new search session: configures nucleo with `pattern` and
    /// spawns a walker thread over `search_directory` that feeds the matcher.
    ///
    /// `notify` is invoked by nucleo when new results may be available.
    /// Errors if the exclude patterns are invalid or the walker thread cannot
    /// be spawned.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pattern: &str,
        limit: NonZero<usize>,
        search_directory: &Path,
        exclude: Vec<String>,
        threads: NonZero<usize>,
        compute_indices: bool,
        notify: Arc<dyn Fn() + Sync + Send>,
    ) -> anyhow::Result<Self> {
        let search_directory_buf = search_directory.to_path_buf();
        let override_matcher = build_override_matcher(search_directory, exclude)?;
        let cancel_flag = Arc::new(AtomicBool::new(false));
        // Starts true; the walker's guard flips it to false when the walk ends.
        let walker_running = Arc::new(AtomicBool::new(true));
        let mut nucleo = Nucleo::new(
            Config::DEFAULT,
            notify,
            Some(threads.get()),
            1, // Single column containing the relative file path.
        );
        nucleo
            .pattern
            .reparse(0, pattern, CaseMatching::Smart, Normalization::Smart, false);
        let injector = nucleo.injector();
        let walk_handle = Some(spawn_walker(
            search_directory_buf.clone(),
            threads.get(),
            override_matcher,
            cancel_flag.clone(),
            walker_running.clone(),
            injector,
        )?);

        Ok(Self {
            nucleo,
            cancel_flag,
            walker_running,
            walk_handle,
            limit,
            compute_indices,
            matcher: Mutex::new(Matcher::new(nucleo_matcher::Config::DEFAULT)),
            search_directory: search_directory_buf,
            case_matching: CaseMatching::Smart,
            normalization: Normalization::Smart,
            current_pattern: pattern.to_string(),
        })
    }

    /// Replaces the active pattern. When the new pattern extends the previous
    /// one (a plain prefix check), nucleo is told it is an append so it can
    /// refine existing matches instead of rematching everything.
    pub fn update_pattern(&mut self, pattern: &str) {
        let append = pattern.starts_with(&self.current_pattern);
        self.nucleo
            .pattern
            .reparse(0, pattern, self.case_matching, self.normalization, append);
        self.current_pattern.clear();
        self.current_pattern.push_str(pattern);
    }

    /// Drives the matcher for up to `timeout`, returning nucleo's progress
    /// status. Durations beyond u64 milliseconds are clamped to u64::MAX.
    pub fn tick(&mut self, timeout: Duration) -> Status {
        let millis = timeout.as_millis();
        let timeout_ms = millis.try_into().unwrap_or(u64::MAX);
        self.nucleo.tick(timeout_ms)
    }

    /// Returns an injector for feeding additional items into the matcher.
    pub fn injector(&self) -> nucleo::Injector<SearchItem> {
        self.nucleo.injector()
    }

    /// Returns nucleo's current result snapshot.
    pub fn snapshot(&self) -> &Snapshot<SearchItem> {
        self.nucleo.snapshot()
    }

    /// Materializes up to `limit` matches from the current snapshot.
    ///
    /// With an empty pattern every item is reported with score 0 and no
    /// indices. Otherwise each item is rescored; when `compute_indices` is
    /// set, per-character highlight indices are computed, sorted, and deduped.
    /// Items that fail rescoring are silently skipped (`filter_map`).
    pub fn current_results(&self) -> FileSearchResults {
        let snapshot = self.nucleo.snapshot();
        let matched = snapshot.matched_item_count();
        let max_results = u32::try_from(self.limit.get()).unwrap_or(u32::MAX);
        let take = std::cmp::min(max_results, matched);
        let mut matcher = self.matcher.lock().expect("matcher mutex poisoned");
        let pattern = snapshot.pattern().column_pattern(0);
        let pattern_empty = pattern.atoms.is_empty();
        let compute_indices = self.compute_indices;

        let matches = snapshot
            .matched_items(0..take)
            .filter_map(|item| {
                let haystack = item.matcher_columns[0].slice(..);
                if pattern_empty {
                    Some(FileMatch {
                        score: 0,
                        path: item.data.path.clone(),
                        indices: None,
                    })
                } else if compute_indices {
                    let mut indices = Vec::new();
                    let score = pattern.indices(haystack, &mut matcher, &mut indices)?;
                    indices.sort_unstable();
                    indices.dedup();
                    Some(FileMatch {
                        score,
                        path: item.data.path.clone(),
                        indices: Some(indices),
                    })
                } else {
                    let score = pattern.score(haystack, &mut matcher)?;
                    Some(FileMatch {
                        score,
                        path: item.data.path.clone(),
                        indices: None,
                    })
                }
            })
            .collect();

        FileSearchResults {
            matches,
            total_match_count: matched as usize,
        }
    }

    /// Requests early termination of the walker thread.
    pub fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
    }

    /// True while the walker thread is still injecting paths.
    pub fn walker_running(&self) -> bool {
        self.walker_running.load(Ordering::Relaxed)
    }

    /// The root directory this session is scanning.
    pub fn search_directory(&self) -> &Path {
        &self.search_directory
    }
}
|
||||
|
||||
/// RAII guard that keeps a shared "walker is running" flag raised for its
/// lifetime.
struct WalkerRunningGuard {
    flag: Arc<AtomicBool>,
}

impl WalkerRunningGuard {
    /// Raises the flag immediately; it stays raised until the guard drops.
    fn new(flag: Arc<AtomicBool>) -> Self {
        let guard = Self { flag };
        guard.flag.store(true, Ordering::Relaxed);
        guard
    }
}

impl Drop for WalkerRunningGuard {
    /// Lowers the flag when the walker exits, including via panic unwind.
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Relaxed);
    }
}
|
||||
|
||||
impl Drop for SearchManager {
    /// Signals the walker thread to stop and waits for it to exit before the
    /// matcher is torn down.
    fn drop(&mut self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        if let Some(handle) = self.walk_handle.take() {
            // Join errors (a panicked walker) are ignored — we are already
            // tearing down.
            let _ = handle.join();
        }
    }
}
|
||||
|
||||
/// Timing knobs for [`DebouncedSearchManager`].
#[derive(Clone, Copy, Debug)]
pub struct DebounceConfig {
    /// How long to wait after a pattern change before starting a new search.
    pub debounce_delay: Duration,
    /// How often to poll for the previous search to finish before starting a new one.
    pub active_search_complete_poll_interval: Duration,
    /// How long each `tick` call should wait for new results.
    pub tick_timeout: Duration,
    /// Maximum time to wait for the first result before emitting a fallback update.
    pub first_result_timeout: Duration,
}

impl Default for DebounceConfig {
    /// Defaults tuned for keystroke-driven UI search.
    fn default() -> Self {
        let debounce_delay = Duration::from_millis(100);
        let active_search_complete_poll_interval = Duration::from_millis(20);
        let tick_timeout = Duration::from_millis(16);
        let first_result_timeout = Duration::from_millis(200);
        Self {
            debounce_delay,
            active_search_complete_poll_interval,
            tick_timeout,
            first_result_timeout,
        }
    }
}
|
||||
|
||||
/// Mutable state shared between [`DebouncedSearchManager`] and its worker
/// threads.
struct DebouncedSearchState {
    /// Most recent query seen by `on_query`; workers read this to pick up
    /// edits made while a search is running.
    latest_query: String,
    /// True while a debounce timer thread is pending, so at most one is live.
    is_search_scheduled: bool,
    /// The currently running search, if any.
    active_search: Option<ActiveDebouncedSearch>,
}

/// Identity of an in-flight search: its query plus a token to cancel it.
struct ActiveDebouncedSearch {
    query: String,
    cancellation_token: Arc<AtomicBool>,
}
|
||||
|
||||
/// RAII guard that clears `state.active_search` when a search worker exits —
/// but only if the slot still belongs to that worker.
struct ActiveDebouncedSearchGuard {
    state: Arc<Mutex<DebouncedSearchState>>,
    /// This worker's cancellation token, used as an identity marker.
    token: Arc<AtomicBool>,
}

impl ActiveDebouncedSearchGuard {
    fn new(state: Arc<Mutex<DebouncedSearchState>>, token: Arc<AtomicBool>) -> Self {
        Self { state, token }
    }
}

impl Drop for ActiveDebouncedSearchGuard {
    fn drop(&mut self) {
        #[expect(clippy::unwrap_used)]
        let mut state = self.state.lock().unwrap();
        // Only clear the slot if it still refers to this search (compared by
        // token pointer identity); a newer search may have replaced it.
        if let Some(active_search) = &state.active_search
            && Arc::ptr_eq(&active_search.cancellation_token, &self.token)
        {
            state.active_search = None;
        }
    }
}
|
||||
|
||||
/// Debounced wrapper over [`SearchManager`] suitable for UI-style incremental search.
///
/// This helper owns the debounce/cancellation logic for a stream of pattern
/// updates. Call [`DebouncedSearchManager::on_query`] for each new pattern; it
/// will start searches after a debounce delay and emit updates via the
/// provided callback whenever results change or progress is made.
pub struct DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    /// Shared state coordinating `on_query` with the worker threads.
    state: Arc<Mutex<DebouncedSearchState>>,
    /// Root directory every search scans.
    search_dir: PathBuf,
    /// Maximum number of matches per emission.
    limit: NonZero<usize>,
    /// Worker-thread count handed to each `SearchManager`.
    threads: NonZero<usize>,
    /// Whether emitted matches include per-character highlight indices.
    compute_indices: bool,
    /// Glob patterns excluded from every walk.
    exclude: Vec<String>,
    /// Invoked with (query, results, still_running) on every update.
    callback: Arc<C>,
    /// Debounce/polling timing configuration.
    config: DebounceConfig,
}
|
||||
|
||||
impl<C> DebouncedSearchManager<C>
where
    C: Fn(String, FileSearchResults, bool) + Send + Sync + 'static,
{
    /// Builds a manager with no search scheduled; nothing runs until
    /// [`Self::on_query`] is called.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        config: DebounceConfig,
    ) -> Self {
        Self {
            state: Arc::new(Mutex::new(DebouncedSearchState {
                latest_query: String::new(),
                is_search_scheduled: false,
                active_search: None,
            })),
            search_dir,
            limit,
            threads,
            compute_indices,
            exclude,
            callback,
            config,
        }
    }

    /// Call whenever the search pattern is updated.
    ///
    /// This method is cheap to call for each keystroke; a background worker
    /// will apply debouncing and cancellation.
    pub fn on_query(&self, query: String) {
        {
            #[expect(clippy::unwrap_used)]
            let mut state = self.state.lock().unwrap();
            // Duplicate queries are ignored outright.
            if query == state.latest_query {
                return;
            }

            state.latest_query.clear();
            state.latest_query.push_str(&query);

            // A non-prefix edit (e.g. backspace past the active query)
            // invalidates the running search; cancel it so a fresh one starts.
            if let Some(active_search) = &state.active_search
                && !query.starts_with(&active_search.query)
            {
                active_search
                    .cancellation_token
                    .store(true, Ordering::Relaxed);
            }

            // At most one debounce timer thread is pending at a time; it will
            // read latest_query when it fires.
            if state.is_search_scheduled {
                return;
            }

            state.is_search_scheduled = true;
        }

        let state = Arc::clone(&self.state);
        let search_dir = self.search_dir.clone();
        let limit = self.limit;
        let threads = self.threads;
        let compute_indices = self.compute_indices;
        let exclude = self.exclude.clone();
        let callback = Arc::clone(&self.callback);
        let config = self.config;

        thread::spawn(move || {
            // Always do a minimum debounce, but then poll until the active
            // search is cleared.
            thread::sleep(config.debounce_delay);
            loop {
                #[expect(clippy::unwrap_used)]
                if state.lock().unwrap().active_search.is_none() {
                    break;
                }
                thread::sleep(config.active_search_complete_poll_interval);
            }

            let cancellation_token = Arc::new(AtomicBool::new(false));
            let token = Arc::clone(&cancellation_token);
            // Claim the active-search slot under the lock, consuming whatever
            // query is latest at this moment.
            let query = {
                #[expect(clippy::unwrap_used)]
                let mut locked_state = state.lock().unwrap();
                let query = locked_state.latest_query.clone();
                locked_state.is_search_scheduled = false;
                locked_state.active_search = Some(ActiveDebouncedSearch {
                    query: query.clone(),
                    cancellation_token: token,
                });
                query
            };

            DebouncedSearchManager::spawn_search(
                query,
                search_dir,
                limit,
                threads,
                compute_indices,
                exclude,
                callback,
                cancellation_token,
                state,
                config,
            );
        });
    }

    /// Runs one search to completion on a dedicated thread, emitting callback
    /// updates as results evolve and picking up prefix-compatible query edits
    /// in-flight via `update_pattern`.
    #[allow(clippy::too_many_arguments)]
    fn spawn_search(
        query: String,
        search_dir: PathBuf,
        limit: NonZero<usize>,
        threads: NonZero<usize>,
        compute_indices: bool,
        exclude: Vec<String>,
        callback: Arc<C>,
        cancellation_token: Arc<AtomicBool>,
        search_state: Arc<Mutex<DebouncedSearchState>>,
        config: DebounceConfig,
    ) {
        thread::spawn(move || {
            // Clears the active-search slot on every exit path.
            let _guard = ActiveDebouncedSearchGuard::new(
                Arc::clone(&search_state),
                Arc::clone(&cancellation_token),
            );
            // Set by nucleo whenever new results may be available.
            let notify_flag = Arc::new(AtomicBool::new(false));
            let notify = {
                let flag = Arc::clone(&notify_flag);
                Arc::new(move || {
                    flag.store(true, Ordering::Release);
                })
            };

            let mut manager = match SearchManager::new(
                &query,
                limit,
                &search_dir,
                exclude,
                threads,
                compute_indices,
                notify,
            ) {
                Ok(manager) => manager,
                Err(err) => {
                    // Emit an empty result set so the caller can clear any
                    // stale results and surface the failure if desired.
                    let empty_results = FileSearchResults {
                        matches: Vec::new(),
                        total_match_count: 0,
                    };
                    callback(query.clone(), empty_results, false);
                    // NOTE(review): library code writing to stderr — consider
                    // routing this through the callback or a logger instead.
                    eprintln!("debounced file search initialization failed: {err:?}");
                    return;
                }
            };

            let mut last_sent_paths: Vec<String> = Vec::new();
            let mut last_sent_query: String = String::new();
            let mut current_query = query.clone();
            let mut sent_once = false;
            let mut last_sent_running = false;
            let start = std::time::Instant::now();
            let mut last_progress = start;

            loop {
                if cancellation_token.load(Ordering::Relaxed) {
                    manager.cancel();
                }

                // Pick up any query edits made since the last iteration.
                let latest_query = {
                    #[expect(clippy::unwrap_used)]
                    let state = search_state.lock().unwrap();
                    state.latest_query.clone()
                };
                if latest_query != current_query {
                    manager.update_pattern(&latest_query);
                    current_query = latest_query;

                    #[expect(clippy::unwrap_used)]
                    if let Some(active_search) = &mut search_state.lock().unwrap().active_search {
                        active_search.query.clear();
                        active_search.query.push_str(&current_query);
                    }
                }

                let status = manager.tick(config.tick_timeout);
                let flag_was_set = notify_flag.swap(false, Ordering::AcqRel);
                let results = manager.current_results();
                let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();

                let paths_changed = paths != last_sent_paths;
                let timeout_elapsed = start.elapsed() >= config.first_result_timeout;
                let walker_running = manager.walker_running();
                // "Running" from the UI's perspective: any sign of progress.
                let ui_running = walker_running || status.running || flag_was_set || status.changed;
                let running_changed = sent_once && last_sent_running && !ui_running;

                // Emit when something observable changed, or as a first
                // fallback update once the first-result timeout has elapsed.
                let should_emit = !cancellation_token.load(Ordering::Relaxed)
                    && (paths_changed
                        || current_query != last_sent_query
                        || running_changed
                        || (!sent_once && (flag_was_set || status.changed || timeout_elapsed)));

                if should_emit {
                    callback(current_query.clone(), results.clone(), ui_running);
                    sent_once = true;
                    last_sent_paths = paths;
                    last_sent_query.clear();
                    last_sent_query.push_str(&current_query);
                    last_sent_running = ui_running;
                    last_progress = std::time::Instant::now();
                }

                // A cancelled search exits only after at least one emission.
                if cancellation_token.load(Ordering::Relaxed) && sent_once {
                    break;
                }

                // Quiescent: matcher idle, no pending notify, walker finished.
                if !status.running && !flag_was_set && !walker_running {
                    if sent_once {
                        // Linger briefly in case a late query edit arrives.
                        if last_progress.elapsed() >= config.first_result_timeout {
                            break;
                        }
                    } else if timeout_elapsed && !cancellation_token.load(Ordering::Relaxed) {
                        let ui_running =
                            walker_running || status.running || flag_was_set || status.changed;
                        callback(current_query.clone(), results, ui_running);
                        if !walker_running {
                            break;
                        }
                    }
                }
            }
        });
    }
}
|
||||
|
||||
/// Spawns the background directory-walker thread that feeds relative paths
/// into the nucleo `injector`.
///
/// The walk honors ignore rules (plus the optional `override_matcher` built
/// from exclude globs), stops early when `cancel_flag` is set, and keeps
/// `walker_running` true for the duration of the walk via an RAII guard.
/// Errors only if the OS thread cannot be spawned.
fn spawn_walker(
    search_directory: PathBuf,
    threads: usize,
    override_matcher: Option<ignore::overrides::Override>,
    cancel_flag: Arc<AtomicBool>,
    walker_running: Arc<AtomicBool>,
    injector: nucleo::Injector<SearchItem>,
) -> anyhow::Result<JoinHandle<()>> {
    thread::Builder::new()
        .name("codex-file-search-walker".to_string())
        .spawn(move || {
            // Flips walker_running true now and false when this closure ends.
            let _walker_running_guard = WalkerRunningGuard::new(walker_running);
            let search_directory = Arc::new(search_directory);
            let mut walk_builder = WalkBuilder::new(search_directory.as_path());
            // hidden(false) disables the hidden-entry filter; require_git(false)
            // applies ignore rules even outside a git repository — per the
            // `ignore` crate's WalkBuilder documentation.
            walk_builder
                .threads(threads)
                .hidden(false)
                .follow_links(true)
                .require_git(false);

            if let Some(override_matcher) = override_matcher {
                walk_builder.overrides(override_matcher);
            }

            let walker = walk_builder.build_parallel();
            walker.run(|| {
                // Each walker worker gets its own clones of the shared state.
                let injector = injector.clone();
                let cancel_flag = cancel_flag.clone();
                let search_directory = Arc::clone(&search_directory);
                Box::new(move |entry| {
                    if cancel_flag.load(Ordering::Relaxed) {
                        return ignore::WalkState::Quit;
                    }
                    // Walk errors (permissions, broken links) are skipped.
                    let entry = match entry {
                        Ok(entry) => entry,
                        Err(_) => return ignore::WalkState::Continue,
                    };
                    let path = entry.path();
                    let rel_path = match path.strip_prefix(search_directory.as_path()) {
                        Ok(rel) => rel,
                        Err(_) => path,
                    };
                    // The search root itself relativizes to "" — skip it.
                    if rel_path.as_os_str().is_empty() {
                        return ignore::WalkState::Continue;
                    }
                    // Non-UTF-8 paths cannot be matched and are skipped.
                    let Some(mut path_string) = rel_path.to_str().map(|s| s.to_string()) else {
                        return ignore::WalkState::Continue;
                    };
                    // Directories are marked with a trailing '/'.
                    if entry.file_type().is_some_and(|ft| ft.is_dir())
                        && !path_string.ends_with('/')
                    {
                        path_string.push('/');
                    }
                    injector.push(SearchItem::new(path_string), |item, columns| {
                        // Column 0 is the sole match column: the path itself.
                        columns[0] = item.path.as_str().into();
                    });
                    ignore::WalkState::Continue
                })
            });
        })
        .map_err(anyhow::Error::new)
}
|
||||
|
||||
fn build_override_matcher(
|
||||
search_directory: &Path,
|
||||
exclude: Vec<String>,
|
||||
) -> anyhow::Result<Option<ignore::overrides::Override>> {
|
||||
if exclude.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut builder = OverrideBuilder::new(search_directory);
|
||||
for pattern in exclude {
|
||||
let exclude_pattern = format!("!{pattern}");
|
||||
builder.add(&exclude_pattern)?;
|
||||
}
|
||||
Ok(Some(builder.build()?))
|
||||
}
|
||||
161
codex-rs/file-search/tests/debounced_search_manager.rs
Normal file
161
codex-rs/file-search/tests/debounced_search_manager.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_file_search::FileSearchResults;
|
||||
use codex_file_search::search_manager::DebounceConfig;
|
||||
use codex_file_search::search_manager::DebouncedSearchManager;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
/// End-to-end smoke test: a single query over a small tree should eventually
/// emit a callback update containing the matching file.
#[test]
fn debounced_search_manager_emits_results() {
    let temp_dir = tempfile::tempdir().unwrap();
    let nested_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&nested_dir).unwrap();
    std::fs::write(nested_dir.join("gamma.rs"), "fn main() {}").unwrap();

    // Every callback invocation is recorded as (query, results, running).
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let captured_clone = Arc::clone(&captured);

    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            let mut guard = captured_clone
                .lock()
                .expect("captured results mutex poisoned");
            guard.push((query, results, running));
        },
    );

    let limit = NonZeroUsize::new(10).unwrap();
    let threads = NonZeroUsize::new(2).unwrap();
    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        limit,
        threads,
        false, // compute_indices: highlight positions are not needed here
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );

    manager.on_query("gam".to_string());

    // Poll for up to 2s for any emission that includes gamma.rs.
    let start = Instant::now();
    let mut saw_match = false;

    while start.elapsed() < Duration::from_secs(2) {
        {
            let guard = captured
                .lock()
                .expect("captured results mutex poisoned while reading");
            if guard
                .iter()
                .any(|(_, results, _)| results.matches.iter().any(|m| m.path.ends_with("gamma.rs")))
            {
                saw_match = true;
                break;
            }
        }
        thread::sleep(Duration::from_millis(10));
    }

    // Dump everything observed to aid debugging a failure.
    if !saw_match {
        let guard = captured
            .lock()
            .expect("captured results mutex poisoned at end of test");
        eprintln!("captured debounced results: {guard:?}");
    }

    assert!(saw_match, "debounced search did not emit expected result");
}
|
||||
|
||||
/// A backspace (shortening the query to a non-prefix-compatible state for the
/// running search) must cancel the narrow search and emit the wider result
/// set for the shorter query.
#[test]
fn debounced_search_manager_backspace_updates_results() {
    let temp_dir = tempfile::tempdir().unwrap();
    let nested_dir = temp_dir.path().join("src");
    std::fs::create_dir_all(&nested_dir).unwrap();
    // "alph" matches only alpha.rs; "al" matches both alpha.rs and alpine.rs.
    std::fs::write(nested_dir.join("alpha.rs"), "fn alpha() {}").unwrap();
    std::fs::write(nested_dir.join("alpine.rs"), "fn alpine() {}").unwrap();
    std::fs::write(nested_dir.join("beta.rs"), "fn beta() {}").unwrap();

    // Every callback invocation is recorded as (query, results, running).
    let captured: Arc<Mutex<Vec<(String, FileSearchResults, bool)>>> =
        Arc::new(Mutex::new(Vec::new()));
    let captured_clone = Arc::clone(&captured);

    let callback = Arc::new(
        move |query: String, results: FileSearchResults, running: bool| {
            let mut guard = captured_clone
                .lock()
                .expect("captured results mutex poisoned");
            guard.push((query, results, running));
        },
    );

    let limit = NonZeroUsize::new(10).unwrap();
    let threads = NonZeroUsize::new(2).unwrap();
    let manager = DebouncedSearchManager::new(
        temp_dir.path().to_path_buf(),
        limit,
        threads,
        false, // compute_indices not needed
        Vec::new(),
        callback,
        DebounceConfig::default(),
    );

    manager.on_query("alph".to_string());

    let alpha_paths =
        wait_for_query_paths_matching(&captured, "alph", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
        })
        .expect("timed out waiting for alph results");
    assert_eq!(
        alpha_paths,
        vec!["src/alpha.rs".to_string()],
        "expected only alpha for query 'alph'"
    );

    // Simulate two backspaces: "alph" -> "al".
    manager.on_query("al".to_string());

    let mut backspace_paths =
        wait_for_query_paths_matching(&captured, "al", Duration::from_secs(2), |paths| {
            paths.iter().any(|path| path.ends_with("alpha.rs"))
                && paths.iter().any(|path| path.ends_with("alpine.rs"))
        })
        .expect("timed out waiting for backspace results");
    // Sort to make the comparison independent of match ranking.
    backspace_paths.sort();
    assert_eq!(
        backspace_paths,
        vec!["src/alpha.rs".to_string(), "src/alpine.rs".to_string()],
        "expected backspace to include both alpha and alpine"
    );
}
|
||||
|
||||
fn wait_for_query_paths_matching(
|
||||
captured: &Arc<Mutex<Vec<(String, FileSearchResults, bool)>>>,
|
||||
query: &str,
|
||||
timeout: Duration,
|
||||
predicate: impl Fn(&[String]) -> bool,
|
||||
) -> Option<Vec<String>> {
|
||||
let start = Instant::now();
|
||||
while start.elapsed() < timeout {
|
||||
{
|
||||
let guard = captured
|
||||
.lock()
|
||||
.expect("captured results mutex poisoned while reading");
|
||||
if let Some((_, results, _)) = guard.iter().rev().find(|(q, _, _)| q == query) {
|
||||
let paths: Vec<String> = results.matches.iter().map(|m| m.path.clone()).collect();
|
||||
if predicate(&paths) {
|
||||
return Some(paths);
|
||||
}
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
None
|
||||
}
|
||||
245
codex-rs/file-search/tests/search_manager.rs
Normal file
245
codex-rs/file-search/tests/search_manager.rs
Normal file
@@ -0,0 +1,245 @@
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_file_search::search_manager::SearchItem;
|
||||
use codex_file_search::search_manager::SearchManager;
|
||||
|
||||
fn push(injector: &nucleo::Injector<SearchItem>, path: &str) {
|
||||
injector.push(
|
||||
SearchItem {
|
||||
path: path.to_string(),
|
||||
},
|
||||
|item, columns| {
|
||||
columns[0] = item.path.as_str().into();
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn search_manager_streams_results() {
    let workspace = tempfile::tempdir().unwrap();

    // Counts how many times the manager signals that new results are ready.
    let notifications = Arc::new(AtomicUsize::new(0));
    let notify = {
        let notifications = Arc::clone(&notifications);
        Arc::new(move || {
            notifications.fetch_add(1, Ordering::Relaxed);
        })
    };

    let mut manager = SearchManager::new(
        "g",
        NonZeroUsize::new(10).unwrap(),
        workspace.path(),
        Vec::new(),
        NonZeroUsize::new(2).unwrap(),
        false,
        notify,
    )
    .unwrap();

    let injector = manager.injector();

    // "alpha.txt" does not match the query "g": after one tick there should
    // be no matches reported.
    push(&injector, "alpha.txt");
    manager.tick(Duration::from_millis(10));
    assert!(manager.current_results().matches.is_empty());

    // "subdir/gamma.rs" does match; tick until the matcher reports idle
    // (bounded so a regression cannot hang the test).
    push(&injector, "subdir/gamma.rs");
    for _ in 0..50 {
        if !manager.tick(Duration::from_millis(10)).running {
            break;
        }
    }

    let final_results = manager.current_results();
    assert!(
        final_results
            .matches
            .iter()
            .any(|m| m.path.ends_with("gamma.rs")),
        "expected to find gamma.rs; results={:?}",
        final_results
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
    // At least the two explicit ticks above should have fired the callback.
    assert!(notifications.load(Ordering::Relaxed) >= 2);
}
|
||||
|
||||
#[test]
fn search_manager_walk_finds_files() {
    let workspace = tempfile::tempdir().unwrap();
    let subdir = workspace.path().join("subdir");
    std::fs::create_dir_all(&subdir).unwrap();
    std::fs::write(subdir.join("gamma.rs"), "fn main() {}").unwrap();
    std::fs::write(workspace.path().join("alpha.txt"), "alpha").unwrap();

    // Counter bumped on each notification; drained inside the poll loop.
    let notifications = Arc::new(AtomicUsize::new(0));
    let notify = {
        let notifications = Arc::clone(&notifications);
        Arc::new(move || {
            notifications.fetch_add(1, Ordering::Relaxed);
        })
    };

    let mut manager = SearchManager::new(
        "gam",
        NonZeroUsize::new(10).unwrap(),
        workspace.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        true,
        notify,
    )
    .unwrap();

    let started = std::time::Instant::now();
    let mut found = false;

    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain the notification counter; only tick progress matters here.
        let _ = notifications.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path.ends_with("gamma.rs"))
        {
            found = true;
            break;
        }
        // Give the walker a grace period after it reports idle, and never
        // poll for longer than five seconds overall.
        if !status.running && started.elapsed() > Duration::from_secs(1) {
            break;
        }
        if started.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found,
        "expected walker to find gamma.rs; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
|
||||
|
||||
#[test]
fn search_manager_walk_includes_directories() {
    let workspace = tempfile::tempdir().unwrap();
    let docs_dir = workspace.path().join("docs");
    std::fs::create_dir_all(docs_dir.join("nested")).unwrap();

    // Counter bumped on each notification; drained inside the poll loop.
    let notifications = Arc::new(AtomicUsize::new(0));
    let notify = {
        let notifications = Arc::clone(&notifications);
        Arc::new(move || {
            notifications.fetch_add(1, Ordering::Relaxed);
        })
    };

    let mut manager = SearchManager::new(
        "docs",
        NonZeroUsize::new(10).unwrap(),
        workspace.path(),
        Vec::new(),
        NonZeroUsize::new(1).unwrap(),
        false,
        notify,
    )
    .unwrap();

    let started = std::time::Instant::now();
    let mut found_directory = false;

    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain the notification counter; only tick progress matters here.
        let _ = notifications.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "docs/")
        {
            found_directory = true;
            break;
        }
        // Give the walker a grace period after it reports idle, and never
        // poll for longer than five seconds overall.
        if !status.running && started.elapsed() > Duration::from_secs(1) {
            break;
        }
        if started.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found_directory,
        "expected walker to find directory matches; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
|
||||
|
||||
#[test]
#[cfg(unix)]
fn search_manager_walk_follows_symlinked_directories() {
    let workspace = tempfile::tempdir().unwrap();
    let root_dir = workspace.path().join("root");
    let target_dir = workspace.path().join("target");
    std::fs::create_dir_all(&root_dir).unwrap();
    std::fs::create_dir_all(&target_dir).unwrap();
    std::fs::write(target_dir.join("gamma.rs"), "fn main() {}").unwrap();

    // The search root only reaches gamma.rs through this symlink.
    std::os::unix::fs::symlink(&target_dir, root_dir.join("link")).unwrap();

    // Counter bumped on each notification; drained inside the poll loop.
    let notifications = Arc::new(AtomicUsize::new(0));
    let notify = {
        let notifications = Arc::clone(&notifications);
        Arc::new(move || {
            notifications.fetch_add(1, Ordering::Relaxed);
        })
    };

    let limit = NonZeroUsize::new(10).unwrap();
    let threads = NonZeroUsize::new(1).unwrap();

    let mut manager =
        SearchManager::new("gamma", limit, &root_dir, Vec::new(), threads, true, notify).unwrap();

    let started = std::time::Instant::now();
    let mut found = false;

    loop {
        let status = manager.tick(Duration::from_millis(20));
        // Drain the notification counter; only tick progress matters here.
        let _ = notifications.swap(0, Ordering::AcqRel);
        if manager
            .current_results()
            .matches
            .iter()
            .any(|m| m.path == "link/gamma.rs")
        {
            found = true;
            break;
        }
        // Give the walker a grace period after it reports idle, and never
        // poll for longer than five seconds overall.
        if !status.running && started.elapsed() > Duration::from_secs(1) {
            break;
        }
        if started.elapsed() > Duration::from_secs(5) {
            break;
        }
    }

    assert!(
        found,
        "expected walker to find symlinked file; matches: {:?}",
        manager
            .current_results()
            .matches
            .iter()
            .map(|m| m.path.clone())
            .collect::<Vec<_>>()
    );
}
|
||||
Reference in New Issue
Block a user