mirror of
https://github.com/openai/codex.git
synced 2026-05-14 08:12:36 +00:00
[codex-analytics] ingest server requests and responses
This commit is contained in:
@@ -426,7 +426,7 @@ async fn ingest_rejected_turn_steer(
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -486,7 +486,7 @@ async fn ingest_turn_prerequisites(
|
||||
ingest_initialize(reducer, out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
@@ -500,7 +500,7 @@ async fn ingest_turn_prerequisites(
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
@@ -510,7 +510,7 @@ async fn ingest_turn_prerequisites(
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
@@ -862,7 +862,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-no-client",
|
||||
@@ -906,7 +906,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
@@ -986,7 +986,7 @@ async fn compaction_event_ingests_custom_fact() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-1",
|
||||
@@ -1097,7 +1097,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-guardian",
|
||||
@@ -1867,7 +1867,7 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -1879,7 +1879,7 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
|
||||
},
|
||||
@@ -2021,7 +2021,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
|
||||
ingest_initialize(&mut reducer, &mut out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
@@ -2045,7 +2045,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
|
||||
// failed turn/start request and attach request-scoped connection metadata.
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
@@ -2162,7 +2162,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2174,7 +2174,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
|
||||
},
|
||||
@@ -2184,7 +2184,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(5),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2208,7 +2208,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(6),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2220,7 +2220,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
|
||||
},
|
||||
|
||||
@@ -27,6 +27,8 @@ use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
@@ -49,8 +51,7 @@ pub(crate) struct AnalyticsEventsQueue {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsEventsClient {
|
||||
queue: AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
queue: Option<AnalyticsEventsQueue>,
|
||||
}
|
||||
|
||||
impl AnalyticsEventsQueue {
|
||||
@@ -119,11 +120,15 @@ impl AnalyticsEventsClient {
|
||||
analytics_enabled: Option<bool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
|
||||
analytics_enabled,
|
||||
queue: (analytics_enabled != Some(false))
|
||||
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disabled() -> Self {
|
||||
Self { queue: None }
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
@@ -182,7 +187,7 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
|
||||
self.record_fact(AnalyticsFact::Request {
|
||||
self.record_fact(AnalyticsFact::ClientRequest {
|
||||
connection_id,
|
||||
request_id,
|
||||
request: Box::new(request),
|
||||
@@ -190,7 +195,10 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
@@ -205,7 +213,10 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
@@ -268,14 +279,13 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
if let Some(queue) = self.queue.as_ref() {
|
||||
queue.try_send(input);
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
|
||||
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
|
||||
self.record_fact(AnalyticsFact::Response {
|
||||
self.record_fact(AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
@@ -299,6 +309,19 @@ impl AnalyticsEventsClient {
|
||||
pub fn track_notification(&self, notification: ServerNotification) {
|
||||
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
|
||||
}
|
||||
|
||||
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
|
||||
self.record_fact(AnalyticsFact::ServerRequest {
|
||||
connection_id,
|
||||
request: Box::new(request),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_response(&self, response: ServerResponse) {
|
||||
self.record_fact(AnalyticsFact::ServerResponse {
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
|
||||
@@ -7,6 +7,8 @@ use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
@@ -272,12 +274,12 @@ pub(crate) enum AnalyticsFact {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
},
|
||||
Request {
|
||||
ClientRequest {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
ClientResponse {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
@@ -287,6 +289,13 @@ pub(crate) enum AnalyticsFact {
|
||||
error: JSONRPCErrorError,
|
||||
error_type: Option<AnalyticsJsonRpcError>,
|
||||
},
|
||||
ServerRequest {
|
||||
connection_id: u64,
|
||||
request: Box<ServerRequest>,
|
||||
},
|
||||
ServerResponse {
|
||||
response: Box<ServerResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
|
||||
@@ -172,14 +172,14 @@ impl AnalyticsReducer {
|
||||
rpc_transport,
|
||||
);
|
||||
}
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id,
|
||||
request_id,
|
||||
request,
|
||||
} => {
|
||||
self.ingest_request(connection_id, request_id, *request);
|
||||
}
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
response,
|
||||
} => {
|
||||
@@ -196,6 +196,13 @@ impl AnalyticsReducer {
|
||||
AnalyticsFact::Notification(notification) => {
|
||||
self.ingest_notification(*notification, out);
|
||||
}
|
||||
AnalyticsFact::ServerRequest {
|
||||
connection_id: _connection_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::ServerResponse {
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
|
||||
@@ -889,6 +889,23 @@ macro_rules! server_request_definitions {
|
||||
$(Self::$variant { request_id, .. } => request_id,)*
|
||||
}
|
||||
}
|
||||
|
||||
pub fn response_from_result(
|
||||
&self,
|
||||
result: crate::Result,
|
||||
) -> serde_json::Result<ServerResponse> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant { request_id, .. } => {
|
||||
let response = serde_json::from_value::<$response>(result)?;
|
||||
Ok(ServerResponse::$variant {
|
||||
request_id: request_id.clone(),
|
||||
response,
|
||||
})
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Typed response from the client to the server.
|
||||
|
||||
16
codex-rs/app-server/src/analytics_utils.rs
Normal file
16
codex-rs/app-server/src/analytics_utils.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_core::config::Config;
|
||||
use codex_login::AuthManager;
|
||||
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
}
|
||||
@@ -3436,7 +3436,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3505,7 +3508,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3595,7 +3601,10 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
let thread_watch_manager = ThreadWatchManager::new();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4221,7 +4230,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let event_turn_id = "complete1".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4287,7 +4299,10 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4335,7 +4350,10 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4377,7 +4395,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4431,7 +4452,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-123".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4520,7 +4544,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-456".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4593,7 +4620,10 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4855,7 +4885,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4893,7 +4926,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4919,7 +4955,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let conversation_id = ThreadId::new();
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
|
||||
@@ -10593,7 +10593,10 @@ mod tests {
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
vec![connection_id],
|
||||
|
||||
@@ -726,7 +726,10 @@ mod tests {
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(1),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(42),
|
||||
@@ -762,7 +765,10 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-99".to_string()),
|
||||
exec_request: windows_sandbox_exec_request(),
|
||||
@@ -809,7 +815,10 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-100".to_string()),
|
||||
exec_request: ExecRequest::new(
|
||||
|
||||
@@ -234,7 +234,10 @@ mod tests {
|
||||
const OUTGOING_BUFFER: usize = 1;
|
||||
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
|
||||
FsWatchManager::new_with_file_watcher(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
@@ -365,7 +366,15 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
|
||||
.await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -390,9 +399,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
|
||||
.await;
|
||||
let config_manager = ConfigManager::new(
|
||||
args.config.codex_home.to_path_buf(),
|
||||
args.cli_overrides,
|
||||
@@ -405,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
config_manager,
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
@@ -69,6 +70,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod analytics_utils;
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
@@ -728,13 +730,19 @@ pub async fn run_main_with_transport_options(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let initialize_notification_sender = outgoing_message_sender.clone();
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let initialize_notification_sender = outgoing_message_sender.clone();
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
config_manager,
|
||||
|
||||
@@ -247,6 +247,7 @@ impl ConnectionSessionState {
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) arg0_paths: Arg0DispatchPaths,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) config_manager: ConfigManager,
|
||||
@@ -267,6 +268,7 @@ impl MessageProcessor {
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
config_manager,
|
||||
@@ -283,11 +285,6 @@ impl MessageProcessor {
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::ConnectionSessionState;
|
||||
use super::MessageProcessor;
|
||||
use super::MessageProcessorArgs;
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
@@ -264,7 +265,6 @@ async fn build_test_processor(
|
||||
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await;
|
||||
let config_manager = ConfigManager::new(
|
||||
@@ -275,8 +275,15 @@ async fn build_test_processor(
|
||||
Arg0DispatchPaths::default(),
|
||||
Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
config_manager,
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
@@ -118,6 +119,7 @@ pub(crate) struct OutgoingMessageSender {
|
||||
/// We keep them here because this is where responses, errors, and
|
||||
/// disconnect cleanup all get handled.
|
||||
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -204,12 +206,16 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
}
|
||||
|
||||
impl OutgoingMessageSender {
|
||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
) -> Self {
|
||||
Self {
|
||||
next_server_request_id: AtomicI64::new(0),
|
||||
sender,
|
||||
request_id_to_callback: Mutex::new(HashMap::new()),
|
||||
request_contexts: Mutex::new(HashMap::new()),
|
||||
analytics_events_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +305,7 @@ impl OutgoingMessageSender {
|
||||
);
|
||||
}
|
||||
|
||||
let outgoing_message = OutgoingMessage::Request(request);
|
||||
let outgoing_message = OutgoingMessage::Request(request.clone());
|
||||
let send_result = match connection_ids {
|
||||
None => {
|
||||
self.sender
|
||||
@@ -322,6 +328,9 @@ impl OutgoingMessageSender {
|
||||
{
|
||||
send_error = Some(err);
|
||||
break;
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_server_request(connection_id.0, request.clone());
|
||||
}
|
||||
}
|
||||
match send_error {
|
||||
@@ -365,6 +374,9 @@ impl OutgoingMessageSender {
|
||||
|
||||
match entry {
|
||||
Some((id, entry)) => {
|
||||
if let Ok(response) = entry.request.response_from_result(result.clone()) {
|
||||
self.analytics_events_client.track_server_response(response);
|
||||
}
|
||||
if let Err(err) = entry.callback.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
@@ -665,6 +677,8 @@ mod tests {
|
||||
use codex_app_server_protocol::AccountUpdatedNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
@@ -675,6 +689,7 @@ mod tests {
|
||||
use codex_app_server_protocol::ModelVerificationNotification;
|
||||
use codex_app_server_protocol::RateLimitSnapshot;
|
||||
use codex_app_server_protocol::RateLimitWindow;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_protocol::ThreadId;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -900,10 +915,51 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_request_response_from_result_decodes_typed_response() {
|
||||
let request = ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(7),
|
||||
params: CommandExecutionRequestApprovalParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
approval_id: None,
|
||||
reason: None,
|
||||
network_approval_context: None,
|
||||
command: Some("echo hi".to_string()),
|
||||
cwd: None,
|
||||
command_actions: None,
|
||||
additional_permissions: None,
|
||||
proposed_execpolicy_amendment: None,
|
||||
proposed_network_policy_amendments: None,
|
||||
available_decisions: None,
|
||||
},
|
||||
};
|
||||
|
||||
let response = request
|
||||
.response_from_result(json!({
|
||||
"decision": "acceptForSession",
|
||||
}))
|
||||
.expect("decode typed server response");
|
||||
|
||||
let ServerResponse::CommandExecutionRequestApproval {
|
||||
request_id,
|
||||
response,
|
||||
} = response
|
||||
else {
|
||||
panic!("expected command execution approval response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(
|
||||
response.decision,
|
||||
CommandExecutionApprovalDecision::AcceptForSession
|
||||
);
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn send_response_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -938,7 +994,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_clears_registered_request_context() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -963,7 +1020,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_error_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1001,7 +1059,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let send_task = tokio::spawn(async move {
|
||||
outgoing
|
||||
.send_server_notification_to_connection_and_wait(
|
||||
@@ -1045,7 +1104,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn connection_closed_clears_registered_request_contexts() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let closed_connection_request = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1079,7 +1139,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request(ServerRequestPayload::ApplyPatchApproval(
|
||||
@@ -1113,7 +1174,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
@@ -1171,7 +1235,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
|
||||
@@ -722,6 +722,7 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)));
|
||||
|
||||
manager
|
||||
@@ -764,6 +765,7 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)));
|
||||
|
||||
manager
|
||||
|
||||
Reference in New Issue
Block a user