add request serialization queue timing logs

This commit is contained in:
xli-oai
2026-05-07 02:36:02 -07:00
parent c2a4cae364
commit ff1ad6deae
2 changed files with 116 additions and 9 deletions

View File

@@ -771,6 +771,8 @@ impl MessageProcessor {
);
let serialization_scope = codex_request.serialization_scope();
let serialization_method = codex_request.method();
let serialization_request_id = connection_request_id.request_id.to_string();
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
let device_key_requests_allowed = session.allows_device_key_requests();
@@ -797,7 +799,8 @@ impl MessageProcessor {
}
}
.instrument(span),
);
)
.with_log_metadata(serialization_method, serialization_request_id);
if let Some(scope) = serialization_scope {
let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);

View File

@@ -4,6 +4,7 @@ use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use codex_app_server_protocol::ClientRequestSerializationScope;
use futures::future::join_all;
@@ -106,6 +107,8 @@ impl RequestSerializationQueueKey {
pub(crate) struct QueuedInitializedRequest {
gate: Arc<ConnectionRpcGate>,
future: BoxFutureUnit,
method: String,
request_id: String,
}
impl QueuedInitializedRequest {
@@ -116,11 +119,19 @@ impl QueuedInitializedRequest {
Self {
gate,
future: Box::pin(future),
method: "<unknown>".to_string(),
request_id: "<unknown>".to_string(),
}
}
pub(crate) fn with_log_metadata(mut self, method: String, request_id: String) -> Self {
self.method = method;
self.request_id = request_id;
self
}
pub(crate) async fn run(self) {
let Self { gate, future } = self;
let Self { gate, future, .. } = self;
gate.run(future).await;
}
}
@@ -128,6 +139,46 @@ impl QueuedInitializedRequest {
struct QueuedSerializedRequest {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
enqueued_at: Instant,
}
impl QueuedSerializedRequest {
async fn run(
self,
key: RequestSerializationQueueKey,
batch_size: usize,
queue_depth_after_pop: usize,
) {
let Self {
access,
request,
enqueued_at,
} = self;
let method = request.method.clone();
let request_id = request.request_id.clone();
tracing::info!(
?key,
?access,
method,
request_id,
queue_wait_ms = enqueued_at.elapsed().as_millis(),
batch_size,
queue_depth_after_pop,
"serialized request started"
);
let started_at = Instant::now();
request.run().await;
tracing::info!(
?key,
?access,
method,
request_id,
run_ms = started_at.elapsed().as_millis(),
"serialized request completed"
);
}
}
#[derive(Clone, Default)]
@@ -142,23 +193,70 @@ impl RequestSerializationQueues {
access: RequestSerializationAccess,
request: QueuedInitializedRequest,
) {
let request = QueuedSerializedRequest { access, request };
let should_spawn = {
let method = request.method.clone();
let request_id = request.request_id.clone();
let request = QueuedSerializedRequest {
access,
request,
enqueued_at: Instant::now(),
};
let (
should_spawn,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
) = {
let mut queues = self.inner.lock().await;
match queues.get_mut(&key) {
Some(queue) => {
let queue_depth_before = queue.len();
let queued_exclusive_count = queue
.iter()
.filter(|request| request.access == RequestSerializationAccess::Exclusive)
.count();
let head_request = queue.front();
let head_access = head_request.map(|request| request.access);
let head_method = head_request
.map(|request| request.request.method.clone())
.unwrap_or_else(|| "<none>".to_string());
let head_request_id = head_request
.map(|request| request.request.request_id.clone())
.unwrap_or_else(|| "<none>".to_string());
queue.push_back(request);
false
(
false,
queue_depth_before,
queued_exclusive_count,
head_access,
head_method,
head_request_id,
)
}
None => {
let mut queue = VecDeque::new();
queue.push_back(request);
queues.insert(key.clone(), queue);
true
(true, 0, 0, None, "<none>".to_string(), "<none>".to_string())
}
}
};
tracing::info!(
?key,
?access,
method,
request_id,
queue_depth_before,
queue_depth_after = queue_depth_before + 1,
queued_exclusive_count,
?head_access,
head_method,
head_request_id,
"serialized request queued"
);
if should_spawn {
let queues = self.clone();
let span = tracing::debug_span!("app_server.serialized_request_queue", ?key);
@@ -168,7 +266,7 @@ impl RequestSerializationQueues {
async fn drain(self, key: RequestSerializationQueueKey) {
loop {
let requests = {
let (requests, queue_depth_after_pop) = {
let mut queues = self.inner.lock().await;
let Some(queue) = queues.get_mut(&key) else {
return;
@@ -187,7 +285,7 @@ impl RequestSerializationQueues {
requests.push(request);
}
}
requests
(requests, queue.len())
}
None => {
queues.remove(&key);
@@ -196,7 +294,13 @@ impl RequestSerializationQueues {
}
};
join_all(requests.into_iter().map(|request| request.request.run())).await;
let batch_size = requests.len();
join_all(
requests
.into_iter()
.map(|request| request.run(key.clone(), batch_size, queue_depth_after_pop)),
)
.await;
}
}
}