Merge remote-tracking branch 'origin/pr17088' into pr18919-resolve

# Conflicts:
#	codex-rs/analytics/src/client.rs
#	codex-rs/app-server/src/in_process.rs
#	codex-rs/app-server/src/lib.rs
#	codex-rs/app-server/src/message_processor.rs
#	codex-rs/app-server/src/message_processor/tracing_tests.rs
#	codex-rs/app-server/src/outgoing_message.rs
This commit is contained in:
Roy Han
2026-04-27 22:32:00 -07:00
11 changed files with 239 additions and 110 deletions

View File

@@ -432,7 +432,7 @@ async fn ingest_rejected_turn_steer(
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -492,7 +492,7 @@ async fn ingest_turn_prerequisites(
ingest_initialize(reducer, out).await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
@@ -506,7 +506,7 @@ async fn ingest_turn_prerequisites(
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -516,7 +516,7 @@ async fn ingest_turn_prerequisites(
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
@@ -899,7 +899,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-no-client",
@@ -943,7 +943,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
@@ -998,7 +998,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(ClientRequest::ThreadArchive {
@@ -1013,7 +1013,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
@@ -1035,7 +1035,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() {
ingest_initialize(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(ClientResponse::ThreadArchive {
request_id: RequestId::Integer(9),
@@ -1081,7 +1081,7 @@ async fn compaction_event_ingests_custom_fact() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
"thread-1",
@@ -1192,7 +1192,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-guardian",
@@ -1962,7 +1962,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -1974,7 +1974,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
@@ -2116,7 +2116,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -2140,7 +2140,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
// failed turn/start request and attach request-scoped connection metadata.
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
@@ -2257,7 +2257,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -2269,7 +2269,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
@@ -2279,7 +2279,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(5),
request: Box::new(sample_turn_steer_request(
@@ -2303,7 +2303,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(6),
request: Box::new(sample_turn_steer_request(
@@ -2315,7 +2315,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
},

View File

@@ -28,13 +28,14 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::time::Duration;
use tokio::sync::mpsc;
@@ -51,51 +52,32 @@ pub(crate) struct AnalyticsEventsQueue {
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
queue: Option<AnalyticsEventsQueue>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Debug, Clone, Copy)]
pub enum AuthManagerRetention {
Strong,
Weak,
}
enum RetainedAuthManager {
Strong(Arc<AuthManager>),
Weak(Weak<AuthManager>),
}
impl RetainedAuthManager {
fn upgrade(&self) -> Option<Arc<AuthManager>> {
match self {
Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)),
Self::Weak(auth_manager) => auth_manager.upgrade(),
}
}
}
impl AnalyticsEventsQueue {
pub(crate) fn new(
auth_manager: Arc<AuthManager>,
base_url: String,
retention: AuthManagerRetention,
) -> Self {
let auth_manager = match retention {
AuthManagerRetention::Strong => RetainedAuthManager::Strong(auth_manager),
AuthManagerRetention::Weak => RetainedAuthManager::Weak(Arc::downgrade(&auth_manager)),
};
Self::spawn(auth_manager, base_url)
}
fn spawn(auth_manager: RetainedAuthManager, base_url: String) -> Self {
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
let auth_manager = match retention {
AuthManagerRetention::Strong => AuthManagerHandle::Strong(auth_manager),
AuthManagerRetention::Weak => AuthManagerHandle::Weak(Arc::downgrade(&auth_manager)),
};
tokio::spawn(async move {
let mut reducer = AnalyticsReducer::default();
while let Some(input) = receiver.recv().await {
let mut events = Vec::new();
reducer.ingest(input, &mut events).await;
let Some(auth_manager) = auth_manager.upgrade() else {
let Some(auth_manager) = auth_manager.get() else {
break;
};
send_track_events(&auth_manager, &base_url, events).await;
@@ -149,29 +131,40 @@ impl AnalyticsEventsQueue {
}
}
enum AuthManagerHandle {
Strong(Arc<AuthManager>),
Weak(std::sync::Weak<AuthManager>),
}
impl AuthManagerHandle {
fn get(&self) -> Option<Arc<AuthManager>> {
match self {
Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)),
Self::Weak(auth_manager) => auth_manager.upgrade(),
}
}
}
impl AnalyticsEventsClient {
pub fn new(
auth_manager: Arc<AuthManager>,
base_url: String,
analytics_enabled: Option<bool>,
retention: AuthManagerRetention,
auth_manager_retention: AuthManagerRetention,
) -> Self {
Self {
queue: AnalyticsEventsQueue::new(auth_manager, base_url, retention),
analytics_enabled,
queue: (analytics_enabled != Some(false)).then(|| {
AnalyticsEventsQueue::new(
Arc::clone(&auth_manager),
base_url,
auth_manager_retention,
)
}),
}
}
pub fn disabled() -> Self {
let (sender, _receiver) = mpsc::channel(1);
Self {
queue: AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
},
analytics_enabled: Some(false),
}
Self { queue: None }
}
pub fn track_skill_invocations(
@@ -235,7 +228,7 @@ impl AnalyticsEventsClient {
if !tracks_client_request(&request) {
return;
}
self.record_fact(AnalyticsFact::Request {
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request),
@@ -243,7 +236,10 @@ impl AnalyticsEventsClient {
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
@@ -258,7 +254,10 @@ impl AnalyticsEventsClient {
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
@@ -321,17 +320,16 @@ impl AnalyticsEventsClient {
}
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
if self.analytics_enabled == Some(false) {
return;
if let Some(queue) = self.queue.as_ref() {
queue.try_send(input);
}
self.queue.try_send(input);
}
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
if !tracks_client_response(&response) {
return;
}
self.record_fact(AnalyticsFact::Response {
self.record_fact(AnalyticsFact::ClientResponse {
connection_id,
response: Box::new(response),
});
@@ -371,6 +369,20 @@ impl AnalyticsEventsClient {
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}
pub fn track_server_response(&self, connection_id: u64, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
connection_id,
response: Box::new(response),
});
}
}
fn tracks_client_request(request: &ClientRequest) -> bool {

View File

@@ -8,6 +8,8 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
@@ -273,12 +275,12 @@ pub(crate) enum AnalyticsFact {
runtime: CodexRuntimeMetadata,
rpc_transport: AppServerRpcTransport,
},
Request {
ClientRequest {
connection_id: u64,
request_id: RequestId,
request: Box<ClientRequest>,
},
Response {
ClientResponse {
connection_id: u64,
response: Box<ClientResponse>,
},
@@ -293,6 +295,14 @@ pub(crate) enum AnalyticsFact {
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
},
ServerRequest {
connection_id: u64,
request: Box<ServerRequest>,
},
ServerResponse {
connection_id: u64,
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.

View File

@@ -171,14 +171,14 @@ impl AnalyticsReducer {
rpc_transport,
);
}
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id,
response,
} => {
@@ -204,6 +204,14 @@ impl AnalyticsReducer {
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::ServerRequest {
connection_id: _connection_id,
request: _request,
} => {}
AnalyticsFact::ServerResponse {
connection_id: _connection_id,
response: _response,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);

View File

@@ -803,6 +803,24 @@ macro_rules! server_request_definitions {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
pub fn response_from_result(
&self,
result: &crate::Result,
) -> serde_json::Result<ServerResponse> {
match self {
$(
Self::$variant { request_id, .. } => {
let response =
<$response as serde::Deserialize>::deserialize(result)?;
Ok(ServerResponse::$variant {
request_id: request_id.clone(),
response,
})
}
)*
}
}
}
/// Typed response from the client to the server.

View File

@@ -0,0 +1,18 @@
use std::sync::Arc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AuthManagerRetention;
use codex_core::config::Config;
use codex_login::AuthManager;
pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Weak,
)
}

View File

@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
@@ -66,9 +67,7 @@ use crate::transport::CHANNEL_CAPACITY;
use crate::transport::ConnectionOrigin;
use crate::transport::OutboundConnectionState;
use crate::transport::route_outgoing_envelope;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppServerRpcTransport;
use codex_analytics::AuthManagerRetention;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
@@ -370,18 +369,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
args.config
.chatgpt_base_url
.trim_end_matches('/')
.to_string(),
args.config.analytics_enabled,
AuthManagerRetention::Weak,
);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client,
analytics_events_client.clone(),
));
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
@@ -419,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let mut processor_handle = tokio::spawn(async move {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
arg0_paths: args.arg0_paths,
config: args.config,
config_manager,
@@ -577,7 +570,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
}
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
outgoing_message_sender
.notify_client_response(request_id, result)
.notify_client_response(
IN_PROCESS_CONNECTION_ID,
request_id,
result,
)
.await;
}
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {

View File

@@ -1,7 +1,5 @@
#![deny(clippy::print_stdout, clippy::print_stderr)]
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AuthManagerRetention;
use codex_arg0::Arg0DispatchPaths;
use codex_config::ConfigLayerStackOrdering;
use codex_config::LoaderOverrides;
@@ -21,6 +19,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
@@ -69,6 +68,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::Registry;
use tracing_subscriber::util::SubscriberInitExt;
mod analytics_events;
mod app_server_tracing;
mod bespoke_event_handling;
mod codex_message_processor;
@@ -711,19 +711,18 @@ pub async fn run_main_with_transport_options(
});
let processor_handle = tokio::spawn({
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Weak,
);
let outbound_control_tx = outbound_control_tx;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client,
analytics_events_client.clone(),
));
let outbound_control_tx = outbound_control_tx;
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
analytics_events_client,
arg0_paths,
config: Arc::new(config),
config_manager,
@@ -732,7 +731,7 @@ pub async fn run_main_with_transport_options(
log_db,
config_warnings,
session_source,
auth_manager: Arc::clone(&auth_manager),
auth_manager,
rpc_transport: analytics_rpc_transport(&transport),
remote_control_handle: Some(remote_control_handle),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
@@ -890,7 +889,7 @@ pub async fn run_main_with_transport_options(
warn!("dropping response from unknown connection: {connection_id:?}");
continue;
}
processor.process_response(response).await;
processor.process_response(connection_id, response).await;
}
JSONRPCMessage::Notification(notification) => {
if !connections.contains_key(&connection_id) {

View File

@@ -25,7 +25,6 @@ use async_trait::async_trait;
use axum::http::HeaderValue;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppServerRpcTransport;
use codex_analytics::AuthManagerRetention;
use codex_app_server_protocol::AppListUpdatedNotification;
use codex_app_server_protocol::AuthMode as LoginAuthMode;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
@@ -239,6 +238,7 @@ impl ConnectionSessionState {
pub(crate) struct MessageProcessorArgs {
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) arg0_paths: Arg0DispatchPaths,
pub(crate) config: Arc<Config>,
pub(crate) config_manager: ConfigManager,
@@ -259,6 +259,7 @@ impl MessageProcessor {
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
let MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths,
config,
config_manager,
@@ -275,12 +276,6 @@ impl MessageProcessor {
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Strong,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
@@ -557,10 +552,16 @@ impl MessageProcessor {
}
/// Handle a standalone JSON-RPC response originating from the peer.
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
pub(crate) async fn process_response(
&self,
connection_id: ConnectionId,
response: JSONRPCResponse,
) {
tracing::info!("<- response: {:?}", response);
let JSONRPCResponse { id, result, .. } = response;
self.outgoing.notify_client_response(id, result).await
self.outgoing
.notify_client_response(connection_id, id, result)
.await
}
/// Handle an error object received from the peer.

View File

@@ -1,6 +1,7 @@
use super::ConnectionSessionState;
use super::MessageProcessor;
use super::MessageProcessorArgs;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingMessageSender;
@@ -264,13 +265,14 @@ async fn build_test_processor(
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let analytics_events_client = codex_analytics::AnalyticsEventsClient::disabled();
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await;
let config_manager = ConfigManager::new(
config.codex_home.to_path_buf(),
Vec::new(),
@@ -281,6 +283,7 @@ async fn build_test_processor(
);
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths: Arg0DispatchPaths::default(),
config,
config_manager,

View File

@@ -134,6 +134,7 @@ struct PendingCallbackEntry {
callback: oneshot::Sender<ClientRequestResult>,
thread_id: Option<ThreadId>,
request: ServerRequest,
track_server_response: bool,
}
impl ThreadScopedOutgoingMessageSender {
@@ -302,11 +303,12 @@ impl OutgoingMessageSender {
callback: tx_approve,
thread_id,
request: request.clone(),
track_server_response: connection_ids.is_some(),
},
);
}
let outgoing_message = OutgoingMessage::Request(request);
let outgoing_message = OutgoingMessage::Request(request.clone());
let send_result = match connection_ids {
None => {
self.sender
@@ -329,6 +331,9 @@ impl OutgoingMessageSender {
{
send_error = Some(err);
break;
} else {
self.analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
match send_error {
@@ -357,21 +362,35 @@ impl OutgoingMessageSender {
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id,
message: OutgoingMessage::Request(request),
message: OutgoingMessage::Request(request.clone()),
write_complete_tx: None,
})
.await
{
warn!("failed to resend request to client: {err:?}");
} else {
self.analytics_events_client
.track_server_request(connection_id.0, request);
}
}
}
pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) {
pub(crate) async fn notify_client_response(
&self,
connection_id: ConnectionId,
id: RequestId,
result: Result,
) {
let entry = self.take_request_callback(&id).await;
match entry {
Some((id, entry)) => {
if entry.track_server_response
&& let Ok(response) = entry.request.response_from_result(&result)
{
self.analytics_events_client
.track_server_response(connection_id.0, response);
}
if let Err(err) = entry.callback.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
@@ -687,6 +706,8 @@ mod tests {
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
@@ -697,6 +718,7 @@ mod tests {
use codex_app_server_protocol::ModelVerificationNotification;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
@@ -898,6 +920,47 @@ mod tests {
);
}
#[test]
fn server_request_response_from_result_decodes_typed_response() {
let request = ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(7),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: None,
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
};
let response = request
.response_from_result(&json!({
"decision": "acceptForSession",
}))
.expect("decode typed server response");
let ServerResponse::CommandExecutionRequestApproval {
request_id,
response,
} = response
else {
panic!("expected command execution approval response");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(
response.decision,
CommandExecutionApprovalDecision::AcceptForSession
);
}
#[test]
fn verify_model_verification_notification_serialization() {
let notification = ServerNotification::ModelVerification(ModelVerificationNotification {