Compare commits

...

4 Commits

Author SHA1 Message Date
xli-oai
1c3cb14fb5 Prototype lower-level skills list coordination 2026-05-07 12:43:22 -07:00
xli-oai
1789ab69d1 add plugin list and scheduler timing logs 2026-05-07 10:52:34 -07:00
xli-oai
8114282c02 mark experimental feature list as shared read 2026-05-07 03:46:22 -07:00
xli-oai
fe7044a2aa allow later shared reads to join running read window 2026-05-07 03:21:22 -07:00
9 changed files with 519 additions and 122 deletions

View File

@@ -589,7 +589,7 @@ client_request_definitions! {
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
serialization: global_shared_read("config"),
serialization: None,
response: v2::SkillsListResponse,
},
HooksList => "hooks/list" {
@@ -794,7 +794,7 @@ client_request_definitions! {
},
ExperimentalFeatureList => "experimentalFeature/list" {
params: v2::ExperimentalFeatureListParams,
serialization: global("config"),
serialization: global_shared_read("config"),
response: v2::ExperimentalFeatureListResponse,
},
ExperimentalFeatureEnablementSet => "experimentalFeature/enablement/set" {
@@ -1667,10 +1667,7 @@ mod tests {
per_cwd_extra_user_roots: None,
},
};
assert_eq!(
skills_list.serialization_scope(),
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
assert_eq!(skills_list.serialization_scope(), None);
let plugin_list = ClientRequest::PluginList {
request_id: request_id(),
@@ -1684,6 +1681,26 @@ mod tests {
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
let experimental_feature_list = ClientRequest::ExperimentalFeatureList {
request_id: request_id(),
params: v2::ExperimentalFeatureListParams::default(),
};
assert_eq!(
experimental_feature_list.serialization_scope(),
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
);
let experimental_feature_enablement_set = ClientRequest::ExperimentalFeatureEnablementSet {
request_id: request_id(),
params: v2::ExperimentalFeatureEnablementSetParams {
enablement: Default::default(),
},
};
assert_eq!(
experimental_feature_enablement_set.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let plugin_uninstall = ClientRequest::PluginUninstall {
request_id: request_id(),
params: v2::PluginUninstallParams {

View File

@@ -20,6 +20,9 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use toml::Value as TomlValue;
use tracing::warn;
@@ -33,6 +36,7 @@ pub(crate) struct ConfigManager {
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
arg0_paths: Arg0DispatchPaths,
thread_config_loader: Arc<RwLock<Arc<dyn ThreadConfigLoader>>>,
shared_state: Arc<AsyncRwLock<()>>,
}
impl ConfigManager {
@@ -52,6 +56,7 @@ impl ConfigManager {
cloud_requirements: Arc::new(RwLock::new(cloud_requirements)),
arg0_paths,
thread_config_loader: Arc::new(RwLock::new(thread_config_loader)),
shared_state: Arc::new(AsyncRwLock::new(())),
}
}
@@ -73,6 +78,14 @@ impl ConfigManager {
.unwrap_or_default()
}
pub(crate) async fn read_shared_state(&self) -> RwLockReadGuard<'_, ()> {
self.shared_state.read().await
}
pub(crate) async fn write_shared_state(&self) -> RwLockWriteGuard<'_, ()> {
self.shared_state.write().await
}
pub(crate) fn extend_runtime_feature_enablement<I>(&self, enablement: I) -> Result<(), ()>
where
I: IntoIterator<Item = (String, bool)>,

View File

@@ -171,6 +171,7 @@ impl ConfigManager {
&self,
params: ConfigValueWriteParams,
) -> Result<ConfigWriteResponse, ConfigManagerError> {
let _guard = self.write_shared_state().await;
let edits = vec![(params.key_path, params.value, params.merge_strategy)];
self.apply_edits(params.file_path, params.expected_version, edits)
.await
@@ -180,6 +181,7 @@ impl ConfigManager {
&self,
params: ConfigBatchWriteParams,
) -> Result<ConfigWriteResponse, ConfigManagerError> {
let _guard = self.write_shared_state().await;
let edits = params
.edits
.into_iter()

View File

@@ -768,6 +768,8 @@ impl MessageProcessor {
);
let serialization_scope = codex_request.serialization_scope();
let serialization_method = codex_request.method();
let serialization_request_id = connection_request_id.request_id.to_string();
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
let device_key_requests_allowed = session.allows_device_key_requests();
@@ -794,7 +796,8 @@ impl MessageProcessor {
}
}
.instrument(span),
);
)
.with_log_metadata(serialization_method, serialization_request_id);
if let Some(scope) = serialization_scope {
let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);

View File

@@ -1,5 +1,6 @@
use super::*;
use futures::StreamExt;
use std::time::Instant;
#[derive(Clone)]
pub(crate) struct CatalogRequestProcessor {
@@ -12,6 +13,19 @@ pub(crate) struct CatalogRequestProcessor {
const SKILLS_LIST_CWD_CONCURRENCY: usize = 5;
enum PreparedSkillsListEntry {
Ready {
index: usize,
cwd: PathBuf,
skills_input: codex_core::skills::SkillsLoadInput,
extra_roots: Vec<AbsolutePathBuf>,
},
Error {
index: usize,
entry: codex_app_server_protocol::SkillsListEntry,
},
}
fn skills_to_info(
skills: &[codex_core::skills::SkillMetadata],
disabled_paths: &HashSet<AbsolutePathBuf>,
@@ -421,68 +435,102 @@ impl CatalogRequestProcessor {
.extend(valid_extra_roots);
}
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let auth = self.auth_manager.auth().await;
let workspace_codex_plugins_enabled = self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await;
let snapshot_started_at = Instant::now();
let prepared_entries = {
let _guard = self.config_manager.read_shared_state().await;
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let auth = self.auth_manager.auth().await;
let workspace_codex_plugins_enabled = self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await;
let plugins_manager = self.thread_manager.plugins_manager();
futures::stream::iter(cwds.into_iter().enumerate())
.map(|(index, cwd)| {
let config = &config;
let extra_roots_by_cwd = &extra_roots_by_cwd;
let plugins_manager = &plugins_manager;
async move {
let (cwd_abs, config_layer_stack) = match self
.resolve_cwd_config(&cwd)
.await
{
Ok(resolved) => resolved,
Err(message) => {
let error_path = cwd.clone();
return PreparedSkillsListEntry::Error {
index,
entry: codex_app_server_protocol::SkillsListEntry {
cwd,
skills: Vec::new(),
errors: vec![codex_app_server_protocol::SkillErrorInfo {
path: error_path,
message,
}],
},
};
}
};
let extra_roots = extra_roots_by_cwd.get(&cwd).cloned().unwrap_or_default();
let effective_skill_roots = if workspace_codex_plugins_enabled {
let plugins_input = config.plugins_config_input();
plugins_manager
.effective_skill_roots_for_layer_stack(
&config_layer_stack,
&plugins_input,
)
.await
} else {
Vec::new()
};
PreparedSkillsListEntry::Ready {
index,
cwd,
skills_input: codex_core::skills::SkillsLoadInput::new(
cwd_abs.clone(),
effective_skill_roots,
config_layer_stack,
config.bundled_skills_enabled(),
),
extra_roots,
}
}
})
.buffer_unordered(SKILLS_LIST_CWD_CONCURRENCY)
.collect::<Vec<_>>()
.await
};
warn!(
elapsed_ms = snapshot_started_at.elapsed().as_millis(),
entry_count = prepared_entries.len(),
"skills/list prepared lower-level snapshot"
);
let skills_manager = self.thread_manager.skills_manager();
let plugins_manager = self.thread_manager.plugins_manager();
let fs = self
.thread_manager
.environment_manager()
.default_environment()
.map(|environment| environment.get_filesystem());
let mut data = futures::stream::iter(cwds.into_iter().enumerate())
.map(|(index, cwd)| {
let config = &config;
let extra_roots_by_cwd = &extra_roots_by_cwd;
let mut data = futures::stream::iter(prepared_entries.into_iter())
.map(|prepared_entry| {
let fs = fs.clone();
let plugins_manager = &plugins_manager;
let skills_manager = &skills_manager;
async move {
let (cwd_abs, config_layer_stack) = match self.resolve_cwd_config(&cwd).await {
Ok(resolved) => resolved,
Err(message) => {
let error_path = cwd.clone();
return (
index,
codex_app_server_protocol::SkillsListEntry {
cwd,
skills: Vec::new(),
errors: vec![codex_app_server_protocol::SkillErrorInfo {
path: error_path,
message,
}],
},
);
let (index, cwd, skills_input, extra_roots) = match prepared_entry {
PreparedSkillsListEntry::Ready {
index,
cwd,
skills_input,
extra_roots,
} => (index, cwd, skills_input, extra_roots),
PreparedSkillsListEntry::Error { index, entry } => {
return (index, entry);
}
};
let extra_roots = extra_roots_by_cwd
.get(&cwd)
.map_or(&[][..], std::vec::Vec::as_slice);
let effective_skill_roots = if workspace_codex_plugins_enabled {
let plugins_input = config.plugins_config_input();
plugins_manager
.effective_skill_roots_for_layer_stack(
&config_layer_stack,
&plugins_input,
)
.await
} else {
Vec::new()
};
let skills_input = codex_core::skills::SkillsLoadInput::new(
cwd_abs.clone(),
effective_skill_roots,
config_layer_stack,
config.bundled_skills_enabled(),
);
let outcome = skills_manager
.skills_for_cwd_with_extra_user_roots(
&skills_input,
force_reload,
extra_roots,
&extra_roots,
fs,
)
.await;
@@ -585,6 +633,7 @@ impl CatalogRequestProcessor {
&self,
params: SkillsConfigWriteParams,
) -> Result<SkillsConfigWriteResponse, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
let SkillsConfigWriteParams {
path,
name,

View File

@@ -363,13 +363,16 @@ impl ConfigRequestProcessor {
return Ok(ExperimentalFeatureEnablementSetResponse { enablement });
}
self.config_manager
.extend_runtime_feature_enablement(
enablement
.iter()
.map(|(name, enabled)| (name.clone(), *enabled)),
)
.map_err(|_| internal_error("failed to update feature enablement"))?;
{
let _guard = self.config_manager.write_shared_state().await;
self.config_manager
.extend_runtime_feature_enablement(
enablement
.iter()
.map(|(name, enabled)| (name.clone(), *enabled)),
)
.map_err(|_| internal_error("failed to update feature enablement"))?;
}
self.load_latest_config(/*fallback_cwd*/ None).await?;
self.reload_user_config().await;

View File

@@ -51,6 +51,7 @@ impl MarketplaceRequestProcessor {
&self,
params: MarketplaceRemoveParams,
) -> Result<MarketplaceRemoveResponse, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
remove_marketplace(
self.config.codex_home.to_path_buf(),
CoreMarketplaceRemoveRequest {
@@ -72,6 +73,7 @@ impl MarketplaceRequestProcessor {
&self,
params: MarketplaceUpgradeParams,
) -> Result<MarketplaceUpgradeResponse, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let plugins_manager = self.thread_manager.plugins_manager();
let MarketplaceUpgradeParams { marketplace_name } = params;
@@ -105,6 +107,7 @@ impl MarketplaceRequestProcessor {
&self,
params: MarketplaceAddParams,
) -> Result<MarketplaceAddResponse, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
add_marketplace_to_codex_home(
self.config.codex_home.to_path_buf(),
MarketplaceAddRequest {

View File

@@ -261,6 +261,7 @@ impl PluginRequestProcessor {
&self,
params: PluginInstallParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
self.plugin_install_response(params)
.await
.map(|response| Some(response.into()))
@@ -270,6 +271,7 @@ impl PluginRequestProcessor {
&self,
params: PluginUninstallParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let _guard = self.config_manager.write_shared_state().await;
self.plugin_uninstall_response(params)
.await
.map(|response| Some(response.into()))
@@ -348,6 +350,7 @@ impl PluginRequestProcessor {
&self,
params: PluginListParams,
) -> Result<PluginListResponse, JSONRPCErrorError> {
let request_started_at = Instant::now();
let plugins_manager = self.thread_manager.plugins_manager();
let PluginListParams {
cwds,
@@ -358,35 +361,80 @@ impl PluginRequestProcessor {
let marketplace_kinds =
marketplace_kinds.unwrap_or_else(|| vec![PluginListMarketplaceKind::Local]);
let include_local = marketplace_kinds.contains(&PluginListMarketplaceKind::Local);
warn!(
roots_count = roots.len(),
explicit_marketplace_kinds,
marketplace_kinds_count = marketplace_kinds.len(),
include_local,
"plugin/list timing started"
);
let config_started_at = Instant::now();
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
warn!(
elapsed_ms = config_started_at.elapsed().as_millis(),
plugins_enabled = config.features.enabled(Feature::Plugins),
remote_plugins_enabled = config.features.enabled(Feature::RemotePlugin),
"plugin/list timing loaded config"
);
let empty_response = || PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
};
if !config.features.enabled(Feature::Plugins) {
warn!(
elapsed_ms = request_started_at.elapsed().as_millis(),
"plugin/list timing completed with plugins disabled"
);
return Ok(empty_response());
}
let auth_started_at = Instant::now();
let auth = self.auth_manager.auth().await;
if !self
warn!(
elapsed_ms = auth_started_at.elapsed().as_millis(),
has_auth = auth.is_some(),
"plugin/list timing loaded auth"
);
let workspace_setting_started_at = Instant::now();
let workspace_plugins_enabled = self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await
{
.await;
warn!(
elapsed_ms = workspace_setting_started_at.elapsed().as_millis(),
workspace_plugins_enabled, "plugin/list timing checked workspace setting"
);
if !workspace_plugins_enabled {
warn!(
elapsed_ms = request_started_at.elapsed().as_millis(),
"plugin/list timing completed with workspace plugins disabled"
);
return Ok(empty_response());
}
let plugins_input = config.plugins_config_input();
let (mut data, marketplace_load_errors) = if include_local {
let background_tasks_started_at = Instant::now();
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&plugins_input,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback()),
);
warn!(
elapsed_ms = background_tasks_started_at.elapsed().as_millis(),
"plugin/list timing started background tasks"
);
let config_for_marketplace_listing = plugins_input.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
let shared_plugin_ids_started_at = Instant::now();
let shared_plugin_ids_by_local_path = load_shared_plugin_ids_by_local_path(&config);
warn!(
elapsed_ms = shared_plugin_ids_started_at.elapsed().as_millis(),
shared_plugin_ids_count = shared_plugin_ids_by_local_path.len(),
"plugin/list timing loaded shared plugin ids"
);
let local_listing_started_at = Instant::now();
match tokio::task::spawn_blocking(move || {
let outcome = plugins_manager_for_marketplace_listing
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
@@ -447,7 +495,21 @@ impl PluginRequestProcessor {
})
.await
{
Ok(Ok(outcome)) => outcome,
Ok(Ok(outcome)) => {
let plugin_count = outcome
.0
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum::<usize>();
warn!(
elapsed_ms = local_listing_started_at.elapsed().as_millis(),
marketplace_count = outcome.0.len(),
plugin_count,
load_error_count = outcome.1.len(),
"plugin/list timing listed local marketplaces"
);
outcome
}
Ok(Err(err)) => {
return Err(Self::marketplace_error(err, "list marketplace plugins"));
}
@@ -472,6 +534,7 @@ impl PluginRequestProcessor {
remote_sources.push(RemoteMarketplaceSource::SharedWithMe);
}
if !remote_sources.is_empty() {
let remote_marketplaces_started_at = Instant::now();
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
@@ -483,6 +546,7 @@ impl PluginRequestProcessor {
.await
{
Ok(remote_marketplaces) => {
let fetched_remote_marketplace_count = remote_marketplaces.len();
for remote_marketplace in remote_marketplaces
.into_iter()
.map(remote_marketplace_to_info)
@@ -496,11 +560,25 @@ impl PluginRequestProcessor {
data.push(remote_marketplace);
}
}
warn!(
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
remote_source_count = remote_sources.len(),
fetched_remote_marketplace_count,
merged_marketplace_count = data.len(),
"plugin/list timing fetched remote marketplaces"
);
}
Err(
RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode,
) => {}
err @ (RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode),
) => {
warn!(
elapsed_ms = remote_marketplaces_started_at.elapsed().as_millis(),
remote_source_count = remote_sources.len(),
error = %err,
"plugin/list timing skipped remote marketplaces"
);
}
Err(err) => {
warn!(
error = %err,
@@ -508,12 +586,15 @@ impl PluginRequestProcessor {
);
}
}
} else {
warn!("plugin/list timing skipped remote marketplaces");
}
let featured_plugin_ids = if data
let curated_marketplace_present = data
.iter()
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
{
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME);
let featured_plugin_ids_started_at = Instant::now();
let featured_plugin_ids = if curated_marketplace_present {
match plugins_manager
.featured_plugin_ids_for_config(&plugins_input, auth.as_ref())
.await
@@ -530,6 +611,24 @@ impl PluginRequestProcessor {
} else {
Vec::new()
};
warn!(
elapsed_ms = featured_plugin_ids_started_at.elapsed().as_millis(),
curated_marketplace_present,
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list timing loaded featured ids"
);
let plugin_count = data
.iter()
.map(|marketplace| marketplace.plugins.len())
.sum::<usize>();
warn!(
elapsed_ms = request_started_at.elapsed().as_millis(),
marketplace_count = data.len(),
plugin_count,
load_error_count = marketplace_load_errors.len(),
featured_plugin_id_count = featured_plugin_ids.len(),
"plugin/list timing completed"
);
Ok(PluginListResponse {
marketplaces: data,

View File

@@ -4,9 +4,9 @@ use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use codex_app_server_protocol::ClientRequestSerializationScope;
use futures::future::join_all;
use tokio::sync::Mutex;
use tracing::Instrument;
@@ -106,6 +106,8 @@ impl RequestSerializationQueueKey {
pub(crate) struct QueuedInitializedRequest {
gate: Arc<ConnectionRpcGate>,
future: BoxFutureUnit,
method: String,
request_id: String,
}
impl QueuedInitializedRequest {
@@ -116,11 +118,19 @@ impl QueuedInitializedRequest {
Self {
gate,
future: Box::pin(future),
method: "<unknown>".to_string(),
request_id: "<unknown>".to_string(),
}
}
pub(crate) fn with_log_metadata(mut self, method: String, request_id: String) -> Self {
self.method = method;
self.request_id = request_id;
self
}
pub(crate) async fn run(self) {
let Self { gate, future } = self;
let Self { gate, future, .. } = self;
gate.run(future).await;
}
}
@@ -128,11 +138,113 @@ impl QueuedInitializedRequest {
struct QueuedSerializedRequest {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
enqueued_at: Instant,
}
impl QueuedSerializedRequest {
async fn run(
self,
key: RequestSerializationQueueKey,
batch_size: usize,
queue_depth_after_pop: usize,
) {
let Self {
access,
request,
enqueued_at,
} = self;
let method = request.method.clone();
let request_id = request.request_id.clone();
tracing::warn!(
?key,
?access,
method,
request_id,
queue_wait_ms = enqueued_at.elapsed().as_millis(),
batch_size,
queue_depth_after_pop,
"serialized request started"
);
let started_at = Instant::now();
request.run().await;
tracing::warn!(
?key,
?access,
method,
request_id,
run_ms = started_at.elapsed().as_millis(),
"serialized request completed"
);
}
}
#[derive(Default)]
struct RequestSerializationQueueState {
pending: VecDeque<QueuedSerializedRequest>,
running_shared_reads: usize,
exclusive_running: bool,
}
impl RequestSerializationQueueState {
fn enqueue(&mut self, request: QueuedSerializedRequest) {
self.pending.push_back(request);
}
fn take_ready_requests(&mut self) -> Vec<QueuedSerializedRequest> {
if self.exclusive_running {
return Vec::new();
}
match self.pending.front().map(|request| request.access) {
Some(RequestSerializationAccess::Exclusive) if self.running_shared_reads == 0 => {
let Some(request) = self.pending.pop_front() else {
return Vec::new();
};
self.exclusive_running = true;
vec![request]
}
Some(RequestSerializationAccess::SharedRead) => {
let mut requests = Vec::new();
while self
.pending
.front()
.is_some_and(|request| request.access == RequestSerializationAccess::SharedRead)
{
let Some(request) = self.pending.pop_front() else {
break;
};
self.running_shared_reads += 1;
requests.push(request);
}
requests
}
Some(RequestSerializationAccess::Exclusive) | None => Vec::new(),
}
}
fn complete(&mut self, access: RequestSerializationAccess) {
match access {
RequestSerializationAccess::Exclusive => {
debug_assert!(self.exclusive_running);
self.exclusive_running = false;
}
RequestSerializationAccess::SharedRead => {
debug_assert!(self.running_shared_reads > 0);
self.running_shared_reads -= 1;
}
}
}
fn is_idle(&self) -> bool {
self.pending.is_empty() && self.running_shared_reads == 0 && !self.exclusive_running
}
}
#[derive(Clone, Default)]
pub(crate) struct RequestSerializationQueues {
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, VecDeque<QueuedSerializedRequest>>>>,
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, RequestSerializationQueueState>>>,
}
impl RequestSerializationQueues {
@@ -142,62 +254,112 @@ impl RequestSerializationQueues {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
) {
let request = QueuedSerializedRequest { access, request };
let should_spawn = {
let mut queues = self.inner.lock().await;
match queues.get_mut(&key) {
Some(queue) => {
queue.push_back(request);
false
}
None => {
let mut queue = VecDeque::new();
queue.push_back(request);
queues.insert(key.clone(), queue);
true
}
}
let method = request.method.clone();
let request_id = request.request_id.clone();
let request = QueuedSerializedRequest {
access,
request,
enqueued_at: Instant::now(),
};
let (
ready_requests,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
queue_depth_after_pop,
) = {
let mut queues = self.inner.lock().await;
let queue = queues.entry(key.clone()).or_default();
let queue_depth_before = queue.pending.len();
let queued_exclusive_count = queue
.pending
.iter()
.filter(|request| request.access == RequestSerializationAccess::Exclusive)
.count();
let head_request = queue.pending.front();
let head_access = head_request.map(|request| request.access);
let head_method = head_request
.map(|request| request.request.method.clone())
.unwrap_or_else(|| "<none>".to_string());
let head_request_id = head_request
.map(|request| request.request.request_id.clone())
.unwrap_or_else(|| "<none>".to_string());
queue.enqueue(request);
let ready_requests = queue.take_ready_requests();
(
ready_requests,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
queue.pending.len(),
)
};
tracing::warn!(
?key,
?access,
method,
request_id,
queue_depth_before,
queue_depth_after = queue_depth_before + 1,
queued_exclusive_count,
?head_access,
head_method,
head_request_id,
"serialized request queued"
);
if should_spawn {
self.spawn_ready_requests(key, ready_requests, queue_depth_after_pop);
}
fn spawn_ready_requests(
&self,
key: RequestSerializationQueueKey,
requests: Vec<QueuedSerializedRequest>,
queue_depth_after_pop: usize,
) {
let batch_size = requests.len();
for request in requests {
let queues = self.clone();
let span = tracing::debug_span!("app_server.serialized_request_queue", ?key);
tokio::spawn(async move { queues.drain(key).await }.instrument(span));
let request_key = key.clone();
let span = tracing::debug_span!("app_server.serialized_request_queue", ?request_key);
tokio::spawn(
async move {
let access = request.access;
request
.run(request_key.clone(), batch_size, queue_depth_after_pop)
.await;
queues.complete(request_key, access).await;
}
.instrument(span),
);
}
}
async fn drain(self, key: RequestSerializationQueueKey) {
loop {
let requests = {
let mut queues = self.inner.lock().await;
let Some(queue) = queues.get_mut(&key) else {
return;
};
match queue.pop_front() {
Some(request) => {
let access = request.access;
let mut requests = vec![request];
if access == RequestSerializationAccess::SharedRead {
while queue.front().is_some_and(|request| {
request.access == RequestSerializationAccess::SharedRead
}) {
let Some(request) = queue.pop_front() else {
break;
};
requests.push(request);
}
}
requests
}
None => {
queues.remove(&key);
return;
}
}
async fn complete(
&self,
key: RequestSerializationQueueKey,
access: RequestSerializationAccess,
) {
let (ready_requests, queue_depth_after_pop) = {
let mut queues = self.inner.lock().await;
let Some(queue) = queues.get_mut(&key) else {
return;
};
queue.complete(access);
let ready_requests = queue.take_ready_requests();
let queue_depth_after_pop = queue.pending.len();
let should_remove = queue.is_idle();
if should_remove {
queues.remove(&key);
}
(ready_requests, queue_depth_after_pop)
};
join_all(requests.into_iter().map(|request| request.request.run())).await;
}
self.spawn_ready_requests(key, ready_requests, queue_depth_after_pop);
}
}
@@ -504,6 +666,52 @@ mod tests {
.expect("shared reads should still be waiting");
}
#[tokio::test]
async fn later_shared_reads_join_running_shared_reads_without_queued_write() {
let queues = RequestSerializationQueues::default();
let key = RequestSerializationQueueKey::Global("test");
let (first_read_started_tx, first_read_started_rx) = oneshot::channel::<()>();
let (first_read_release_tx, first_read_release_rx) = oneshot::channel::<()>();
let (later_read_started_tx, later_read_started_rx) = oneshot::channel::<()>();
queues
.enqueue(
key.clone(),
RequestSerializationAccess::SharedRead,
QueuedInitializedRequest::new(gate(), async move {
first_read_started_tx
.send(())
.expect("receiver should be open");
let _ = first_read_release_rx.await;
}),
)
.await;
timeout(queue_drain_timeout(), first_read_started_rx)
.await
.expect("first read should start")
.expect("sender should be open");
queues
.enqueue(
key,
RequestSerializationAccess::SharedRead,
QueuedInitializedRequest::new(gate(), async move {
later_read_started_tx
.send(())
.expect("receiver should be open");
}),
)
.await;
timeout(queue_drain_timeout(), later_read_started_rx)
.await
.expect("later read should join running reads")
.expect("sender should be open");
first_read_release_tx
.send(())
.expect("first read should still be waiting");
}
#[tokio::test]
async fn exclusive_write_waits_for_running_shared_reads() {
let queues = RequestSerializationQueues::default();