mirror of
https://github.com/openai/codex.git
synced 2026-05-28 15:00:16 +00:00
[codex] Parallelize skills list cwd loading (#21441)
## Summary - process `skills/list` cwd entries with bounded concurrency of 5 - preserve the caller's requested cwd order in the response - add coverage that verifies response ordering remains stable ## Why Cold-start desktop traces showed that `skills/list` can dominate the shared config queue when it scans many workspace roots serially. The expensive work is largely independent per cwd, so the request was paying the sum of all cwd costs instead of the cost of the slowest bounded batch. ## Impact This keeps current request semantics intact while reducing the wall-clock time of large multi-root `skills/list` calls. That should also reduce how long later config-family requests, such as `plugin/list`, wait behind `skills/list` during startup. ## Validation - `just fmt` - `cargo test -p codex-app-server` - `cargo test -p codex-app-server skills_list_preserves_requested_cwd_order`
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use futures::StreamExt;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CatalogRequestProcessor {
|
||||
@@ -9,6 +10,8 @@ pub(crate) struct CatalogRequestProcessor {
|
||||
pub(super) workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
|
||||
}
|
||||
|
||||
const SKILLS_LIST_CWD_CONCURRENCY: usize = 5;
|
||||
|
||||
fn skills_to_info(
|
||||
skills: &[codex_core::skills::SkillMetadata],
|
||||
disabled_paths: &HashSet<AbsolutePathBuf>,
|
||||
@@ -430,56 +433,76 @@ impl CatalogRequestProcessor {
|
||||
.environment_manager()
|
||||
.default_environment()
|
||||
.map(|environment| environment.get_filesystem());
|
||||
let mut data = Vec::new();
|
||||
for cwd in cwds {
|
||||
let (cwd_abs, config_layer_stack) = match self.resolve_cwd_config(&cwd).await {
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
let error_path = cwd.clone();
|
||||
data.push(codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
}],
|
||||
});
|
||||
continue;
|
||||
let mut data = futures::stream::iter(cwds.into_iter().enumerate())
|
||||
.map(|(index, cwd)| {
|
||||
let config = &config;
|
||||
let extra_roots_by_cwd = &extra_roots_by_cwd;
|
||||
let fs = fs.clone();
|
||||
let plugins_manager = &plugins_manager;
|
||||
let skills_manager = &skills_manager;
|
||||
async move {
|
||||
let (cwd_abs, config_layer_stack) = match self.resolve_cwd_config(&cwd).await {
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
let error_path = cwd.clone();
|
||||
return (
|
||||
index,
|
||||
codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
}],
|
||||
},
|
||||
);
|
||||
}
|
||||
};
|
||||
let extra_roots = extra_roots_by_cwd
|
||||
.get(&cwd)
|
||||
.map_or(&[][..], std::vec::Vec::as_slice);
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(
|
||||
&config_layer_stack,
|
||||
&plugins_input,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let skills_input = codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
);
|
||||
let outcome = skills_manager
|
||||
.skills_for_cwd_with_extra_user_roots(
|
||||
&skills_input,
|
||||
force_reload,
|
||||
extra_roots,
|
||||
fs,
|
||||
)
|
||||
.await;
|
||||
let errors = errors_to_info(&outcome.errors);
|
||||
let skills = skills_to_info(&outcome.skills, &outcome.disabled_paths);
|
||||
(
|
||||
index,
|
||||
codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills,
|
||||
errors,
|
||||
},
|
||||
)
|
||||
}
|
||||
};
|
||||
let extra_roots = extra_roots_by_cwd
|
||||
.get(&cwd)
|
||||
.map_or(&[][..], std::vec::Vec::as_slice);
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(&config_layer_stack, &plugins_input)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let skills_input = codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
);
|
||||
let outcome = skills_manager
|
||||
.skills_for_cwd_with_extra_user_roots(
|
||||
&skills_input,
|
||||
force_reload,
|
||||
extra_roots,
|
||||
fs.clone(),
|
||||
)
|
||||
.await;
|
||||
let errors = errors_to_info(&outcome.errors);
|
||||
let skills = skills_to_info(&outcome.skills, &outcome.disabled_paths);
|
||||
data.push(codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills,
|
||||
errors,
|
||||
});
|
||||
}
|
||||
})
|
||||
.buffer_unordered(SKILLS_LIST_CWD_CONCURRENCY)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
data.sort_unstable_by_key(|(index, _)| *index);
|
||||
let data = data.into_iter().map(|(_, entry)| entry).collect();
|
||||
Ok(SkillsListResponse { data })
|
||||
}
|
||||
|
||||
|
||||
@@ -533,6 +533,44 @@ async fn skills_list_accepts_relative_cwds() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_list_preserves_requested_cwd_order() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let first_cwd = TempDir::new()?;
|
||||
let second_cwd = TempDir::new()?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![
|
||||
first_cwd.path().to_path_buf(),
|
||||
second_cwd.path().to_path_buf(),
|
||||
],
|
||||
force_reload: true,
|
||||
per_cwd_extra_user_roots: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(response)?;
|
||||
assert_eq!(
|
||||
data.iter()
|
||||
.map(|entry| entry.cwd.clone())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
first_cwd.path().to_path_buf(),
|
||||
second_cwd.path().to_path_buf(),
|
||||
]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_list_ignores_per_cwd_extra_roots_for_unknown_cwd() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
Reference in New Issue
Block a user