diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index addf428e55..cab23590b0 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -771,6 +771,8 @@ impl MessageProcessor { ); let serialization_scope = codex_request.serialization_scope(); + let serialization_method = codex_request.method(); + let serialization_request_id = connection_request_id.request_id.to_string(); let app_server_client_name = session.app_server_client_name().map(str::to_string); let client_version = session.client_version().map(str::to_string); let device_key_requests_allowed = session.allows_device_key_requests(); @@ -797,7 +799,8 @@ impl MessageProcessor { } } .instrument(span), - ); + ) + .with_log_metadata(serialization_method, serialization_request_id); if let Some(scope) = serialization_scope { let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope); diff --git a/codex-rs/app-server/src/request_serialization.rs b/codex-rs/app-server/src/request_serialization.rs index 0dd167b74d..8f26b7bb91 100644 --- a/codex-rs/app-server/src/request_serialization.rs +++ b/codex-rs/app-server/src/request_serialization.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; +use std::time::Instant; use codex_app_server_protocol::ClientRequestSerializationScope; use futures::future::join_all; @@ -106,6 +107,8 @@ impl RequestSerializationQueueKey { pub(crate) struct QueuedInitializedRequest { gate: Arc, future: BoxFutureUnit, + method: String, + request_id: String, } impl QueuedInitializedRequest { @@ -116,11 +119,19 @@ impl QueuedInitializedRequest { Self { gate, future: Box::pin(future), + method: "".to_string(), + request_id: "".to_string(), } } + pub(crate) fn with_log_metadata(mut self, method: String, request_id: String) -> Self { + self.method = method; + self.request_id = request_id; + self + } + pub(crate) async fn run(self) { - let Self { gate, future } = self; + let Self { gate, future, .. } = self; gate.run(future).await; } } @@ -128,6 +139,46 @@ impl QueuedInitializedRequest { struct QueuedSerializedRequest { access: RequestSerializationAccess, request: QueuedInitializedRequest, + enqueued_at: Instant, +} + +impl QueuedSerializedRequest { + async fn run( + self, + key: RequestSerializationQueueKey, + batch_size: usize, + queue_depth_after_pop: usize, + ) { + let Self { + access, + request, + enqueued_at, + } = self; + let method = request.method.clone(); + let request_id = request.request_id.clone(); + tracing::info!( + ?key, + ?access, + method, + request_id, + queue_wait_ms = enqueued_at.elapsed().as_millis(), + batch_size, + queue_depth_after_pop, + "serialized request started" + ); + + let started_at = Instant::now(); + request.run().await; + + tracing::info!( + ?key, + ?access, + method, + request_id, + run_ms = started_at.elapsed().as_millis(), + "serialized request completed" + ); + } } #[derive(Clone, Default)] @@ -142,23 +193,70 @@ impl RequestSerializationQueues { access: RequestSerializationAccess, request: QueuedInitializedRequest, ) { - let request = QueuedSerializedRequest { access, request }; - let should_spawn = { + let method = request.method.clone(); + let request_id = request.request_id.clone(); + let request = QueuedSerializedRequest { + access, + request, + enqueued_at: Instant::now(), + }; + let ( + should_spawn, + queue_depth_before, + queued_exclusive_count, + head_access, + head_method, + head_request_id, + ) = { let mut queues = self.inner.lock().await; match queues.get_mut(&key) { Some(queue) => { + let queue_depth_before = queue.len(); + let queued_exclusive_count = queue + .iter() + .filter(|request| request.access == RequestSerializationAccess::Exclusive) + .count(); + let head_request = queue.front(); + let head_access = head_request.map(|request| request.access); + let head_method = head_request + .map(|request| request.request.method.clone()) + .unwrap_or_else(|| "".to_string()); + let head_request_id = head_request + .map(|request| request.request.request_id.clone()) + .unwrap_or_else(|| "".to_string()); queue.push_back(request); - false + ( + false, + queue_depth_before, + queued_exclusive_count, + head_access, + head_method, + head_request_id, + ) } None => { let mut queue = VecDeque::new(); queue.push_back(request); queues.insert(key.clone(), queue); - true + (true, 0, 0, None, "".to_string(), "".to_string()) } } }; + tracing::info!( + ?key, + ?access, + method, + request_id, + queue_depth_before, + queue_depth_after = queue_depth_before + 1, + queued_exclusive_count, + ?head_access, + head_method, + head_request_id, + "serialized request queued" + ); + if should_spawn { let queues = self.clone(); let span = tracing::debug_span!("app_server.serialized_request_queue", ?key); @@ -168,7 +266,7 @@ impl RequestSerializationQueues { async fn drain(self, key: RequestSerializationQueueKey) { loop { - let requests = { + let (requests, queue_depth_after_pop) = { let mut queues = self.inner.lock().await; let Some(queue) = queues.get_mut(&key) else { return; @@ -187,7 +285,7 @@ impl RequestSerializationQueues { requests.push(request); } } - requests + (requests, queue.len()) } None => { queues.remove(&key); @@ -196,7 +294,13 @@ impl RequestSerializationQueues { } }; - join_all(requests.into_iter().map(|request| request.request.run())).await; + let batch_size = requests.len(); + join_all( + requests + .into_iter() + .map(|request| request.run(key.clone(), batch_size, queue_depth_after_pop)), + ) + .await; } } }