Compare commits

...

3 Commits

Author SHA1 Message Date
xli-oai
a745393622 mark experimental feature list as shared read 2026-05-07 02:50:14 -07:00
xli-oai
ff1ad6deae add request serialization queue timing logs 2026-05-07 02:36:02 -07:00
xli-oai
c2a4cae364 add plugin list timing logs 2026-05-06 22:18:20 -07:00
4 changed files with 244 additions and 20 deletions

View File

@@ -794,7 +794,7 @@ client_request_definitions! {
},
ExperimentalFeatureList => "experimentalFeature/list" {
params: v2::ExperimentalFeatureListParams,
serialization: global("config"),
serialization: global_shared_read("config"),
response: v2::ExperimentalFeatureListResponse,
},
ExperimentalFeatureEnablementSet => "experimentalFeature/enablement/set" {
@@ -1681,6 +1681,26 @@ mod tests {
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
let experimental_feature_list = ClientRequest::ExperimentalFeatureList {
request_id: request_id(),
params: v2::ExperimentalFeatureListParams::default(),
};
assert_eq!(
experimental_feature_list.serialization_scope(),
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
let experimental_feature_enablement_set = ClientRequest::ExperimentalFeatureEnablementSet {
request_id: request_id(),
params: v2::ExperimentalFeatureEnablementSetParams {
enablement: Default::default(),
},
};
assert_eq!(
experimental_feature_enablement_set.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let plugin_uninstall = ClientRequest::PluginUninstall {
request_id: request_id(),
params: v2::PluginUninstallParams {

View File

@@ -771,6 +771,8 @@ impl MessageProcessor {
);
let serialization_scope = codex_request.serialization_scope();
let serialization_method = codex_request.method();
let serialization_request_id = connection_request_id.request_id.to_string();
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
let device_key_requests_allowed = session.allows_device_key_requests();
@@ -797,7 +799,8 @@ impl MessageProcessor {
}
}
.instrument(span),
);
)
.with_log_metadata(serialization_method, serialization_request_id);
if let Some(scope) = serialization_scope {
let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);

View File

@@ -348,6 +348,7 @@ impl PluginRequestProcessor {
&self,
params: PluginListParams,
) -> Result<PluginListResponse, JSONRPCErrorError> {
let request_started_at = Instant::now();
let plugins_manager = self.thread_manager.plugins_manager();
let PluginListParams {
cwds,
@@ -358,35 +359,80 @@ impl PluginRequestProcessor {
let marketplace_kinds =
marketplace_kinds.unwrap_or_else(|| vec![PluginListMarketplaceKind::Local]);
let include_local = marketplace_kinds.contains(&PluginListMarketplaceKind::Local);
info!(
roots_count = roots.len(),
explicit_marketplace_kinds,
marketplace_kinds_count = marketplace_kinds.len(),
include_local,
"plugin/list timing started"
);
let config_started_at = Instant::now();
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
info!(
elapsed_ms = config_started_at.elapsed().as_millis(),
plugins_enabled = config.features.enabled(Feature::Plugins),
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
"plugin/list timing loaded config"
);
let empty_response = || PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
};
if !config.features.enabled(Feature::Plugins) {
info!(
elapsed_ms = request_started_at.elapsed().as_millis(),
"plugin/list timing completed with plugins disabled"
);
return Ok(empty_response());
}
let auth_started_at = Instant::now();
let auth = self.auth_manager.auth().await;
if !self
info!(
elapsed_ms = auth_started_at.elapsed().as_millis(),
has_auth = auth.is_some(),
"plugin/list timing loaded auth"
);
let workspace_setting_started_at = Instant::now();
let workspace_plugins_enabled = self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await
{
.await;
info!(
elapsed_ms = workspace_setting_started_at.elapsed().as_millis(),
workspace_plugins_enabled, "plugin/list timing checked workspace setting"
);
if !workspace_plugins_enabled {
info!(
elapsed_ms = request_started_at.elapsed().as_millis(),
"plugin/list timing completed with workspace plugins disabled"
);
return Ok(empty_response());
}
let plugins_input = config.plugins_config_input();
let (mut data, marketplace_load_errors) = if include_local {
let background_tasks_started_at = Instant::now();
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&plugins_input,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback()),
);
info!(
elapsed_ms = background_tasks_started_at.elapsed().as_millis(),
"plugin/list timing started background tasks"
);
let config_for_marketplace_listing = plugins_input.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
let shared_plugin_ids_started_at = Instant::now();
let shared_plugin_ids_by_local_path = load_shared_plugin_ids_by_local_path(&config);
info!(
elapsed_ms = shared_plugin_ids_started_at.elapsed().as_millis(),
shared_plugin_ids_count = shared_plugin_ids_by_local_path.len(),
"plugin/list timing loaded shared plugin ids"
);
let local_listing_started_at = Instant::now();
match tokio::task::spawn_blocking(move || {
let outcome = plugins_manager_for_marketplace_listing
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
@@ -447,7 +493,21 @@ impl PluginRequestProcessor {
})
.await
{
Ok(Ok(outcome)) => outcome,
Ok(Ok(outcome)) => {
let plugin_count = outcome
.0
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum::<usize>();
info!(
elapsed_ms = local_listing_started_at.elapsed().as_millis(),
marketplace_count = outcome.0.len(),
plugin_count,
load_error_count = outcome.1.len(),
"plugin/list timing listed local marketplaces"
);
outcome
}
Ok(Err(err)) => {
return Err(Self::marketplace_error(err, "list marketplace plugins"));
}
@@ -472,6 +532,7 @@ impl PluginRequestProcessor {
remote_sources.push(RemoteMarketplaceSource::SharedWithMe);
}
if !remote_sources.is_empty() {
let remote_marketplaces_started_at = Instant::now();
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
@@ -483,6 +544,7 @@ impl PluginRequestProcessor {
.await
{
Ok(remote_marketplaces) => {
let fetched_remote_marketplace_count = remote_marketplaces.len();
for remote_marketplace in remote_marketplaces
.into_iter()
.map(remote_marketplace_to_info)
@@ -496,11 +558,25 @@ impl PluginRequestProcessor {
data.push(remote_marketplace);
}
}
info!(
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
remote_source_count = remote_sources.len(),
fetched_remote_marketplace_count,
merged_marketplace_count = data.len(),
"plugin/list timing fetched remote marketplaces"
);
}
Err(
RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode,
) => {}
err @ (RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode),
) => {
info!(
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
remote_source_count = remote_sources.len(),
error = %err,
"plugin/list timing skipped remote marketplaces"
);
}
Err(err) => {
warn!(
error = %err,
@@ -508,12 +584,15 @@ impl PluginRequestProcessor {
);
}
}
} else {
info!("plugin/list timing skipped remote marketplaces");
}
let featured_plugin_ids = if data
let curated_marketplace_present = data
.iter()
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
{
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME);
let featured_plugin_ids_started_at = Instant::now();
let featured_plugin_ids = if curated_marketplace_present {
match plugins_manager
.featured_plugin_ids_for_config(&plugins_input, auth.as_ref())
.await
@@ -530,6 +609,24 @@ impl PluginRequestProcessor {
} else {
Vec::new()
};
info!(
elapsed_ms = featured_plugin_ids_started_at.elapsed().as_millis(),
curated_marketplace_present,
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list timing loaded featured ids"
);
let plugin_count = data
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum::<usize>();
info!(
elapsed_ms = request_started_at.elapsed().as_millis(),
marketplace_count = data.len(),
plugin_count,
load_error_count = marketplace_load_errors.len(),
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list timing completed"
);
Ok(PluginListResponse {
marketplaces: data,

View File

@@ -4,6 +4,7 @@ use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use codex_app_server_protocol::ClientRequestSerializationScope;
use futures::future::join_all;
@@ -106,6 +107,8 @@ impl RequestSerializationQueueKey {
pub(crate) struct QueuedInitializedRequest {
gate: Arc<ConnectionRpcGate>,
future: BoxFutureUnit,
method: String,
request_id: String,
}
impl QueuedInitializedRequest {
@@ -116,11 +119,19 @@ impl QueuedInitializedRequest {
Self {
gate,
future: Box::pin(future),
method: "<unknown>".to_string(),
request_id: "<unknown>".to_string(),
}
}
pub(crate) fn with_log_metadata(mut self, method: String, request_id: String) -> Self {
self.method = method;
self.request_id = request_id;
self
}
pub(crate) async fn run(self) {
let Self { gate, future } = self;
let Self { gate, future, .. } = self;
gate.run(future).await;
}
}
@@ -128,6 +139,46 @@ impl QueuedInitializedRequest {
struct QueuedSerializedRequest {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
enqueued_at: Instant,
}
impl QueuedSerializedRequest {
async fn run(
self,
key: RequestSerializationQueueKey,
batch_size: usize,
queue_depth_after_pop: usize,
) {
let Self {
access,
request,
enqueued_at,
} = self;
let method = request.method.clone();
let request_id = request.request_id.clone();
tracing::info!(
?key,
?access,
method,
request_id,
queue_wait_ms = enqueued_at.elapsed().as_millis(),
batch_size,
queue_depth_after_pop,
"serialized request started"
);
let started_at = Instant::now();
request.run().await;
tracing::info!(
?key,
?access,
method,
request_id,
run_ms = started_at.elapsed().as_millis(),
"serialized request completed"
);
}
}
#[derive(Clone, Default)]
@@ -142,23 +193,70 @@ impl RequestSerializationQueues {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
) {
let request = QueuedSerializedRequest { access, request };
let should_spawn = {
let method = request.method.clone();
let request_id = request.request_id.clone();
let request = QueuedSerializedRequest {
access,
request,
enqueued_at: Instant::now(),
};
let (
should_spawn,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
) = {
let mut queues = self.inner.lock().await;
match queues.get_mut(&key) {
Some(queue) => {
let queue_depth_before = queue.len();
let queued_exclusive_count = queue
.iter()
.filter(|request| request.access == RequestSerializationAccess::Exclusive)
.count();
let head_request = queue.front();
let head_access = head_request.map(|request| request.access);
let head_method = head_request
.map(|request| request.request.method.clone())
.unwrap_or_else(|| "<none>".to_string());
let head_request_id = head_request
.map(|request| request.request.request_id.clone())
.unwrap_or_else(|| "<none>".to_string());
queue.push_back(request);
false
(
false,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
)
}
None => {
let mut queue = VecDeque::new();
queue.push_back(request);
queues.insert(key.clone(), queue);
true
(true, 0, 0, None, "<none>".to_string(), "<none>".to_string())
}
}
};
tracing::info!(
?key,
?access,
method,
request_id,
queue_depth_before,
queue_depth_after = queue_depth_before + 1,
queued_exclusive_count,
?head_access,
head_method,
head_request_id,
"serialized request queued"
);
if should_spawn {
let queues = self.clone();
let span = tracing::debug_span!("app_server.serialized_request_queue", ?key);
@@ -168,7 +266,7 @@ impl RequestSerializationQueues {
async fn drain(self, key: RequestSerializationQueueKey) {
loop {
let requests = {
let (requests, queue_depth_after_pop) = {
let mut queues = self.inner.lock().await;
let Some(queue) = queues.get_mut(&key) else {
return;
@@ -187,7 +285,7 @@ impl RequestSerializationQueues {
requests.push(request);
}
}
requests
(requests, queue.len())
}
None => {
queues.remove(&key);
@@ -196,7 +294,13 @@ impl RequestSerializationQueues {
}
};
join_all(requests.into_iter().map(|request| request.request.run())).await;
let batch_size = requests.len();
join_all(
requests
.into_iter()
.map(|request| request.run(key.clone(), batch_size, queue_depth_after_pop)),
)
.await;
}
}
}