Compare commits

...

1 Commits

Author SHA1 Message Date
rhan-oai
d02455011f [codex-analytics] ingest server requests and responses 2026-04-07 22:37:09 -07:00
5 changed files with 117 additions and 1 deletions

View File

@@ -16,6 +16,8 @@ use crate::facts::TrackEventsContext;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
@@ -227,6 +229,19 @@ impl AnalyticsEventsClient {
response: Box::new(response),
});
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}
pub fn track_server_response(&self, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
response: Box::new(response),
});
}
}
async fn send_track_events(

View File

@@ -5,6 +5,8 @@ use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::SubAgentSource;
@@ -81,6 +83,13 @@ pub(crate) enum AnalyticsFact {
connection_id: u64,
response: Box<ClientResponse>,
},
ServerRequest {
connection_id: u64,
request: Box<ServerRequest>,
},
ServerResponse {
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.

View File

@@ -76,6 +76,13 @@ impl AnalyticsReducer {
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::ServerRequest {
connection_id: _connection_id,
request: _request,
} => {}
AnalyticsFact::ServerResponse {
response: _response,
} => {}
AnalyticsFact::Notification(_notification) => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {

View File

@@ -239,6 +239,7 @@ impl MessageProcessor {
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
outgoing.set_analytics_events_client(analytics_events_client.clone());
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());

View File

@@ -1,15 +1,18 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ServerResponse;
use codex_otel::span_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::protocol::W3cTraceContext;
@@ -117,6 +120,7 @@ pub(crate) struct OutgoingMessageSender {
/// We keep them here because this is where responses, errors, and
/// disconnect cleanup all get handled.
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
analytics_events_client: StdMutex<Option<AnalyticsEventsClient>>,
}
#[derive(Clone)]
@@ -209,9 +213,28 @@ impl OutgoingMessageSender {
sender,
request_id_to_callback: Mutex::new(HashMap::new()),
request_contexts: Mutex::new(HashMap::new()),
analytics_events_client: StdMutex::new(None),
}
}
pub(crate) fn set_analytics_events_client(
&self,
analytics_events_client: AnalyticsEventsClient,
) {
let mut client = self
.analytics_events_client
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*client = Some(analytics_events_client);
}
fn analytics_events_client(&self) -> Option<AnalyticsEventsClient> {
self.analytics_events_client
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
pub(crate) async fn register_request_context(&self, request_context: RequestContext) {
let mut request_contexts = self.request_contexts.lock().await;
if request_contexts
@@ -298,7 +321,7 @@ impl OutgoingMessageSender {
);
}
let outgoing_message = OutgoingMessage::Request(request);
let outgoing_message = OutgoingMessage::Request(request.clone());
let send_result = match connection_ids {
None => {
self.sender
@@ -321,6 +344,9 @@ impl OutgoingMessageSender {
{
send_error = Some(err);
break;
} else if let Some(analytics_events_client) = self.analytics_events_client() {
analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
match send_error {
@@ -364,6 +390,11 @@ impl OutgoingMessageSender {
match entry {
Some((id, entry)) => {
if let Some(response) = server_response_from_result(&entry.request, result.clone())
&& let Some(analytics_events_client) = self.analytics_events_client()
{
analytics_events_client.track_server_response(response);
}
if let Err(err) = entry.callback.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
@@ -621,6 +652,14 @@ impl OutgoingMessageSender {
}
}
fn server_response_from_result(request: &ServerRequest, result: Result) -> Option<ServerResponse> {
let mut value = serde_json::to_value(request).ok()?;
let object = value.as_object_mut()?;
object.remove("params");
object.insert("response".to_string(), result);
serde_json::from_value(value).ok()
}
/// Outgoing message from the server to the client.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
@@ -654,6 +693,8 @@ mod tests {
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
@@ -838,6 +879,49 @@ mod tests {
);
}
#[test]
fn server_response_from_result_decodes_typed_response_with_original_method() {
let request = ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(7),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: None,
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
};
let response = server_response_from_result(
&request,
json!({
"decision": "acceptForSession",
}),
)
.expect("decode typed server response");
let ServerResponse::CommandExecutionRequestApproval {
request_id,
response,
} = response
else {
panic!("expected command execution approval response");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(
response.decision,
CommandExecutionApprovalDecision::AcceptForSession
);
}
#[tokio::test]
async fn send_response_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);