mirror of
https://github.com/openai/codex.git
synced 2026-05-18 02:02:30 +00:00
Compare commits
4 Commits
btraut/fix
...
xli-codex/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c3cb14fb5 | ||
|
|
1789ab69d1 | ||
|
|
8114282c02 | ||
|
|
fe7044a2aa |
@@ -589,7 +589,7 @@ client_request_definitions! {
|
||||
},
|
||||
SkillsList => "skills/list" {
|
||||
params: v2::SkillsListParams,
|
||||
serialization: global_shared_read("config"),
|
||||
serialization: None,
|
||||
response: v2::SkillsListResponse,
|
||||
},
|
||||
HooksList => "hooks/list" {
|
||||
@@ -794,7 +794,7 @@ client_request_definitions! {
|
||||
},
|
||||
ExperimentalFeatureList => "experimentalFeature/list" {
|
||||
params: v2::ExperimentalFeatureListParams,
|
||||
serialization: global("config"),
|
||||
serialization: global_shared_read("config"),
|
||||
response: v2::ExperimentalFeatureListResponse,
|
||||
},
|
||||
ExperimentalFeatureEnablementSet => "experimentalFeature/enablement/set" {
|
||||
@@ -1667,10 +1667,7 @@ mod tests {
|
||||
per_cwd_extra_user_roots: None,
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
skills_list.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
assert_eq!(skills_list.serialization_scope(), None);
|
||||
|
||||
let plugin_list = ClientRequest::PluginList {
|
||||
request_id: request_id(),
|
||||
@@ -1684,6 +1681,26 @@ mod tests {
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let experimental_feature_list = ClientRequest::ExperimentalFeatureList {
|
||||
request_id: request_id(),
|
||||
params: v2::ExperimentalFeatureListParams::default(),
|
||||
};
|
||||
assert_eq!(
|
||||
experimental_feature_list.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let experimental_feature_enablement_set = ClientRequest::ExperimentalFeatureEnablementSet {
|
||||
request_id: request_id(),
|
||||
params: v2::ExperimentalFeatureEnablementSetParams {
|
||||
enablement: Default::default(),
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
experimental_feature_enablement_set.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::Global("config"))
|
||||
);
|
||||
|
||||
let plugin_uninstall = ClientRequest::PluginUninstall {
|
||||
request_id: request_id(),
|
||||
params: v2::PluginUninstallParams {
|
||||
|
||||
@@ -20,6 +20,9 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use tokio::sync::RwLock as AsyncRwLock;
|
||||
use tokio::sync::RwLockReadGuard;
|
||||
use tokio::sync::RwLockWriteGuard;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -33,6 +36,7 @@ pub(crate) struct ConfigManager {
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
thread_config_loader: Arc<RwLock<Arc<dyn ThreadConfigLoader>>>,
|
||||
shared_state: Arc<AsyncRwLock<()>>,
|
||||
}
|
||||
|
||||
impl ConfigManager {
|
||||
@@ -52,6 +56,7 @@ impl ConfigManager {
|
||||
cloud_requirements: Arc::new(RwLock::new(cloud_requirements)),
|
||||
arg0_paths,
|
||||
thread_config_loader: Arc::new(RwLock::new(thread_config_loader)),
|
||||
shared_state: Arc::new(AsyncRwLock::new(())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +78,14 @@ impl ConfigManager {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_shared_state(&self) -> RwLockReadGuard<'_, ()> {
|
||||
self.shared_state.read().await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_shared_state(&self) -> RwLockWriteGuard<'_, ()> {
|
||||
self.shared_state.write().await
|
||||
}
|
||||
|
||||
pub(crate) fn extend_runtime_feature_enablement<I>(&self, enablement: I) -> Result<(), ()>
|
||||
where
|
||||
I: IntoIterator<Item = (String, bool)>,
|
||||
|
||||
@@ -171,6 +171,7 @@ impl ConfigManager {
|
||||
&self,
|
||||
params: ConfigValueWriteParams,
|
||||
) -> Result<ConfigWriteResponse, ConfigManagerError> {
|
||||
let _guard = self.write_shared_state().await;
|
||||
let edits = vec![(params.key_path, params.value, params.merge_strategy)];
|
||||
self.apply_edits(params.file_path, params.expected_version, edits)
|
||||
.await
|
||||
@@ -180,6 +181,7 @@ impl ConfigManager {
|
||||
&self,
|
||||
params: ConfigBatchWriteParams,
|
||||
) -> Result<ConfigWriteResponse, ConfigManagerError> {
|
||||
let _guard = self.write_shared_state().await;
|
||||
let edits = params
|
||||
.edits
|
||||
.into_iter()
|
||||
|
||||
@@ -768,6 +768,8 @@ impl MessageProcessor {
|
||||
);
|
||||
|
||||
let serialization_scope = codex_request.serialization_scope();
|
||||
let serialization_method = codex_request.method();
|
||||
let serialization_request_id = connection_request_id.request_id.to_string();
|
||||
let app_server_client_name = session.app_server_client_name().map(str::to_string);
|
||||
let client_version = session.client_version().map(str::to_string);
|
||||
let device_key_requests_allowed = session.allows_device_key_requests();
|
||||
@@ -794,7 +796,8 @@ impl MessageProcessor {
|
||||
}
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
)
|
||||
.with_log_metadata(serialization_method, serialization_request_id);
|
||||
|
||||
if let Some(scope) = serialization_scope {
|
||||
let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::*;
|
||||
use futures::StreamExt;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CatalogRequestProcessor {
|
||||
@@ -12,6 +13,19 @@ pub(crate) struct CatalogRequestProcessor {
|
||||
|
||||
const SKILLS_LIST_CWD_CONCURRENCY: usize = 5;
|
||||
|
||||
enum PreparedSkillsListEntry {
|
||||
Ready {
|
||||
index: usize,
|
||||
cwd: PathBuf,
|
||||
skills_input: codex_core::skills::SkillsLoadInput,
|
||||
extra_roots: Vec<AbsolutePathBuf>,
|
||||
},
|
||||
Error {
|
||||
index: usize,
|
||||
entry: codex_app_server_protocol::SkillsListEntry,
|
||||
},
|
||||
}
|
||||
|
||||
fn skills_to_info(
|
||||
skills: &[codex_core::skills::SkillMetadata],
|
||||
disabled_paths: &HashSet<AbsolutePathBuf>,
|
||||
@@ -421,68 +435,102 @@ impl CatalogRequestProcessor {
|
||||
.extend(valid_extra_roots);
|
||||
}
|
||||
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let workspace_codex_plugins_enabled = self
|
||||
.workspace_codex_plugins_enabled(&config, auth.as_ref())
|
||||
.await;
|
||||
let snapshot_started_at = Instant::now();
|
||||
let prepared_entries = {
|
||||
let _guard = self.config_manager.read_shared_state().await;
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let workspace_codex_plugins_enabled = self
|
||||
.workspace_codex_plugins_enabled(&config, auth.as_ref())
|
||||
.await;
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
futures::stream::iter(cwds.into_iter().enumerate())
|
||||
.map(|(index, cwd)| {
|
||||
let config = &config;
|
||||
let extra_roots_by_cwd = &extra_roots_by_cwd;
|
||||
let plugins_manager = &plugins_manager;
|
||||
async move {
|
||||
let (cwd_abs, config_layer_stack) = match self
|
||||
.resolve_cwd_config(&cwd)
|
||||
.await
|
||||
{
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
let error_path = cwd.clone();
|
||||
return PreparedSkillsListEntry::Error {
|
||||
index,
|
||||
entry: codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
}],
|
||||
},
|
||||
};
|
||||
}
|
||||
};
|
||||
let extra_roots = extra_roots_by_cwd.get(&cwd).cloned().unwrap_or_default();
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(
|
||||
&config_layer_stack,
|
||||
&plugins_input,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
PreparedSkillsListEntry::Ready {
|
||||
index,
|
||||
cwd,
|
||||
skills_input: codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
),
|
||||
extra_roots,
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(SKILLS_LIST_CWD_CONCURRENCY)
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
};
|
||||
warn!(
|
||||
elapsed_ms = snapshot_started_at.elapsed().as_millis(),
|
||||
entry_count = prepared_entries.len(),
|
||||
"skills/list prepared lower-level snapshot"
|
||||
);
|
||||
let skills_manager = self.thread_manager.skills_manager();
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
let fs = self
|
||||
.thread_manager
|
||||
.environment_manager()
|
||||
.default_environment()
|
||||
.map(|environment| environment.get_filesystem());
|
||||
let mut data = futures::stream::iter(cwds.into_iter().enumerate())
|
||||
.map(|(index, cwd)| {
|
||||
let config = &config;
|
||||
let extra_roots_by_cwd = &extra_roots_by_cwd;
|
||||
let mut data = futures::stream::iter(prepared_entries.into_iter())
|
||||
.map(|prepared_entry| {
|
||||
let fs = fs.clone();
|
||||
let plugins_manager = &plugins_manager;
|
||||
let skills_manager = &skills_manager;
|
||||
async move {
|
||||
let (cwd_abs, config_layer_stack) = match self.resolve_cwd_config(&cwd).await {
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
let error_path = cwd.clone();
|
||||
return (
|
||||
index,
|
||||
codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
}],
|
||||
},
|
||||
);
|
||||
let (index, cwd, skills_input, extra_roots) = match prepared_entry {
|
||||
PreparedSkillsListEntry::Ready {
|
||||
index,
|
||||
cwd,
|
||||
skills_input,
|
||||
extra_roots,
|
||||
} => (index, cwd, skills_input, extra_roots),
|
||||
PreparedSkillsListEntry::Error { index, entry } => {
|
||||
return (index, entry);
|
||||
}
|
||||
};
|
||||
let extra_roots = extra_roots_by_cwd
|
||||
.get(&cwd)
|
||||
.map_or(&[][..], std::vec::Vec::as_slice);
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(
|
||||
&config_layer_stack,
|
||||
&plugins_input,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let skills_input = codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
);
|
||||
let outcome = skills_manager
|
||||
.skills_for_cwd_with_extra_user_roots(
|
||||
&skills_input,
|
||||
force_reload,
|
||||
extra_roots,
|
||||
&extra_roots,
|
||||
fs,
|
||||
)
|
||||
.await;
|
||||
@@ -585,6 +633,7 @@ impl CatalogRequestProcessor {
|
||||
&self,
|
||||
params: SkillsConfigWriteParams,
|
||||
) -> Result<SkillsConfigWriteResponse, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
let SkillsConfigWriteParams {
|
||||
path,
|
||||
name,
|
||||
|
||||
@@ -363,13 +363,16 @@ impl ConfigRequestProcessor {
|
||||
return Ok(ExperimentalFeatureEnablementSetResponse { enablement });
|
||||
}
|
||||
|
||||
self.config_manager
|
||||
.extend_runtime_feature_enablement(
|
||||
enablement
|
||||
.iter()
|
||||
.map(|(name, enabled)| (name.clone(), *enabled)),
|
||||
)
|
||||
.map_err(|_| internal_error("failed to update feature enablement"))?;
|
||||
{
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
self.config_manager
|
||||
.extend_runtime_feature_enablement(
|
||||
enablement
|
||||
.iter()
|
||||
.map(|(name, enabled)| (name.clone(), *enabled)),
|
||||
)
|
||||
.map_err(|_| internal_error("failed to update feature enablement"))?;
|
||||
}
|
||||
|
||||
self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
self.reload_user_config().await;
|
||||
|
||||
@@ -51,6 +51,7 @@ impl MarketplaceRequestProcessor {
|
||||
&self,
|
||||
params: MarketplaceRemoveParams,
|
||||
) -> Result<MarketplaceRemoveResponse, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
remove_marketplace(
|
||||
self.config.codex_home.to_path_buf(),
|
||||
CoreMarketplaceRemoveRequest {
|
||||
@@ -72,6 +73,7 @@ impl MarketplaceRequestProcessor {
|
||||
&self,
|
||||
params: MarketplaceUpgradeParams,
|
||||
) -> Result<MarketplaceUpgradeResponse, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
let MarketplaceUpgradeParams { marketplace_name } = params;
|
||||
@@ -105,6 +107,7 @@ impl MarketplaceRequestProcessor {
|
||||
&self,
|
||||
params: MarketplaceAddParams,
|
||||
) -> Result<MarketplaceAddResponse, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
add_marketplace_to_codex_home(
|
||||
self.config.codex_home.to_path_buf(),
|
||||
MarketplaceAddRequest {
|
||||
|
||||
@@ -261,6 +261,7 @@ impl PluginRequestProcessor {
|
||||
&self,
|
||||
params: PluginInstallParams,
|
||||
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
self.plugin_install_response(params)
|
||||
.await
|
||||
.map(|response| Some(response.into()))
|
||||
@@ -270,6 +271,7 @@ impl PluginRequestProcessor {
|
||||
&self,
|
||||
params: PluginUninstallParams,
|
||||
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
||||
let _guard = self.config_manager.write_shared_state().await;
|
||||
self.plugin_uninstall_response(params)
|
||||
.await
|
||||
.map(|response| Some(response.into()))
|
||||
@@ -348,6 +350,7 @@ impl PluginRequestProcessor {
|
||||
&self,
|
||||
params: PluginListParams,
|
||||
) -> Result<PluginListResponse, JSONRPCErrorError> {
|
||||
let request_started_at = Instant::now();
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
let PluginListParams {
|
||||
cwds,
|
||||
@@ -358,35 +361,80 @@ impl PluginRequestProcessor {
|
||||
let marketplace_kinds =
|
||||
marketplace_kinds.unwrap_or_else(|| vec![PluginListMarketplaceKind::Local]);
|
||||
let include_local = marketplace_kinds.contains(&PluginListMarketplaceKind::Local);
|
||||
warn!(
|
||||
roots_count = roots.len(),
|
||||
explicit_marketplace_kinds,
|
||||
marketplace_kinds_count = marketplace_kinds.len(),
|
||||
include_local,
|
||||
"plugin/list timing started"
|
||||
);
|
||||
|
||||
let config_started_at = Instant::now();
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
warn!(
|
||||
elapsed_ms = config_started_at.elapsed().as_millis(),
|
||||
plugins_enabled = config.features.enabled(Feature::Plugins),
|
||||
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
|
||||
"plugin/list timing loaded config"
|
||||
);
|
||||
let empty_response = || PluginListResponse {
|
||||
marketplaces: Vec::new(),
|
||||
marketplace_load_errors: Vec::new(),
|
||||
featured_plugin_ids: Vec::new(),
|
||||
};
|
||||
if !config.features.enabled(Feature::Plugins) {
|
||||
warn!(
|
||||
elapsed_ms = request_started_at.elapsed().as_millis(),
|
||||
"plugin/list timing completed with plugins disabled"
|
||||
);
|
||||
return Ok(empty_response());
|
||||
}
|
||||
let auth_started_at = Instant::now();
|
||||
let auth = self.auth_manager.auth().await;
|
||||
if !self
|
||||
warn!(
|
||||
elapsed_ms = auth_started_at.elapsed().as_millis(),
|
||||
has_auth = auth.is_some(),
|
||||
"plugin/list timing loaded auth"
|
||||
);
|
||||
let workspace_setting_started_at = Instant::now();
|
||||
let workspace_plugins_enabled = self
|
||||
.workspace_codex_plugins_enabled(&config, auth.as_ref())
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
warn!(
|
||||
elapsed_ms = workspace_setting_started_at.elapsed().as_millis(),
|
||||
workspace_plugins_enabled, "plugin/list timing checked workspace setting"
|
||||
);
|
||||
if !workspace_plugins_enabled {
|
||||
warn!(
|
||||
elapsed_ms = request_started_at.elapsed().as_millis(),
|
||||
"plugin/list timing completed with workspace plugins disabled"
|
||||
);
|
||||
return Ok(empty_response());
|
||||
}
|
||||
let plugins_input = config.plugins_config_input();
|
||||
let (mut data, marketplace_load_errors) = if include_local {
|
||||
let background_tasks_started_at = Instant::now();
|
||||
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
|
||||
&plugins_input,
|
||||
auth.clone(),
|
||||
&roots,
|
||||
Some(self.effective_plugins_changed_callback()),
|
||||
);
|
||||
warn!(
|
||||
elapsed_ms = background_tasks_started_at.elapsed().as_millis(),
|
||||
"plugin/list timing started background tasks"
|
||||
);
|
||||
|
||||
let config_for_marketplace_listing = plugins_input.clone();
|
||||
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
|
||||
let shared_plugin_ids_started_at = Instant::now();
|
||||
let shared_plugin_ids_by_local_path = load_shared_plugin_ids_by_local_path(&config);
|
||||
warn!(
|
||||
elapsed_ms = shared_plugin_ids_started_at.elapsed().as_millis(),
|
||||
shared_plugin_ids_count = shared_plugin_ids_by_local_path.len(),
|
||||
"plugin/list timing loaded shared plugin ids"
|
||||
);
|
||||
let local_listing_started_at = Instant::now();
|
||||
match tokio::task::spawn_blocking(move || {
|
||||
let outcome = plugins_manager_for_marketplace_listing
|
||||
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
|
||||
@@ -447,7 +495,21 @@ impl PluginRequestProcessor {
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(outcome)) => outcome,
|
||||
Ok(Ok(outcome)) => {
|
||||
let plugin_count = outcome
|
||||
.0
|
||||
.iter()
|
||||
.map(|marketplace| marketplace.plugins.len())
|
||||
.sum::<usize>();
|
||||
warn!(
|
||||
elapsed_ms = local_listing_started_at.elapsed().as_millis(),
|
||||
marketplace_count = outcome.0.len(),
|
||||
plugin_count,
|
||||
load_error_count = outcome.1.len(),
|
||||
"plugin/list timing listed local marketplaces"
|
||||
);
|
||||
outcome
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
return Err(Self::marketplace_error(err, "list marketplace plugins"));
|
||||
}
|
||||
@@ -472,6 +534,7 @@ impl PluginRequestProcessor {
|
||||
remote_sources.push(RemoteMarketplaceSource::SharedWithMe);
|
||||
}
|
||||
if !remote_sources.is_empty() {
|
||||
let remote_marketplaces_started_at = Instant::now();
|
||||
let remote_plugin_service_config = RemotePluginServiceConfig {
|
||||
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
||||
};
|
||||
@@ -483,6 +546,7 @@ impl PluginRequestProcessor {
|
||||
.await
|
||||
{
|
||||
Ok(remote_marketplaces) => {
|
||||
let fetched_remote_marketplace_count = remote_marketplaces.len();
|
||||
for remote_marketplace in remote_marketplaces
|
||||
.into_iter()
|
||||
.map(remote_marketplace_to_info)
|
||||
@@ -496,11 +560,25 @@ impl PluginRequestProcessor {
|
||||
data.push(remote_marketplace);
|
||||
}
|
||||
}
|
||||
warn!(
|
||||
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
|
||||
remote_source_count = remote_sources.len(),
|
||||
fetched_remote_marketplace_count,
|
||||
merged_marketplace_count = data.len(),
|
||||
"plugin/list timing fetched remote marketplaces"
|
||||
);
|
||||
}
|
||||
Err(
|
||||
RemotePluginCatalogError::AuthRequired
|
||||
| RemotePluginCatalogError::UnsupportedAuthMode,
|
||||
) => {}
|
||||
err @ (RemotePluginCatalogError::AuthRequired
|
||||
| RemotePluginCatalogError::UnsupportedAuthMode),
|
||||
) => {
|
||||
warn!(
|
||||
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
|
||||
remote_source_count = remote_sources.len(),
|
||||
error = %err,
|
||||
"plugin/list timing skipped remote marketplaces"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
error = %err,
|
||||
@@ -508,12 +586,15 @@ impl PluginRequestProcessor {
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("plugin/list timing skipped remote marketplaces");
|
||||
}
|
||||
|
||||
let featured_plugin_ids = if data
|
||||
let curated_marketplace_present = data
|
||||
.iter()
|
||||
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
|
||||
{
|
||||
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME);
|
||||
let featured_plugin_ids_started_at = Instant::now();
|
||||
let featured_plugin_ids = if curated_marketplace_present {
|
||||
match plugins_manager
|
||||
.featured_plugin_ids_for_config(&plugins_input, auth.as_ref())
|
||||
.await
|
||||
@@ -530,6 +611,24 @@ impl PluginRequestProcessor {
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
warn!(
|
||||
elapsed_ms = featured_plugin_ids_started_at.elapsed().as_millis(),
|
||||
curated_marketplace_present,
|
||||
featured_plugin_id_count = featured_plugin_ids.len(),
|
||||
"plugin/list timing loaded featured ids"
|
||||
);
|
||||
let plugin_count = data
|
||||
.iter()
|
||||
.map(|marketplace| marketplace.plugins.len())
|
||||
.sum::<usize>();
|
||||
warn!(
|
||||
elapsed_ms = request_started_at.elapsed().as_millis(),
|
||||
marketplace_count = data.len(),
|
||||
plugin_count,
|
||||
load_error_count = marketplace_load_errors.len(),
|
||||
featured_plugin_id_count = featured_plugin_ids.len(),
|
||||
"plugin/list timing completed"
|
||||
);
|
||||
|
||||
Ok(PluginListResponse {
|
||||
marketplaces: data,
|
||||
|
||||
@@ -4,9 +4,9 @@ use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_app_server_protocol::ClientRequestSerializationScope;
|
||||
use futures::future::join_all;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::Instrument;
|
||||
|
||||
@@ -106,6 +106,8 @@ impl RequestSerializationQueueKey {
|
||||
pub(crate) struct QueuedInitializedRequest {
|
||||
gate: Arc<ConnectionRpcGate>,
|
||||
future: BoxFutureUnit,
|
||||
method: String,
|
||||
request_id: String,
|
||||
}
|
||||
|
||||
impl QueuedInitializedRequest {
|
||||
@@ -116,11 +118,19 @@ impl QueuedInitializedRequest {
|
||||
Self {
|
||||
gate,
|
||||
future: Box::pin(future),
|
||||
method: "<unknown>".to_string(),
|
||||
request_id: "<unknown>".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_log_metadata(mut self, method: String, request_id: String) -> Self {
|
||||
self.method = method;
|
||||
self.request_id = request_id;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) async fn run(self) {
|
||||
let Self { gate, future } = self;
|
||||
let Self { gate, future, .. } = self;
|
||||
gate.run(future).await;
|
||||
}
|
||||
}
|
||||
@@ -128,11 +138,113 @@ impl QueuedInitializedRequest {
|
||||
struct QueuedSerializedRequest {
|
||||
access: RequestSerializationAccess,
|
||||
request: QueuedInitializedRequest,
|
||||
enqueued_at: Instant,
|
||||
}
|
||||
|
||||
impl QueuedSerializedRequest {
|
||||
async fn run(
|
||||
self,
|
||||
key: RequestSerializationQueueKey,
|
||||
batch_size: usize,
|
||||
queue_depth_after_pop: usize,
|
||||
) {
|
||||
let Self {
|
||||
access,
|
||||
request,
|
||||
enqueued_at,
|
||||
} = self;
|
||||
let method = request.method.clone();
|
||||
let request_id = request.request_id.clone();
|
||||
tracing::warn!(
|
||||
?key,
|
||||
?access,
|
||||
method,
|
||||
request_id,
|
||||
queue_wait_ms = enqueued_at.elapsed().as_millis(),
|
||||
batch_size,
|
||||
queue_depth_after_pop,
|
||||
"serialized request started"
|
||||
);
|
||||
|
||||
let started_at = Instant::now();
|
||||
request.run().await;
|
||||
|
||||
tracing::warn!(
|
||||
?key,
|
||||
?access,
|
||||
method,
|
||||
request_id,
|
||||
run_ms = started_at.elapsed().as_millis(),
|
||||
"serialized request completed"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct RequestSerializationQueueState {
|
||||
pending: VecDeque<QueuedSerializedRequest>,
|
||||
running_shared_reads: usize,
|
||||
exclusive_running: bool,
|
||||
}
|
||||
|
||||
impl RequestSerializationQueueState {
|
||||
fn enqueue(&mut self, request: QueuedSerializedRequest) {
|
||||
self.pending.push_back(request);
|
||||
}
|
||||
|
||||
fn take_ready_requests(&mut self) -> Vec<QueuedSerializedRequest> {
|
||||
if self.exclusive_running {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
match self.pending.front().map(|request| request.access) {
|
||||
Some(RequestSerializationAccess::Exclusive) if self.running_shared_reads == 0 => {
|
||||
let Some(request) = self.pending.pop_front() else {
|
||||
return Vec::new();
|
||||
};
|
||||
self.exclusive_running = true;
|
||||
vec![request]
|
||||
}
|
||||
Some(RequestSerializationAccess::SharedRead) => {
|
||||
let mut requests = Vec::new();
|
||||
while self
|
||||
.pending
|
||||
.front()
|
||||
.is_some_and(|request| request.access == RequestSerializationAccess::SharedRead)
|
||||
{
|
||||
let Some(request) = self.pending.pop_front() else {
|
||||
break;
|
||||
};
|
||||
self.running_shared_reads += 1;
|
||||
requests.push(request);
|
||||
}
|
||||
requests
|
||||
}
|
||||
Some(RequestSerializationAccess::Exclusive) | None => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn complete(&mut self, access: RequestSerializationAccess) {
|
||||
match access {
|
||||
RequestSerializationAccess::Exclusive => {
|
||||
debug_assert!(self.exclusive_running);
|
||||
self.exclusive_running = false;
|
||||
}
|
||||
RequestSerializationAccess::SharedRead => {
|
||||
debug_assert!(self.running_shared_reads > 0);
|
||||
self.running_shared_reads -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_idle(&self) -> bool {
|
||||
self.pending.is_empty() && self.running_shared_reads == 0 && !self.exclusive_running
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct RequestSerializationQueues {
|
||||
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, VecDeque<QueuedSerializedRequest>>>>,
|
||||
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, RequestSerializationQueueState>>>,
|
||||
}
|
||||
|
||||
impl RequestSerializationQueues {
|
||||
@@ -142,62 +254,112 @@ impl RequestSerializationQueues {
|
||||
access: RequestSerializationAccess,
|
||||
request: QueuedInitializedRequest,
|
||||
) {
|
||||
let request = QueuedSerializedRequest { access, request };
|
||||
let should_spawn = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
match queues.get_mut(&key) {
|
||||
Some(queue) => {
|
||||
queue.push_back(request);
|
||||
false
|
||||
}
|
||||
None => {
|
||||
let mut queue = VecDeque::new();
|
||||
queue.push_back(request);
|
||||
queues.insert(key.clone(), queue);
|
||||
true
|
||||
}
|
||||
}
|
||||
let method = request.method.clone();
|
||||
let request_id = request.request_id.clone();
|
||||
let request = QueuedSerializedRequest {
|
||||
access,
|
||||
request,
|
||||
enqueued_at: Instant::now(),
|
||||
};
|
||||
let (
|
||||
ready_requests,
|
||||
queue_depth_before,
|
||||
queued_exclusive_count,
|
||||
head_access,
|
||||
head_method,
|
||||
head_request_id,
|
||||
queue_depth_after_pop,
|
||||
) = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
let queue = queues.entry(key.clone()).or_default();
|
||||
let queue_depth_before = queue.pending.len();
|
||||
let queued_exclusive_count = queue
|
||||
.pending
|
||||
.iter()
|
||||
.filter(|request| request.access == RequestSerializationAccess::Exclusive)
|
||||
.count();
|
||||
let head_request = queue.pending.front();
|
||||
let head_access = head_request.map(|request| request.access);
|
||||
let head_method = head_request
|
||||
.map(|request| request.request.method.clone())
|
||||
.unwrap_or_else(|| "<none>".to_string());
|
||||
let head_request_id = head_request
|
||||
.map(|request| request.request.request_id.clone())
|
||||
.unwrap_or_else(|| "<none>".to_string());
|
||||
queue.enqueue(request);
|
||||
let ready_requests = queue.take_ready_requests();
|
||||
(
|
||||
ready_requests,
|
||||
queue_depth_before,
|
||||
queued_exclusive_count,
|
||||
head_access,
|
||||
head_method,
|
||||
head_request_id,
|
||||
queue.pending.len(),
|
||||
)
|
||||
};
|
||||
tracing::warn!(
|
||||
?key,
|
||||
?access,
|
||||
method,
|
||||
request_id,
|
||||
queue_depth_before,
|
||||
queue_depth_after = queue_depth_before + 1,
|
||||
queued_exclusive_count,
|
||||
?head_access,
|
||||
head_method,
|
||||
head_request_id,
|
||||
"serialized request queued"
|
||||
);
|
||||
|
||||
if should_spawn {
|
||||
self.spawn_ready_requests(key, ready_requests, queue_depth_after_pop);
|
||||
}
|
||||
|
||||
fn spawn_ready_requests(
|
||||
&self,
|
||||
key: RequestSerializationQueueKey,
|
||||
requests: Vec<QueuedSerializedRequest>,
|
||||
queue_depth_after_pop: usize,
|
||||
) {
|
||||
let batch_size = requests.len();
|
||||
for request in requests {
|
||||
let queues = self.clone();
|
||||
let span = tracing::debug_span!("app_server.serialized_request_queue", ?key);
|
||||
tokio::spawn(async move { queues.drain(key).await }.instrument(span));
|
||||
let request_key = key.clone();
|
||||
let span = tracing::debug_span!("app_server.serialized_request_queue", ?request_key);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let access = request.access;
|
||||
request
|
||||
.run(request_key.clone(), batch_size, queue_depth_after_pop)
|
||||
.await;
|
||||
queues.complete(request_key, access).await;
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn drain(self, key: RequestSerializationQueueKey) {
|
||||
loop {
|
||||
let requests = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
let Some(queue) = queues.get_mut(&key) else {
|
||||
return;
|
||||
};
|
||||
match queue.pop_front() {
|
||||
Some(request) => {
|
||||
let access = request.access;
|
||||
let mut requests = vec![request];
|
||||
if access == RequestSerializationAccess::SharedRead {
|
||||
while queue.front().is_some_and(|request| {
|
||||
request.access == RequestSerializationAccess::SharedRead
|
||||
}) {
|
||||
let Some(request) = queue.pop_front() else {
|
||||
break;
|
||||
};
|
||||
requests.push(request);
|
||||
}
|
||||
}
|
||||
requests
|
||||
}
|
||||
None => {
|
||||
queues.remove(&key);
|
||||
return;
|
||||
}
|
||||
}
|
||||
async fn complete(
|
||||
&self,
|
||||
key: RequestSerializationQueueKey,
|
||||
access: RequestSerializationAccess,
|
||||
) {
|
||||
let (ready_requests, queue_depth_after_pop) = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
let Some(queue) = queues.get_mut(&key) else {
|
||||
return;
|
||||
};
|
||||
queue.complete(access);
|
||||
let ready_requests = queue.take_ready_requests();
|
||||
let queue_depth_after_pop = queue.pending.len();
|
||||
let should_remove = queue.is_idle();
|
||||
if should_remove {
|
||||
queues.remove(&key);
|
||||
}
|
||||
(ready_requests, queue_depth_after_pop)
|
||||
};
|
||||
|
||||
join_all(requests.into_iter().map(|request| request.request.run())).await;
|
||||
}
|
||||
self.spawn_ready_requests(key, ready_requests, queue_depth_after_pop);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -504,6 +666,52 @@ mod tests {
|
||||
.expect("shared reads should still be waiting");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn later_shared_reads_join_running_shared_reads_without_queued_write() {
|
||||
let queues = RequestSerializationQueues::default();
|
||||
let key = RequestSerializationQueueKey::Global("test");
|
||||
let (first_read_started_tx, first_read_started_rx) = oneshot::channel::<()>();
|
||||
let (first_read_release_tx, first_read_release_rx) = oneshot::channel::<()>();
|
||||
let (later_read_started_tx, later_read_started_rx) = oneshot::channel::<()>();
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
first_read_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
let _ = first_read_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
timeout(queue_drain_timeout(), first_read_started_rx)
|
||||
.await
|
||||
.expect("first read should start")
|
||||
.expect("sender should be open");
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key,
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
later_read_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
timeout(queue_drain_timeout(), later_read_started_rx)
|
||||
.await
|
||||
.expect("later read should join running reads")
|
||||
.expect("sender should be open");
|
||||
first_read_release_tx
|
||||
.send(())
|
||||
.expect("first read should still be waiting");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exclusive_write_waits_for_running_shared_reads() {
|
||||
let queues = RequestSerializationQueues::default();
|
||||
|
||||
Reference in New Issue
Block a user