Compare commits

...

4 Commits

Author SHA1 Message Date
xli-oai
965237c280 Add plugin read detail logs 2026-05-18 17:14:28 -07:00
xli-oai
9778c48849 Include API paths in plugin latency labels 2026-05-18 17:14:28 -07:00
xli-oai
371b757a88 Add production plugin API latency probe 2026-05-18 17:14:28 -07:00
xli-oai
c33adf538b Add latency logs for plugin and mention flows 2026-05-18 17:14:28 -07:00
10 changed files with 1251 additions and 42 deletions

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use codex_app_server_protocol::FuzzyFileSearchMatchType;
use codex_app_server_protocol::FuzzyFileSearchResult;
@@ -26,6 +27,7 @@ pub(crate) async fn run_fuzzy_file_search(
if roots.is_empty() {
return Vec::new();
}
let started_at = Instant::now();
#[expect(clippy::expect_used)]
let limit = NonZero::new(MATCH_LIMIT).expect("MATCH_LIMIT should be a valid non-zero usize");
@@ -87,6 +89,11 @@ pub(crate) async fn run_fuzzy_file_search(
_,
>(|f| f.score, |f| f.path.as_str()));
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
file_count = files.len(),
"fuzzy file search completed"
);
files
}
@@ -105,6 +112,11 @@ impl FuzzyFileSearchSession {
let mut latest_query = self.shared.latest_query.lock().unwrap();
*latest_query = query.clone();
}
{
#[expect(clippy::unwrap_used)]
let mut latest_query_started_at = self.shared.latest_query_started_at.lock().unwrap();
*latest_query_started_at = Some(Instant::now());
}
self.session.update_query(&query);
}
}
@@ -133,6 +145,7 @@ pub(crate) fn start_fuzzy_file_search_session(
let shared = Arc::new(SessionShared {
session_id,
latest_query_started_at: Mutex::new(None),
latest_query: Mutex::new(String::new()),
outgoing,
runtime: tokio::runtime::Handle::current(),
@@ -159,6 +172,7 @@ pub(crate) fn start_fuzzy_file_search_session(
struct SessionShared {
session_id: String,
latest_query_started_at: Mutex<Option<Instant>>,
latest_query: Mutex<String>,
outgoing: Arc<OutgoingMessageSender>,
runtime: tokio::runtime::Handle,
@@ -179,6 +193,14 @@ impl SessionReporterImpl {
#[expect(clippy::unwrap_used)]
self.shared.latest_query.lock().unwrap().clone()
};
let elapsed_ms = {
#[expect(clippy::unwrap_used)]
self.shared
.latest_query_started_at
.lock()
.unwrap()
.map(|started_at| started_at.elapsed().as_millis())
};
if snapshot.query != query {
return;
}
@@ -188,6 +210,13 @@ impl SessionReporterImpl {
} else {
collect_files(snapshot)
};
tracing::info!(
elapsed_ms = ?elapsed_ms,
session_id = %self.shared.session_id,
query_len = query.len(),
file_count = files.len(),
"fuzzy file search session snapshot delivered"
);
let notification = ServerNotification::FuzzyFileSearchSessionUpdated(
FuzzyFileSearchSessionUpdatedNotification {

View File

@@ -1100,13 +1100,13 @@ impl MessageProcessor {
self.marketplace_processor.marketplace_upgrade(params).await
}
ClientRequest::PluginList { params, .. } => {
self.plugin_processor.plugin_list(params).await
self.plugin_processor.plugin_list(&request_id, params).await
}
ClientRequest::PluginInstalled { params, .. } => {
self.plugin_processor.plugin_installed(params).await
}
ClientRequest::PluginRead { params, .. } => {
self.plugin_processor.plugin_read(params).await
self.plugin_processor.plugin_read(&request_id, params).await
}
ClientRequest::PluginSkillRead { params, .. } => {
self.plugin_processor.plugin_skill_read(params).await

View File

@@ -13,6 +13,7 @@ use codex_mcp::McpOAuthLoginSupport;
use codex_mcp::oauth_login_support;
use codex_mcp::should_retry_without_scopes;
use codex_rmcp_client::perform_oauth_login_silent;
use std::time::Instant;
#[derive(Clone)]
pub(crate) struct PluginRequestProcessor {
@@ -295,9 +296,10 @@ impl PluginRequestProcessor {
pub(crate) async fn plugin_list(
&self,
request_id: &ConnectionRequestId,
params: PluginListParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.plugin_list_response(params)
self.plugin_list_response(request_id, params)
.await
.map(|response| Some(response.into()))
}
@@ -313,9 +315,10 @@ impl PluginRequestProcessor {
pub(crate) async fn plugin_read(
&self,
request_id: &ConnectionRequestId,
params: PluginReadParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.plugin_read_response(params)
self.plugin_read_response(request_id, params)
.await
.map(|response| Some(response.into()))
}
@@ -463,8 +466,11 @@ impl PluginRequestProcessor {
async fn plugin_list_response(
&self,
request_id: &ConnectionRequestId,
params: PluginListParams,
) -> Result<PluginListResponse, JSONRPCErrorError> {
let request_id = request_id.request_id.to_string();
let started_at = Instant::now();
let plugins_manager = self.thread_manager.plugins_manager();
let PluginListParams {
cwds,
@@ -475,33 +481,85 @@ impl PluginRequestProcessor {
let marketplace_kinds =
marketplace_kinds.unwrap_or_else(|| vec![PluginListMarketplaceKind::Local]);
let include_local = marketplace_kinds.contains(&PluginListMarketplaceKind::Local);
tracing::info!(
request_id = %request_id,
root_count = roots.len(),
explicit_marketplace_kinds,
marketplace_kinds = ?marketplace_kinds,
include_local,
"plugin/list request started"
);
let config_started_at = Instant::now();
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
tracing::info!(
request_id = %request_id,
elapsed_ms = config_started_at.elapsed().as_millis(),
plugins_enabled = config.features.enabled(Feature::Plugins),
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
plugin_sharing_enabled = config.features.enabled(Feature::PluginSharing),
"plugin/list loaded config"
);
let empty_response = || PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
};
if !config.features.enabled(Feature::Plugins) {
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
"plugin/list completed with plugins feature disabled"
);
return Ok(empty_response());
}
let auth_started_at = Instant::now();
let auth = self.auth_manager.auth().await;
tracing::info!(
request_id = %request_id,
elapsed_ms = auth_started_at.elapsed().as_millis(),
has_auth = auth.is_some(),
"plugin/list loaded auth state"
);
let workspace_setting_started_at = Instant::now();
if !self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await
{
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
workspace_check_elapsed_ms = workspace_setting_started_at.elapsed().as_millis(),
"plugin/list completed with workspace Codex plugins disabled"
);
return Ok(empty_response());
}
tracing::info!(
request_id = %request_id,
elapsed_ms = workspace_setting_started_at.elapsed().as_millis(),
"plugin/list checked workspace Codex plugins setting"
);
let plugins_input = config.plugins_config_input();
if include_local || marketplace_kinds.contains(&PluginListMarketplaceKind::SharedWithMe) {
let background_tasks_started_at = Instant::now();
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&plugins_input,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback()),
);
tracing::info!(
request_id = %request_id,
elapsed_us = background_tasks_started_at.elapsed().as_micros(),
root_count = roots.len(),
include_local,
include_shared_with_me =
marketplace_kinds.contains(&PluginListMarketplaceKind::SharedWithMe),
"plugin/list scheduled background refresh tasks"
);
}
let (mut data, marketplace_load_errors) = if include_local {
let local_started_at = Instant::now();
let config_for_marketplace_listing = plugins_input.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
let shared_plugin_ids_by_local_path = load_shared_plugin_ids_by_local_path(&config)?;
@@ -550,11 +608,36 @@ impl PluginRequestProcessor {
})
.await
{
Ok(Ok(outcome)) => outcome,
Ok(Ok(outcome)) => {
let plugin_count: usize = outcome
.0
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
request_id = %request_id,
elapsed_ms = local_started_at.elapsed().as_millis(),
marketplace_count = outcome.0.len(),
plugin_count,
marketplace_load_error_count = outcome.1.len(),
"plugin/list local marketplace listing completed"
);
outcome
}
Ok(Err(err)) => {
tracing::warn!(
request_id = %request_id,
elapsed_ms = local_started_at.elapsed().as_millis(),
"plugin/list local marketplace listing failed: {err}"
);
return Err(Self::marketplace_error(err, "list marketplace plugins"));
}
Err(err) => {
tracing::warn!(
request_id = %request_id,
elapsed_ms = local_started_at.elapsed().as_millis(),
"plugin/list local marketplace listing task failed: {err}"
);
return Err(internal_error(format!(
"failed to list marketplace plugins: {err}"
)));
@@ -576,10 +659,23 @@ impl PluginRequestProcessor {
{
remote_sources.push(RemoteMarketplaceSource::SharedWithMe);
}
if !remote_sources.is_empty() {
if remote_sources.is_empty() {
tracing::info!(
request_id = %request_id,
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
plugin_sharing_enabled = config.features.enabled(Feature::PluginSharing),
"plugin/list remote marketplace fetch skipped"
);
} else {
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let remote_started_at = Instant::now();
tracing::info!(
request_id = %request_id,
remote_sources = ?remote_sources,
"plugin/list remote marketplace fetch started"
);
match codex_core_plugins::remote::fetch_remote_marketplaces(
&remote_plugin_service_config,
auth.as_ref(),
@@ -588,6 +684,17 @@ impl PluginRequestProcessor {
.await
{
Ok(remote_marketplaces) => {
let remote_plugin_count: usize = remote_marketplaces
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
request_id = %request_id,
elapsed_ms = remote_started_at.elapsed().as_millis(),
marketplace_count = remote_marketplaces.len(),
plugin_count = remote_plugin_count,
"plugin/list remote marketplace fetch completed"
);
for remote_marketplace in remote_marketplaces
.into_iter()
.map(remote_marketplace_to_info)
@@ -606,16 +713,35 @@ impl PluginRequestProcessor {
err @ (RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode),
) if explicit_marketplace_kinds => {
warn!(
request_id = %request_id,
error = %err,
elapsed_ms = remote_started_at.elapsed().as_millis(),
"plugin/list remote plugin catalog fetch failed for explicit remote request"
);
return Err(remote_plugin_catalog_error_to_jsonrpc(
err,
"list remote plugin catalog",
));
}
Err(
RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode,
) => {}
err @ (RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode),
) => {
tracing::info!(
request_id = %request_id,
error = %err,
elapsed_ms = remote_started_at.elapsed().as_millis(),
"plugin/list remote plugin catalog fetch skipped because auth is unavailable"
);
}
Err(err) if explicit_marketplace_kinds => {
warn!(
request_id = %request_id,
error = %err,
elapsed_ms = remote_started_at.elapsed().as_millis(),
"plugin/list remote plugin catalog fetch failed for explicit remote request"
);
return Err(remote_plugin_catalog_error_to_jsonrpc(
err,
"list remote plugin catalog",
@@ -623,34 +749,65 @@ impl PluginRequestProcessor {
}
Err(err) => {
warn!(
request_id = %request_id,
error = %err,
elapsed_ms = remote_started_at.elapsed().as_millis(),
"plugin/list remote plugin catalog fetch failed; returning local marketplaces only"
);
}
}
}
let featured_plugin_ids = if data
let has_openai_curated_marketplace = data
.iter()
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
{
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME);
let featured_plugin_ids = if has_openai_curated_marketplace {
let featured_started_at = Instant::now();
match plugins_manager
.featured_plugin_ids_for_config(&plugins_input, auth.as_ref())
.await
{
Ok(featured_plugin_ids) => featured_plugin_ids,
Ok(featured_plugin_ids) => {
tracing::info!(
request_id = %request_id,
elapsed_ms = featured_started_at.elapsed().as_millis(),
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list featured plugin ids fetched"
);
featured_plugin_ids
}
Err(err) => {
warn!(
request_id = %request_id,
error = %err,
elapsed_ms = featured_started_at.elapsed().as_millis(),
"plugin/list featured plugin fetch failed; returning empty featured ids"
);
Vec::new()
}
}
} else {
tracing::info!(
request_id = %request_id,
marketplace_count = data.len(),
"plugin/list featured plugin fetch skipped because curated marketplace is absent"
);
Vec::new()
};
let plugin_count: usize = data
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
marketplace_count = data.len(),
plugin_count,
marketplace_load_error_count = marketplace_load_errors.len(),
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list request completed"
);
Ok(PluginListResponse {
marketplaces: data,
marketplace_load_errors,
@@ -847,8 +1004,11 @@ impl PluginRequestProcessor {
async fn plugin_read_response(
&self,
request_id: &ConnectionRequestId,
params: PluginReadParams,
) -> Result<PluginReadResponse, JSONRPCErrorError> {
let request_id = request_id.request_id.to_string();
let started_at = Instant::now();
let plugins_manager = self.thread_manager.plugins_manager();
let PluginReadParams {
marketplace_path,
@@ -859,30 +1019,116 @@ impl PluginRequestProcessor {
(Some(marketplace_path), None) => Ok(marketplace_path),
(None, Some(remote_marketplace_name)) => Err(remote_marketplace_name),
(Some(_), Some(_)) | (None, None) => {
tracing::warn!(
request_id = %request_id,
elapsed_us = started_at.elapsed().as_micros(),
"plugin/read rejected request with invalid source selector"
);
return Err(invalid_request(
"plugin/read requires exactly one of marketplacePath or remoteMarketplaceName",
));
}
};
let source_kind = if read_source.is_ok() {
"local"
} else {
"remote"
};
tracing::info!(
request_id = %request_id,
source_kind,
plugin_name = %plugin_name,
marketplace_path = ?read_source.as_ref().ok(),
remote_marketplace_name = ?read_source.as_ref().err(),
"plugin/read request started"
);
let config_cwd = read_source.as_ref().ok().and_then(|marketplace_path| {
marketplace_path.as_path().parent().map(Path::to_path_buf)
});
let config_started_at = Instant::now();
let config = self.load_latest_config(config_cwd).await?;
tracing::info!(
request_id = %request_id,
elapsed_ms = config_started_at.elapsed().as_millis(),
plugins_enabled = config.features.enabled(Feature::Plugins),
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
plugin_sharing_enabled = config.features.enabled(Feature::PluginSharing),
"plugin/read loaded config"
);
let plugins_input = config.plugins_config_input();
let plugin = match read_source {
Ok(marketplace_path) => {
let marketplace_path_for_log = marketplace_path.clone();
let local_started_at = Instant::now();
let request = PluginReadRequest {
plugin_name,
marketplace_path,
};
let outcome = plugins_manager
tracing::info!(
request_id = %request_id,
marketplace_path = %marketplace_path_for_log.display(),
plugin_name = %request.plugin_name,
"plugin/read local marketplace read started"
);
let outcome = match plugins_manager
.read_plugin_for_config(&plugins_input, &request)
.await
.map_err(|err| Self::marketplace_error(err, "read plugin details"))?;
{
Ok(outcome) => {
tracing::info!(
request_id = %request_id,
elapsed_ms = local_started_at.elapsed().as_millis(),
plugin_id = ?outcome.plugin.id,
marketplace_name = %outcome.marketplace_name,
installed = outcome.plugin.installed,
enabled = outcome.plugin.enabled,
availability = ?PluginAvailability::Available,
install_policy = ?outcome.plugin.policy.installation,
auth_policy = ?outcome.plugin.policy.authentication,
has_description = outcome.plugin.description.is_some(),
has_interface = outcome.plugin.interface.is_some(),
keyword_count = outcome.plugin.keywords.len(),
skill_count = outcome.plugin.skills.len(),
hook_count = outcome.plugin.hooks.len(),
app_count = outcome.plugin.apps.len(),
mcp_server_count = outcome.plugin.mcp_server_names.len(),
"plugin/read local marketplace read completed"
);
outcome
}
Err(err) => {
tracing::warn!(
request_id = %request_id,
elapsed_ms = local_started_at.elapsed().as_millis(),
"plugin/read local marketplace read failed: {err}"
);
return Err(Self::marketplace_error(err, "read plugin details"));
}
};
let share_mapping_started_at = Instant::now();
let shared_plugin_ids_by_local_path =
load_shared_plugin_ids_by_local_path(&config)?;
match load_shared_plugin_ids_by_local_path(&config) {
Ok(shared_plugin_ids_by_local_path) => {
tracing::info!(
request_id = %request_id,
elapsed_us = share_mapping_started_at.elapsed().as_micros(),
shared_plugin_path_count = shared_plugin_ids_by_local_path.len(),
"plugin/read loaded local share mappings"
);
shared_plugin_ids_by_local_path
}
Err(err) => {
tracing::warn!(
request_id = %request_id,
elapsed_us = share_mapping_started_at.elapsed().as_micros(),
error = ?err,
"plugin/read failed to load local share mappings"
);
return Err(err);
}
};
let share_context = share_context_for_source(
&outcome.plugin.source,
&shared_plugin_ids_by_local_path,
@@ -893,6 +1139,7 @@ impl PluginRequestProcessor {
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let share_context_started_at = Instant::now();
match codex_core_plugins::remote::fetch_remote_plugin_share_context(
&remote_plugin_service_config,
auth.as_ref(),
@@ -901,12 +1148,22 @@ impl PluginRequestProcessor {
.await
{
Ok(Some(remote_share_context)) => {
tracing::info!(
request_id = %request_id,
elapsed_ms = share_context_started_at.elapsed().as_millis(),
remote_plugin_id = %context.remote_plugin_id,
has_share_principals = remote_share_context
.share_principals
.is_some(),
"plugin/read hydrated local plugin share context"
);
if remote_share_context.share_principals.is_some() {
Some(remote_plugin_share_context_to_info(remote_share_context))
} else {
let remote_version = remote_share_context.remote_version;
let remote_plugin_id = context.remote_plugin_id.clone();
warn!(
request_id = %request_id,
remote_plugin_id = %remote_plugin_id,
"remote shared plugin detail did not include share principals; returning local share mapping context with remote version"
);
@@ -918,6 +1175,8 @@ impl PluginRequestProcessor {
}
Ok(None) => {
warn!(
request_id = %request_id,
elapsed_ms = share_context_started_at.elapsed().as_millis(),
remote_plugin_id = %context.remote_plugin_id,
"remote shared plugin detail did not include share context; returning local share mapping context"
);
@@ -925,6 +1184,8 @@ impl PluginRequestProcessor {
}
Err(err) => {
warn!(
request_id = %request_id,
elapsed_ms = share_context_started_at.elapsed().as_millis(),
remote_plugin_id = %context.remote_plugin_id,
error = %err,
"failed to hydrate local plugin share context; returning local share mapping context"
@@ -936,9 +1197,18 @@ impl PluginRequestProcessor {
None => None,
};
let environment_manager = self.thread_manager.environment_manager();
let apps_started_at = Instant::now();
let app_summaries =
load_plugin_app_summaries(&config, &outcome.plugin.apps, &environment_manager)
.await;
tracing::info!(
request_id = %request_id,
elapsed_ms = apps_started_at.elapsed().as_millis(),
requested_app_count = outcome.plugin.apps.len(),
resolved_app_count = app_summaries.len(),
app_needs_auth_count = app_summaries.iter().filter(|app| app.needs_auth).count(),
"plugin/read loaded local plugin app summaries"
);
let visible_skills = outcome
.plugin
.skills
@@ -950,6 +1220,14 @@ impl PluginRequestProcessor {
})
.cloned()
.collect::<Vec<_>>();
tracing::info!(
request_id = %request_id,
total_skill_count = outcome.plugin.skills.len(),
visible_skill_count = visible_skills.len(),
hidden_skill_count = outcome.plugin.skills.len().saturating_sub(visible_skills.len()),
disabled_skill_count = outcome.plugin.disabled_skill_paths.len(),
"plugin/read filtered local plugin skills for product"
);
PluginDetail {
marketplace_name: outcome.marketplace_name,
marketplace_path: outcome.marketplace_path,
@@ -988,25 +1266,98 @@ impl PluginRequestProcessor {
}
Err(remote_marketplace_name) => {
if !config.features.enabled(Feature::Plugins) {
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
remote_marketplace_name = %remote_marketplace_name,
"plugin/read completed with plugins feature disabled"
);
return Err(invalid_request(format!(
"remote plugin read is not enabled for marketplace {remote_marketplace_name}"
)));
}
let auth_started_at = Instant::now();
let auth = self.auth_manager.auth().await;
tracing::info!(
request_id = %request_id,
elapsed_ms = auth_started_at.elapsed().as_millis(),
has_auth = auth.is_some(),
remote_marketplace_name = %remote_marketplace_name,
"plugin/read loaded auth state for remote detail"
);
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
validate_remote_plugin_id(&plugin_name)?;
let remote_detail = codex_core_plugins::remote::fetch_remote_plugin_detail(
let validate_started_at = Instant::now();
if let Err(err) = validate_remote_plugin_id(&plugin_name) {
tracing::warn!(
request_id = %request_id,
elapsed_us = validate_started_at.elapsed().as_micros(),
remote_marketplace_name = %remote_marketplace_name,
plugin_name = %plugin_name,
error = ?err,
"plugin/read rejected invalid remote plugin id"
);
return Err(err);
}
tracing::info!(
request_id = %request_id,
elapsed_us = validate_started_at.elapsed().as_micros(),
remote_marketplace_name = %remote_marketplace_name,
plugin_name = %plugin_name,
"plugin/read validated remote plugin id"
);
let remote_detail_started_at = Instant::now();
tracing::info!(
request_id = %request_id,
remote_marketplace_name = %remote_marketplace_name,
plugin_name = %plugin_name,
"plugin/read remote detail fetch started"
);
let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
.map_err(|err| {
remote_plugin_catalog_error_to_jsonrpc(err, "read remote plugin details")
})?;
{
Ok(remote_detail) => {
tracing::info!(
request_id = %request_id,
elapsed_ms = remote_detail_started_at.elapsed().as_millis(),
remote_marketplace_name = %remote_marketplace_name,
remote_plugin_id = %remote_detail.summary.remote_plugin_id,
installed = remote_detail.summary.installed,
enabled = remote_detail.summary.enabled,
availability = ?remote_detail.summary.availability,
install_policy = ?remote_detail.summary.install_policy,
auth_policy = ?remote_detail.summary.auth_policy,
has_description = remote_detail.description.is_some(),
has_share_context = remote_detail.summary.share_context.is_some(),
has_interface = remote_detail.summary.interface.is_some(),
keyword_count = remote_detail.summary.keywords.len(),
skill_count = remote_detail.skills.len(),
app_count = remote_detail.app_ids.len(),
has_bundle_download_url = remote_detail.bundle_download_url.is_some(),
"plugin/read remote detail fetch completed"
);
remote_detail
}
Err(err) => {
tracing::warn!(
request_id = %request_id,
elapsed_ms = remote_detail_started_at.elapsed().as_millis(),
remote_marketplace_name = %remote_marketplace_name,
plugin_name = %plugin_name,
"plugin/read remote detail fetch failed: {err}"
);
return Err(remote_plugin_catalog_error_to_jsonrpc(
err,
"read remote plugin details",
));
}
};
let plugin_apps = remote_detail
.app_ids
.iter()
@@ -1014,12 +1365,84 @@ impl PluginRequestProcessor {
.map(codex_plugin::AppConnectorId)
.collect::<Vec<_>>();
let environment_manager = self.thread_manager.environment_manager();
let apps_started_at = Instant::now();
let app_summaries =
load_plugin_app_summaries(&config, &plugin_apps, &environment_manager).await;
tracing::info!(
request_id = %request_id,
elapsed_ms = apps_started_at.elapsed().as_millis(),
requested_app_count = plugin_apps.len(),
resolved_app_count = app_summaries.len(),
app_needs_auth_count = app_summaries.iter().filter(|app| app.needs_auth).count(),
"plugin/read loaded remote plugin app summaries"
);
remote_plugin_detail_to_info(remote_detail, app_summaries)
}
};
let interface = plugin.summary.interface.as_ref();
let enabled_skill_count = plugin.skills.iter().filter(|skill| skill.enabled).count();
let app_needs_auth_count = plugin.apps.iter().filter(|app| app.needs_auth).count();
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
source_kind,
marketplace_name = %plugin.marketplace_name,
marketplace_path = ?plugin.marketplace_path,
plugin_id = %plugin.summary.id,
remote_plugin_id = ?plugin.summary.remote_plugin_id,
plugin_name = %plugin.summary.name,
source = ?plugin.summary.source,
installed = plugin.summary.installed,
enabled = plugin.summary.enabled,
availability = ?plugin.summary.availability,
install_policy = ?plugin.summary.install_policy,
auth_policy = ?plugin.summary.auth_policy,
has_description = plugin.description.is_some(),
has_share_context = plugin.summary.share_context.is_some(),
has_interface = interface.is_some(),
has_display_name = interface
.and_then(|interface| interface.display_name.as_ref())
.is_some(),
has_short_description = interface
.and_then(|interface| interface.short_description.as_ref())
.is_some(),
has_long_description = interface
.and_then(|interface| interface.long_description.as_ref())
.is_some(),
capability_count = interface
.map(|interface| interface.capabilities.len())
.unwrap_or_default(),
default_prompt_count = interface
.and_then(|interface| interface.default_prompt.as_ref())
.map(Vec::len)
.unwrap_or_default(),
has_composer_icon = interface
.and_then(|interface| interface.composer_icon.as_ref())
.is_some(),
has_composer_icon_url = interface
.and_then(|interface| interface.composer_icon_url.as_ref())
.is_some(),
has_logo = interface
.and_then(|interface| interface.logo.as_ref())
.is_some(),
has_logo_url = interface
.and_then(|interface| interface.logo_url.as_ref())
.is_some(),
screenshot_count = interface
.map(|interface| interface.screenshots.len())
.unwrap_or_default(),
screenshot_url_count = interface
.map(|interface| interface.screenshot_urls.len())
.unwrap_or_default(),
skill_count = plugin.skills.len(),
enabled_skill_count,
hook_count = plugin.hooks.len(),
app_count = plugin.apps.len(),
app_needs_auth_count,
mcp_server_count = plugin.mcp_servers.len(),
"plugin/read request completed"
);
Ok(PluginReadResponse { plugin })
}

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use crate::error_code::internal_error;
use crate::error_code::invalid_request;
@@ -45,6 +46,13 @@ impl SearchRequestProcessor {
roots,
cancellation_token,
} = params;
let started_at = Instant::now();
tracing::info!(
query_len = query.len(),
root_count = roots.len(),
has_cancellation_token = cancellation_token.is_some(),
"fuzzyFileSearch request started"
);
let cancel_flag = match cancellation_token.clone() {
Some(token) => {
@@ -75,6 +83,11 @@ impl SearchRequestProcessor {
}
}
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
file_count = results.len(),
"fuzzyFileSearch request completed"
);
Ok(FuzzyFileSearchResponse { files: results })
}
@@ -87,6 +100,12 @@ impl SearchRequestProcessor {
return Err(invalid_request("sessionId must not be empty"));
}
let started_at = Instant::now();
tracing::info!(
session_id = %session_id,
root_count = roots.len(),
"fuzzyFileSearch session start requested"
);
let session =
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone())
.map_err(|err| {
@@ -96,6 +115,10 @@ impl SearchRequestProcessor {
.lock()
.await
.insert(session_id, session);
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
"fuzzyFileSearch session started"
);
Ok(FuzzyFileSearchSessionStartResponse {})
}
@@ -104,6 +127,8 @@ impl SearchRequestProcessor {
params: FuzzyFileSearchSessionUpdateParams,
) -> Result<FuzzyFileSearchSessionUpdateResponse, JSONRPCErrorError> {
let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
let started_at = Instant::now();
let query_len = query.len();
let found = {
let sessions = self.fuzzy_search_sessions.lock().await;
if let Some(session) = sessions.get(&session_id) {
@@ -119,6 +144,12 @@ impl SearchRequestProcessor {
)));
}
tracing::info!(
elapsed_us = started_at.elapsed().as_micros(),
session_id = %session_id,
query_len,
"fuzzyFileSearch session query dispatched"
);
Ok(FuzzyFileSearchSessionUpdateResponse {})
}
@@ -127,8 +158,14 @@ impl SearchRequestProcessor {
params: FuzzyFileSearchSessionStopParams,
) -> Result<FuzzyFileSearchSessionStopResponse, JSONRPCErrorError> {
let FuzzyFileSearchSessionStopParams { session_id } = params;
let started_at = Instant::now();
self.fuzzy_search_sessions.lock().await.remove(&session_id);
tracing::info!(
elapsed_us = started_at.elapsed().as_micros(),
session_id = %session_id,
"fuzzyFileSearch session stopped"
);
Ok(FuzzyFileSearchSessionStopResponse {})
}
}

View File

@@ -18,6 +18,7 @@ use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
use url::Url;
mod remote_installed_plugin_sync;
@@ -485,6 +486,12 @@ pub async fn fetch_remote_marketplaces(
auth: Option<&CodexAuth>,
sources: &[RemoteMarketplaceSource],
) -> Result<Vec<RemoteMarketplace>, RemotePluginCatalogError> {
let started_at = Instant::now();
tracing::info!(
source_count = sources.len(),
sources = ?sources,
"remote plugin marketplace fetch started"
);
let auth = ensure_chatgpt_auth(auth)?;
let mut marketplaces = Vec::new();
let needs_workspace_installed = sources.iter().any(|source| {
@@ -494,12 +501,21 @@ pub async fn fetch_remote_marketplaces(
)
});
let workspace_installed_plugins = if needs_workspace_installed {
Some(fetch_installed_plugins_for_scope(config, auth, RemotePluginScope::Workspace).await?)
let workspace_installed_started_at = Instant::now();
let plugins =
fetch_installed_plugins_for_scope(config, auth, RemotePluginScope::Workspace).await?;
tracing::info!(
elapsed_ms = workspace_installed_started_at.elapsed().as_millis(),
plugin_count = plugins.len(),
"remote plugin workspace installed prerequisite fetched"
);
Some(plugins)
} else {
None
};
for source in sources {
let source_started_at = Instant::now();
match source {
RemoteMarketplaceSource::Global => {
let scope = RemotePluginScope::Global;
@@ -507,6 +523,8 @@ pub async fn fetch_remote_marketplaces(
fetch_directory_plugins_for_scope(config, auth, scope),
fetch_installed_plugins_for_scope(config, auth, scope),
)?;
let directory_plugin_count = directory_plugins.len();
let installed_plugin_count = installed_plugins.len();
if let Some(marketplace) = build_remote_marketplace(
scope.marketplace_name(),
scope.marketplace_display_name(),
@@ -514,13 +532,34 @@ pub async fn fetch_remote_marketplaces(
installed_plugins,
/*include_installed_only*/ true,
)? {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
directory_plugin_count,
installed_plugin_count,
plugin_count = marketplace.plugins.len(),
"remote plugin marketplace source fetched"
);
marketplaces.push(marketplace);
} else {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
directory_plugin_count,
installed_plugin_count,
plugin_count = 0,
"remote plugin marketplace source fetched empty"
);
}
}
RemoteMarketplaceSource::WorkspaceDirectory => {
let scope = RemotePluginScope::Workspace;
let directory_plugins =
fetch_directory_plugins_for_scope(config, auth, scope).await?;
let directory_plugin_count = directory_plugins.len();
let installed_plugin_count = workspace_installed_plugins
.as_ref()
.map_or(0, std::vec::Vec::len);
if let Some(marketplace) = build_remote_marketplace(
scope.marketplace_name(),
scope.marketplace_display_name(),
@@ -528,7 +567,24 @@ pub async fn fetch_remote_marketplaces(
workspace_installed_plugins.clone().unwrap_or_default(),
/*include_installed_only*/ false,
)? {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
directory_plugin_count,
installed_plugin_count,
plugin_count = marketplace.plugins.len(),
"remote plugin marketplace source fetched"
);
marketplaces.push(marketplace);
} else {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
directory_plugin_count,
installed_plugin_count,
plugin_count = 0,
"remote plugin marketplace source fetched empty"
);
}
}
RemoteMarketplaceSource::SharedWithMe => {
@@ -536,6 +592,7 @@ pub async fn fetch_remote_marketplaces(
// with the user. Installed unlisted plugins that are not returned there are
// link-installed and stay in the separate unlisted bucket.
let shared_plugins = fetch_shared_workspace_plugins(config, auth).await?;
let shared_plugin_count = shared_plugins.len();
let shared_plugin_ids = shared_plugins
.iter()
.map(|plugin| plugin.id.clone())
@@ -558,6 +615,14 @@ pub async fn fetch_remote_marketplaces(
workspace_installed_plugins.clone().unwrap_or_default(),
/*include_installed_only*/ false,
)? {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
shared_plugin_count,
plugin_count = marketplace.plugins.len(),
marketplace_name = %REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_NAME,
"remote plugin shared marketplace bucket fetched"
);
marketplaces.push(marketplace);
}
@@ -586,12 +651,36 @@ pub async fn fetch_remote_marketplaces(
unlisted_installed_plugins,
/*include_installed_only*/ true,
)? {
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
shared_plugin_count,
plugin_count = marketplace.plugins.len(),
marketplace_name = %REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_NAME,
"remote plugin shared marketplace bucket fetched"
);
marketplaces.push(marketplace);
}
tracing::info!(
elapsed_ms = source_started_at.elapsed().as_millis(),
source = ?source,
shared_plugin_count,
"remote plugin marketplace source fetched"
);
}
}
}
let plugin_count: usize = marketplaces
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
marketplace_count = marketplaces.len(),
plugin_count,
"remote plugin marketplace fetch completed"
);
Ok(marketplaces)
}
@@ -1175,17 +1264,35 @@ async fn fetch_directory_plugins_for_scope(
auth: &CodexAuth,
scope: RemotePluginScope,
) -> Result<Vec<RemotePluginDirectoryItem>, RemotePluginCatalogError> {
let started_at = Instant::now();
let mut plugins = Vec::new();
let mut page_token = None;
let mut page_count = 0usize;
loop {
let page_started_at = Instant::now();
let response =
get_remote_plugin_list_page(config, auth, scope, page_token.as_deref()).await?;
page_count += 1;
tracing::info!(
elapsed_ms = page_started_at.elapsed().as_millis(),
scope = ?scope,
page_plugin_count = response.plugins.len(),
has_next_page = response.pagination.next_page_token.is_some(),
"remote plugin directory page fetched"
);
plugins.extend(response.plugins);
let Some(next_page_token) = response.pagination.next_page_token else {
break;
};
page_token = Some(next_page_token);
}
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
scope = ?scope,
page_count,
plugin_count = plugins.len(),
"remote plugin directory fetched"
);
Ok(plugins)
}
@@ -1193,17 +1300,33 @@ async fn fetch_shared_workspace_plugins(
config: &RemotePluginServiceConfig,
auth: &CodexAuth,
) -> Result<Vec<RemotePluginDirectoryItem>, RemotePluginCatalogError> {
let started_at = Instant::now();
let mut plugins = Vec::new();
let mut page_token = None;
let mut page_count = 0usize;
loop {
let page_started_at = Instant::now();
let response =
get_remote_shared_workspace_plugins_page(config, auth, page_token.as_deref()).await?;
page_count += 1;
tracing::info!(
elapsed_ms = page_started_at.elapsed().as_millis(),
page_plugin_count = response.plugins.len(),
has_next_page = response.pagination.next_page_token.is_some(),
"remote shared workspace plugin page fetched"
);
plugins.extend(response.plugins);
let Some(next_page_token) = response.pagination.next_page_token else {
break;
};
page_token = Some(next_page_token);
}
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
page_count,
plugin_count = plugins.len(),
"remote shared workspace plugins fetched"
);
Ok(plugins)
}
@@ -1224,9 +1347,12 @@ async fn fetch_installed_plugins_for_scope_with_download_url(
scope: RemotePluginScope,
include_download_urls: bool,
) -> Result<Vec<RemotePluginInstalledItem>, RemotePluginCatalogError> {
let started_at = Instant::now();
let mut plugins = Vec::new();
let mut page_token = None;
let mut page_count = 0usize;
loop {
let page_started_at = Instant::now();
let response = get_remote_plugin_installed_page(
config,
auth,
@@ -1235,12 +1361,29 @@ async fn fetch_installed_plugins_for_scope_with_download_url(
include_download_urls,
)
.await?;
page_count += 1;
tracing::info!(
elapsed_ms = page_started_at.elapsed().as_millis(),
scope = ?scope,
include_download_urls,
page_plugin_count = response.plugins.len(),
has_next_page = response.pagination.next_page_token.is_some(),
"remote installed plugin page fetched"
);
plugins.extend(response.plugins);
let Some(next_page_token) = response.pagination.next_page_token else {
break;
};
page_token = Some(next_page_token);
}
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
scope = ?scope,
include_download_urls,
page_count,
plugin_count = plugins.len(),
"remote installed plugins fetched"
);
Ok(plugins)
}
@@ -1359,16 +1502,51 @@ async fn send_and_decode<T: for<'de> Deserialize<'de>>(
request: RequestBuilder,
url: &str,
) -> Result<T, RemotePluginCatalogError> {
let response = request
.send()
.await
.map_err(|source| RemotePluginCatalogError::Request {
url: url.to_string(),
source,
})?;
let started_at = Instant::now();
let url_path = Url::parse(url)
.map(|url| url.path().to_string())
.unwrap_or_else(|_| "<invalid-url>".to_string());
let response = match request.send().await {
Ok(response) => {
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
status = %response.status(),
"remote plugin HTTP response received"
);
response
}
Err(source) => {
tracing::warn!(
elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
"remote plugin HTTP request failed: {source}"
);
return Err(RemotePluginCatalogError::Request {
url: url.to_string(),
source,
});
}
};
let status = response.status();
let body_started_at = Instant::now();
let body = response.text().await.unwrap_or_default();
tracing::info!(
elapsed_ms = body_started_at.elapsed().as_millis(),
total_elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
status = %status,
body_bytes = body.len(),
"remote plugin HTTP body read"
);
if !status.is_success() {
tracing::warn!(
elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
status = %status,
body_bytes = body.len(),
"remote plugin HTTP request returned unexpected status"
);
return Err(RemotePluginCatalogError::UnexpectedStatus {
url: url.to_string(),
status,
@@ -1376,8 +1554,26 @@ async fn send_and_decode<T: for<'de> Deserialize<'de>>(
});
}
serde_json::from_str(&body).map_err(|source| RemotePluginCatalogError::Decode {
url: url.to_string(),
source,
})
let decode_started_at = Instant::now();
let decoded = serde_json::from_str(&body).map_err(|source| {
tracing::warn!(
elapsed_ms = decode_started_at.elapsed().as_millis(),
total_elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
body_bytes = body.len(),
"remote plugin HTTP response decode failed: {source}"
);
RemotePluginCatalogError::Decode {
url: url.to_string(),
source,
}
})?;
tracing::info!(
elapsed_ms = decode_started_at.elapsed().as_millis(),
total_elapsed_ms = started_at.elapsed().as_millis(),
url_path = %url_path,
body_bytes = body.len(),
"remote plugin HTTP response decoded"
);
Ok(decoded)
}

View File

@@ -19,6 +19,7 @@ use crate::hooks_rpc::fetch_hooks_list;
use crate::hooks_rpc::write_hook_trust;
use crate::hooks_rpc::write_hook_trusts;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::time::Instant;
impl App {
pub(super) fn fetch_mcp_inventory(
@@ -658,17 +659,38 @@ pub(super) async fn request_plugin_list(
cwd: PathBuf,
) -> Result<PluginListResponse> {
let cwd = AbsolutePathBuf::try_from(cwd).wrap_err("plugin list cwd must be absolute")?;
let request_id = RequestId::String(format!("plugin-list-{}", Uuid::new_v4()));
request_handle
let request_id = format!("plugin-list-{}", Uuid::new_v4());
let started_at = Instant::now();
tracing::info!(
request_id = %request_id,
root_count = 1,
"plugin/list TUI request started"
);
let response: PluginListResponse = request_handle
.request_typed(ClientRequest::PluginList {
request_id,
request_id: RequestId::String(request_id.clone()),
params: PluginListParams {
cwds: Some(vec![cwd]),
marketplace_kinds: None,
},
})
.await
.wrap_err("plugin/list failed in TUI")
.wrap_err("plugin/list failed in TUI")?;
let plugin_count: usize = response
.marketplaces
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
request_id = %request_id,
elapsed_ms = started_at.elapsed().as_millis(),
marketplace_count = response.marketplaces.len(),
plugin_count,
marketplace_load_error_count = response.marketplace_load_errors.len(),
featured_plugin_id_count = response.featured_plugin_ids.len(),
"plugin/list TUI request completed"
);
Ok(response)
}
pub(super) async fn fetch_plugin_detail(

View File

@@ -12,6 +12,7 @@ use codex_app_server_protocol::PluginSummary;
use codex_core_plugins::PluginsManager;
use codex_plugin::PluginCapabilitySummary;
use std::collections::HashMap;
use std::time::Instant;
#[derive(Debug, Clone)]
struct PluginMentionEntry {
@@ -41,14 +42,47 @@ pub(super) async fn fetch_plugin_mentions(
request_handle: AppServerRequestHandle,
config: crate::legacy_core::config::Config,
) -> Result<Vec<PluginCapabilitySummary>> {
let started_at = Instant::now();
let list_started_at = Instant::now();
let response = request_plugin_list(request_handle, config.cwd.to_path_buf()).await?;
let list_elapsed_ms = list_started_at.elapsed().as_millis();
let marketplace_count = response.marketplaces.len();
let list_plugin_count: usize = response
.marketplaces
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum();
tracing::info!(
elapsed_ms = list_elapsed_ms,
marketplace_count,
plugin_count = list_plugin_count,
marketplace_load_error_count = response.marketplace_load_errors.len(),
featured_plugin_id_count = response.featured_plugin_ids.len(),
"plugin mention plugin/list phase completed"
);
let mention_entries = plugin_mention_entries_from_list_response(response);
tracing::info!(
mention_entry_count = mention_entries.len(),
"plugin mention list response filtered"
);
let capabilities_started_at = Instant::now();
let capabilities_by_config_name = load_plugin_mention_capabilities(&config).await;
tracing::info!(
elapsed_ms = capabilities_started_at.elapsed().as_millis(),
capability_count = capabilities_by_config_name.len(),
"plugin mention local capability phase completed"
);
Ok(mention_entries
let plugins = mention_entries
.into_iter()
.filter_map(|entry| entry.capability_summary(&capabilities_by_config_name))
.collect())
.collect::<Vec<_>>();
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
mention_count = plugins.len(),
"plugin mentions fetched"
);
Ok(plugins)
}
async fn load_plugin_mention_capabilities(

View File

@@ -3969,6 +3969,7 @@ impl ChatComposer {
return;
}
let started_at = Instant::now();
if query.is_empty() {
self.app_event_tx
.send(AppEvent::StartFileSearch(String::new()));
@@ -3983,6 +3984,9 @@ impl ChatComposer {
self.skills.as_deref(),
self.plugins.as_deref(),
);
let candidate_count = candidates.len();
let skill_count = self.skills.as_ref().map_or(0, std::vec::Vec::len);
let plugin_count = self.plugins.as_ref().map_or(0, std::vec::Vec::len);
match &mut self.popups.active {
ActivePopup::MentionV2(popup) => {
@@ -3996,6 +4000,14 @@ impl ChatComposer {
}
}
tracing::info!(
elapsed_us = started_at.elapsed().as_micros(),
query_len = query.len(),
candidate_count,
skill_count,
plugin_count,
"mentions v2 @ popup synchronized"
);
self.popups.dismissed_mention_token = None;
}

View File

@@ -9,6 +9,7 @@ use codex_file_search as file_search;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
@@ -21,6 +22,7 @@ pub(crate) struct FileSearchManager {
struct SearchState {
latest_query: String,
latest_query_started_at: Option<Instant>,
session: Option<file_search::FileSearchSession>,
session_token: usize,
}
@@ -30,6 +32,7 @@ impl FileSearchManager {
Self {
state: Arc::new(Mutex::new(SearchState {
latest_query: String::new(),
latest_query_started_at: None,
session: None,
session_token: 0,
})),
@@ -47,6 +50,7 @@ impl FileSearchManager {
let mut st = self.state.lock().unwrap();
st.session.take();
st.latest_query.clear();
st.latest_query_started_at = None;
}
/// Call whenever the user edits the `@` token.
@@ -58,21 +62,33 @@ impl FileSearchManager {
}
st.latest_query.clear();
st.latest_query.push_str(&query);
st.latest_query_started_at = Some(Instant::now());
if query.is_empty() {
tracing::info!("TUI @ file search query cleared");
st.latest_query_started_at = None;
st.session.take();
return;
}
if st.session.is_none() {
let session_was_missing = st.session.is_none();
if session_was_missing {
self.start_session_locked(&mut st);
}
if let Some(session) = st.session.as_ref() {
let update_started_at = Instant::now();
session.update_query(&query);
tracing::info!(
elapsed_us = update_started_at.elapsed().as_micros(),
query_len = query.len(),
session_was_missing,
"TUI @ file search query dispatched"
);
}
}
fn start_session_locked(&self, st: &mut SearchState) {
let started_at = Instant::now();
st.session_token = st.session_token.wrapping_add(1);
let session_token = st.session_token;
let reporter = Arc::new(TuiSessionReporter {
@@ -90,9 +106,21 @@ impl FileSearchManager {
/*cancel_flag*/ None,
);
match session {
Ok(session) => st.session = Some(session),
Ok(session) => {
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
session_token,
root_count = 1,
"TUI @ file search session started"
);
st.session = Some(session);
}
Err(err) => {
tracing::warn!("file search session failed to start: {err}");
tracing::warn!(
elapsed_ms = started_at.elapsed().as_millis(),
root_count = 1,
"file search session failed to start: {err}"
);
st.session = None;
}
}
@@ -109,6 +137,9 @@ impl TuiSessionReporter {
fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) {
#[expect(clippy::unwrap_used)]
let st = self.state.lock().unwrap();
let elapsed_ms = st
.latest_query_started_at
.map(|started_at| started_at.elapsed().as_millis());
if st.session_token != self.session_token
|| st.latest_query.is_empty()
|| snapshot.query.is_empty()
@@ -116,7 +147,14 @@ impl TuiSessionReporter {
return;
}
let query = snapshot.query.clone();
let match_count = snapshot.matches.len();
drop(st);
tracing::info!(
elapsed_ms = ?elapsed_ms,
query_len = query.len(),
match_count,
"TUI @ file search snapshot delivered"
);
self.app_tx.send(AppEvent::FileSearchResult {
query,
matches: snapshot.matches.clone(),

View File

@@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""Measure latency for production plugin APIs used by Codex plugin loading."""
from __future__ import annotations
import argparse
import csv
import json
import math
import statistics
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
DEFAULT_BASE_URL = "https://chatgpt.com"
DEFAULT_AUTH_JSON = "~/.codex/auth.json"
DEFAULT_OUTPUT_DIR = "/tmp"
@dataclass(frozen=True)
class ProbeEndpoint:
label: str
path: str
timeout_sec: float
def api_label(self) -> str:
return f"{self.label} | GET {self.path}"
ENDPOINTS: tuple[ProbeEndpoint, ...] = (
ProbeEndpoint(
label="workspace_installed_for_shared",
path="/backend-api/ps/plugins/installed?scope=WORKSPACE",
timeout_sec=30.0,
),
ProbeEndpoint(
label="workspace_shared",
path="/backend-api/ps/plugins/workspace/shared?limit=200",
timeout_sec=30.0,
),
ProbeEndpoint(
label="workspace_directory",
path="/backend-api/ps/plugins/list?scope=WORKSPACE&limit=200",
timeout_sec=30.0,
),
ProbeEndpoint(
label="global_directory",
path="/backend-api/ps/plugins/list?scope=GLOBAL&limit=200",
timeout_sec=30.0,
),
ProbeEndpoint(
label="global_installed",
path="/backend-api/ps/plugins/installed?scope=GLOBAL",
timeout_sec=30.0,
),
ProbeEndpoint(
label="featured_plugin_ids",
path="/backend-api/plugins/featured?platform=codex",
timeout_sec=10.0,
),
ProbeEndpoint(
label="created_by_me_workspace",
path="/backend-api/ps/plugins/workspace/created?limit=200",
timeout_sec=30.0,
),
ProbeEndpoint(
label="workspace_installed_after_created",
path="/backend-api/ps/plugins/installed?scope=WORKSPACE",
timeout_sec=30.0,
),
)
CSV_FIELDS = (
"iteration",
"order",
"label",
"method",
"url_path_query",
"started_at",
"ended_at",
"latency_ms",
"latency_ns",
"status",
"success",
"response_bytes",
"request_id",
"error",
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Sequentially benchmark production Codex plugin-loading APIs. "
"The default run performs 100 iterations of all endpoints."
)
)
parser.add_argument("--iterations", type=int, default=100)
parser.add_argument("--call-gap-sec", type=float, default=1.0)
parser.add_argument("--iteration-gap-sec", type=float, default=10.0)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--auth-json", default=DEFAULT_AUTH_JSON)
parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
parser.add_argument(
"--user-agent",
default="codex-plugin-api-latency-probe/1.0",
help="User-Agent sent to production APIs.",
)
return parser.parse_args()
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="milliseconds")
def load_auth_headers(auth_json_path: str, user_agent: str) -> dict[str, str]:
path = Path(auth_json_path).expanduser()
try:
payload = json.loads(path.read_text())
except FileNotFoundError as exc:
raise SystemExit(f"auth file not found: {path}") from exc
except json.JSONDecodeError as exc:
raise SystemExit(f"auth file is not valid JSON: {path}: {exc}") from exc
tokens = payload.get("tokens")
if not isinstance(tokens, dict):
raise SystemExit(f"auth file does not contain tokens object: {path}")
access_token = tokens.get("access_token")
if not isinstance(access_token, str) or not access_token.strip():
raise SystemExit(
"auth file does not contain tokens.access_token. "
"This probe requires ChatGPT token auth from ~/.codex/auth.json."
)
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"originator": "codex_cli_rs",
"User-Agent": user_agent,
}
account_id = tokens.get("account_id")
if isinstance(account_id, str) and account_id.strip():
headers["ChatGPT-Account-ID"] = account_id
return headers
def join_url(base_url: str, path: str) -> str:
base_url = base_url.rstrip("/")
if not path.startswith("/"):
path = f"/{path}"
return f"{base_url}{path}"
def response_request_id(headers: Any) -> str:
for name in (
"x-request-id",
"openai-request-id",
"cf-ray",
"x-envoy-upstream-service-time",
):
value = headers.get(name)
if value:
return str(value)
return ""
def perform_request(
endpoint: ProbeEndpoint,
base_url: str,
headers: dict[str, str],
iteration: int,
order: int,
) -> dict[str, Any]:
url = join_url(base_url, endpoint.path)
request = urllib.request.Request(url, headers=headers, method="GET")
started_at = utc_now_iso()
started_ns = time.perf_counter_ns()
status = ""
response_bytes = 0
request_id = ""
error = ""
try:
with urllib.request.urlopen(request, timeout=endpoint.timeout_sec) as response:
body = response.read()
status = str(response.status)
response_bytes = len(body)
request_id = response_request_id(response.headers)
except urllib.error.HTTPError as exc:
body = exc.read()
status = str(exc.code)
response_bytes = len(body)
request_id = response_request_id(exc.headers)
error = f"HTTPError: {exc.code} {exc.reason}"
except urllib.error.URLError as exc:
error = f"URLError: {exc.reason}"
except TimeoutError as exc:
error = f"TimeoutError: {exc}"
except Exception as exc: # Keep the long-running probe alive across one bad call.
error = f"{type(exc).__name__}: {exc}"
ended_ns = time.perf_counter_ns()
ended_at = utc_now_iso()
latency_ns = ended_ns - started_ns
latency_ms = latency_ns / 1_000_000
success = status.startswith("2") and not error
return {
"iteration": iteration,
"order": order,
"label": endpoint.api_label(),
"method": "GET",
"url_path_query": endpoint.path,
"started_at": started_at,
"ended_at": ended_at,
"latency_ms": f"{latency_ms:.3f}",
"latency_ns": latency_ns,
"status": status,
"success": str(success).lower(),
"response_bytes": response_bytes,
"request_id": request_id,
"error": error,
}
def nearest_rank(values: list[float], percentile: float) -> float | None:
if not values:
return None
sorted_values = sorted(values)
rank = max(1, math.ceil((percentile / 100) * len(sorted_values)))
return sorted_values[rank - 1]
def summarize_rows(rows: list[dict[str, Any]]) -> dict[str, Any]:
summaries: dict[str, Any] = {}
for endpoint in ENDPOINTS:
api_label = endpoint.api_label()
endpoint_rows = [row for row in rows if row["label"] == api_label]
latencies = [float(row["latency_ms"]) for row in endpoint_rows]
success_rows = [row for row in endpoint_rows if row["success"] == "true"]
error_rows = [row for row in endpoint_rows if row["success"] != "true"]
non_2xx_rows = [
row
for row in endpoint_rows
if not str(row["status"]).startswith("2")
]
if latencies:
summary = {
"count": len(endpoint_rows),
"success_count": len(success_rows),
"error_count": len(error_rows),
"non_2xx_count": len(non_2xx_rows),
"min_ms": min(latencies),
"mean_ms": statistics.fmean(latencies),
"p50_ms": statistics.median(latencies),
"p90_ms": nearest_rank(latencies, 90),
"p95_ms": nearest_rank(latencies, 95),
"p99_ms": nearest_rank(latencies, 99),
"max_ms": max(latencies),
"stdev_ms": statistics.stdev(latencies) if len(latencies) > 1 else 0.0,
}
else:
summary = {
"count": 0,
"success_count": 0,
"error_count": 0,
"non_2xx_count": 0,
"min_ms": None,
"mean_ms": None,
"p50_ms": None,
"p90_ms": None,
"p95_ms": None,
"p99_ms": None,
"max_ms": None,
"stdev_ms": None,
}
summaries[api_label] = summary
return {
"generated_at": utc_now_iso(),
"percentile_method": "nearest_rank",
"endpoints": summaries,
}
def write_outputs(
rows: list[dict[str, Any]],
csv_path: Path,
summary_path: Path,
) -> dict[str, Any]:
with csv_path.open("w", newline="") as file:
writer = csv.DictWriter(file, fieldnames=CSV_FIELDS)
writer.writeheader()
writer.writerows(rows)
summary = summarize_rows(rows)
summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
return summary
def print_summary(summary: dict[str, Any]) -> None:
print()
print("Latency summary, milliseconds")
print(
"label,count,success,error,non_2xx,min,mean,p50,p90,p95,p99,max,stdev"
)
for endpoint in ENDPOINTS:
api_label = endpoint.api_label()
item = summary["endpoints"][api_label]
values = [
api_label,
item["count"],
item["success_count"],
item["error_count"],
item["non_2xx_count"],
item["min_ms"],
item["mean_ms"],
item["p50_ms"],
item["p90_ms"],
item["p95_ms"],
item["p99_ms"],
item["max_ms"],
item["stdev_ms"],
]
print(
",".join(
f"{value:.3f}" if isinstance(value, float) else str(value)
for value in values
)
)
def validate_args(args: argparse.Namespace) -> None:
if args.iterations < 1:
raise SystemExit("--iterations must be >= 1")
if args.call_gap_sec < 0:
raise SystemExit("--call-gap-sec must be >= 0")
if args.iteration_gap_sec < 0:
raise SystemExit("--iteration-gap-sec must be >= 0")
def main() -> int:
args = parse_args()
validate_args(args)
headers = load_auth_headers(args.auth_json, args.user_agent)
output_dir = Path(args.output_dir).expanduser()
output_dir.mkdir(parents=True, exist_ok=True)
stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
csv_path = output_dir / f"codex-plugin-api-latency-{stamp}.csv"
summary_path = output_dir / f"codex-plugin-api-latency-{stamp}.summary.json"
rows: list[dict[str, Any]] = []
total_calls = args.iterations * len(ENDPOINTS)
completed_calls = 0
print(f"base_url={args.base_url.rstrip('/')}")
print(f"iterations={args.iterations}")
print(f"call_gap_sec={args.call_gap_sec}")
print(f"iteration_gap_sec={args.iteration_gap_sec}")
print(f"csv_path={csv_path}")
print(f"summary_path={summary_path}")
print(f"total_calls={total_calls}")
print()
try:
for iteration in range(1, args.iterations + 1):
for order, endpoint in enumerate(ENDPOINTS, start=1):
row = perform_request(
endpoint=endpoint,
base_url=args.base_url,
headers=headers,
iteration=iteration,
order=order,
)
rows.append(row)
completed_calls += 1
status = row["status"] or "no-status"
outcome = "ok" if row["success"] == "true" else "error"
print(
f"[{completed_calls}/{total_calls}] "
f"iteration={iteration} order={order} label={row['label']} "
f"status={status} outcome={outcome} "
f"latency_ms={row['latency_ms']}"
)
sys.stdout.flush()
if order != len(ENDPOINTS) and args.call_gap_sec > 0:
time.sleep(args.call_gap_sec)
if iteration != args.iterations and args.iteration_gap_sec > 0:
time.sleep(args.iteration_gap_sec)
except KeyboardInterrupt:
print()
print("Interrupted; writing partial outputs.")
summary = write_outputs(rows, csv_path, summary_path)
print_summary(summary)
print()
print(f"Wrote CSV: {csv_path}")
print(f"Wrote summary: {summary_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())