This commit is contained in:
Jeremy Rose
2026-01-30 13:30:45 -08:00
parent d59685f6d4
commit ee9bed72d0
7 changed files with 472 additions and 26 deletions

View File

@@ -350,6 +350,18 @@ client_request_definitions! {
params: FuzzyFileSearchParams,
response: FuzzyFileSearchResponse,
},
FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
params: FuzzyFileSearchSessionStartParams,
response: FuzzyFileSearchSessionStartResponse,
},
FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
params: FuzzyFileSearchSessionUpdateParams,
response: FuzzyFileSearchSessionUpdateResponse,
},
FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
params: FuzzyFileSearchSessionStopParams,
response: FuzzyFileSearchSessionStopResponse,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
params: v1::ExecOneOffCommandParams,
@@ -594,6 +606,47 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStartParams {
pub session_id: String,
pub roots: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStartResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdateParams {
pub session_id: String,
pub query: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionUpdateResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStopParams {
pub session_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStopResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdatedNotification {
pub session_id: String,
pub query: String,
pub files: Vec<FuzzyFileSearchResult>,
}
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
@@ -625,6 +678,7 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
FuzzyFileSearchSessionUpdated => "fuzzyFileSearch/sessionUpdated" (FuzzyFileSearchSessionUpdatedNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

View File

@@ -1,7 +1,9 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::FuzzyFileSearchSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::fuzzy_file_search::start_fuzzy_file_search_session;
use crate::models::supported_models;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotification;
@@ -38,6 +40,12 @@ use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStartResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStopParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStopResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateResponse;
use codex_app_server_protocol::GetAccountParams;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAccountResponse;
@@ -272,6 +280,7 @@ pub(crate) struct CodexMessageProcessor {
pending_rollbacks: PendingRollbacks,
turn_summary_store: TurnSummaryStore,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
feedback: CodexFeedback,
}
@@ -328,6 +337,7 @@ impl CodexMessageProcessor {
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -581,6 +591,18 @@ impl CodexMessageProcessor {
ClientRequest::FuzzyFileSearch { request_id, params } => {
self.fuzzy_file_search(request_id, params).await;
}
ClientRequest::FuzzyFileSearchSessionStart { request_id, params } => {
self.fuzzy_file_search_session_start(request_id, params)
.await;
}
ClientRequest::FuzzyFileSearchSessionUpdate { request_id, params } => {
self.fuzzy_file_search_session_update(request_id, params)
.await;
}
ClientRequest::FuzzyFileSearchSessionStop { request_id, params } => {
self.fuzzy_file_search_session_stop(request_id, params)
.await;
}
ClientRequest::OneOffCommandExec { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
@@ -4591,6 +4613,89 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn fuzzy_file_search_session_start(
&mut self,
request_id: RequestId,
params: FuzzyFileSearchSessionStartParams,
) {
let FuzzyFileSearchSessionStartParams { session_id, roots } = params;
if session_id.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "sessionId must not be empty".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
let session =
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone());
match session {
Ok(session) => {
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.insert(session_id, session);
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start fuzzy file search session: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn fuzzy_file_search_session_update(
&mut self,
request_id: RequestId,
params: FuzzyFileSearchSessionUpdateParams,
) {
let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
let found = {
let sessions = self.fuzzy_search_sessions.lock().await;
if let Some(session) = sessions.get(&session_id) {
session.update_query(query);
true
} else {
false
}
};
if !found {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("fuzzy file search session not found: {session_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
.await;
}
async fn fuzzy_file_search_session_stop(
&mut self,
request_id: RequestId,
params: FuzzyFileSearchSessionStopParams,
) {
let FuzzyFileSearchSessionStopParams { session_id } = params;
{
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.remove(&session_id);
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
.await;
}
async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {

View File

@@ -1,12 +1,18 @@
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use tracing::warn;
use crate::outgoing_message::OutgoingMessageSender;
const MATCH_LIMIT: usize = 50;
const MAX_THREADS: usize = 12;
@@ -77,3 +83,144 @@ pub(crate) async fn run_fuzzy_file_search(
files
}
pub(crate) struct FuzzyFileSearchSession {
session: file_search::FileSearchSession,
shared: Arc<SessionShared>,
}
impl FuzzyFileSearchSession {
pub(crate) fn update_query(&self, query: String) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
{
#[expect(clippy::unwrap_used)]
let mut latest_query = self.shared.latest_query.lock().unwrap();
*latest_query = query.clone();
}
self.session.update_query(&query);
}
}
impl Drop for FuzzyFileSearchSession {
fn drop(&mut self) {
self.shared.canceled.store(true, Ordering::Relaxed);
}
}
pub(crate) fn start_fuzzy_file_search_session(
session_id: String,
roots: Vec<String>,
outgoing: Arc<OutgoingMessageSender>,
) -> anyhow::Result<FuzzyFileSearchSession> {
#[expect(clippy::expect_used)]
let limit = NonZero::new(MATCH_LIMIT).expect("MATCH_LIMIT should be a valid non-zero usize");
let cores = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
let threads = cores.min(MAX_THREADS);
#[expect(clippy::expect_used)]
let threads = NonZero::new(threads.max(1)).expect("threads should be non-zero");
let search_dirs: Vec<PathBuf> = roots.iter().map(PathBuf::from).collect();
let canceled = Arc::new(AtomicBool::new(false));
let shared = Arc::new(SessionShared {
session_id,
latest_query: Mutex::new(String::new()),
outgoing,
runtime: tokio::runtime::Handle::current(),
canceled: canceled.clone(),
});
let reporter = Arc::new(SessionReporterImpl {
shared: shared.clone(),
});
let session = file_search::create_session(
search_dirs,
file_search::FileSearchOptions {
limit,
threads,
compute_indices: true,
..Default::default()
},
reporter,
Some(canceled),
)?;
Ok(FuzzyFileSearchSession { session, shared })
}
struct SessionShared {
session_id: String,
latest_query: Mutex<String>,
outgoing: Arc<OutgoingMessageSender>,
runtime: tokio::runtime::Handle,
canceled: Arc<AtomicBool>,
}
struct SessionReporterImpl {
shared: Arc<SessionShared>,
}
impl SessionReporterImpl {
fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
let query = {
#[expect(clippy::unwrap_used)]
self.shared.latest_query.lock().unwrap().clone()
};
if query.is_empty() || snapshot.query != query {
return;
}
let files = collect_files(snapshot);
let notification = ServerNotification::FuzzyFileSearchSessionUpdated(
FuzzyFileSearchSessionUpdatedNotification {
session_id: self.shared.session_id.clone(),
query,
files,
},
);
let outgoing = self.shared.outgoing.clone();
self.shared.runtime.spawn(async move {
outgoing.send_server_notification(notification).await;
});
}
}
impl file_search::SessionReporter for SessionReporterImpl {
fn on_update(&self, snapshot: &file_search::FileSearchSnapshot) {
self.send_snapshot(snapshot);
}
fn on_complete(&self) {}
}
fn collect_files(snapshot: &file_search::FileSearchSnapshot) -> Vec<FuzzyFileSearchResult> {
let mut files = snapshot
.matches
.iter()
.map(|m| {
let file_name = m.path.file_name().unwrap_or_default();
FuzzyFileSearchResult {
root: m.root.to_string_lossy().to_string(),
path: m.path.to_string_lossy().to_string(),
file_name: file_name.to_string_lossy().to_string(),
score: m.score,
indices: m.indices.clone(),
}
})
.collect::<Vec<_>>();
files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
FuzzyFileSearchResult,
_,
_,
>(|f| f.score, |f| f.path.as_str()));
files
}

View File

@@ -599,6 +599,43 @@ impl McpProcess {
self.send_request("fuzzyFileSearch", Some(params)).await
}
pub async fn send_fuzzy_file_search_session_start_request(
&mut self,
session_id: &str,
roots: Vec<String>,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"roots": roots,
});
self.send_request("fuzzyFileSearch/sessionStart", Some(params))
.await
}
pub async fn send_fuzzy_file_search_session_update_request(
&mut self,
session_id: &str,
query: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"query": query,
});
self.send_request("fuzzyFileSearch/sessionUpdate", Some(params))
.await
}
pub async fn send_fuzzy_file_search_session_stop_request(
&mut self,
session_id: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
});
self.send_request("fuzzyFileSearch/sessionStop", Some(params))
.await
}
async fn send_request(
&mut self,
method: &str,

View File

@@ -125,3 +125,98 @@ async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_streams_updates() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-1";
let start_request_id = mcp
.send_fuzzy_file_search_session_start_request(session_id, vec![root_path.clone()])
.await?;
let _start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let update_request_id = mcp
.send_fuzzy_file_search_session_update_request(session_id, "alp")
.await?;
let _update_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
let mut matched = false;
for _ in 0..5 {
let notif = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("fuzzyFileSearch/sessionUpdated"),
)
.await??;
let params = notif
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
assert_eq!(params["sessionId"], session_id);
assert_eq!(params["query"], "alp");
let files = params["files"]
.as_array()
.ok_or_else(|| anyhow!("files not array"))?;
if files.is_empty() {
continue;
}
assert_eq!(files.len(), 1);
assert_eq!(files[0]["root"], root_path);
assert_eq!(files[0]["path"], "alpha.txt");
matched = true;
break;
}
assert_eq!(matched, true);
let stop_request_id = mcp
.send_fuzzy_file_search_session_stop_request(session_id)
.await?;
let _stop_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(stop_request_id)),
)
.await??;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_before_start_errors() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_fuzzy_file_search_session_update_request("missing", "alp")
.await?;
let err = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
"fuzzy file search session not found: missing"
);
Ok(())
}

View File

@@ -139,19 +139,6 @@ impl Drop for FileSearchSession {
}
pub fn create_session(
search_directory: &Path,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
) -> anyhow::Result<FileSearchSession> {
create_session_inner(
vec![search_directory.to_path_buf()],
options,
reporter,
None,
)
}
fn create_session_inner(
search_directories: Vec<PathBuf>,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
@@ -291,7 +278,7 @@ pub fn run(
cancel_flag: Option<Arc<AtomicBool>>,
) -> anyhow::Result<FileSearchResults> {
let reporter = Arc::new(RunReporter::default());
let session = create_session_inner(roots, options, reporter.clone(), cancel_flag)?;
let session = create_session(roots, options, reporter.clone(), cancel_flag)?;
session.update_query(pattern_text);
@@ -746,8 +733,13 @@ mod tests {
fn session_scanned_file_count_is_monotonic_across_queries() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("file-00");
thread::sleep(Duration::from_millis(20));
@@ -766,8 +758,13 @@ mod tests {
fn session_streams_updates_before_walk_complete() {
let dir = create_temp_tree(600);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("file-0");
let completed = reporter.wait_for_complete(Duration::from_secs(5));
@@ -783,8 +780,13 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("alpha");
assert!(reporter.wait_for_complete(Duration::from_secs(5)));
@@ -809,7 +811,7 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session_inner(
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
@@ -838,7 +840,7 @@ mod tests {
let cancel_flag = Arc::new(AtomicBool::new(false));
let reporter_a = Arc::new(RecordingReporter::default());
let session_a = create_session_inner(
let session_a = create_session(
vec![root_a.path().to_path_buf()],
FileSearchOptions::default(),
reporter_a,
@@ -847,7 +849,7 @@ mod tests {
.expect("session_a");
let reporter_b = Arc::new(RecordingReporter::default());
let session_b = create_session_inner(
let session_b = create_session(
vec![root_b.path().to_path_buf()],
FileSearchOptions::default(),
reporter_b.clone(),
@@ -869,8 +871,13 @@ mod tests {
fn session_emits_updates_when_query_changes() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("zzzzzzzz");
let completed = reporter.wait_for_complete(Duration::from_secs(5));

View File

@@ -70,12 +70,13 @@ impl FileSearchManager {
session_token,
});
let session = file_search::create_session(
&self.search_dir,
vec![self.search_dir.clone()],
file_search::FileSearchOptions {
compute_indices: true,
..Default::default()
},
reporter,
None,
);
match session {
Ok(session) => st.session = Some(session),