Compare commits

...

25 Commits

Author SHA1 Message Date
Roy Han
122a4f62e1 cover centralized response analytics path 2026-04-28 11:02:41 -07:00
Roy Han
42cb76b669 address client response review feedback 2026-04-28 10:50:44 -07:00
Roy Han
a283b3e1d1 annotate analytics client response test ids 2026-04-28 10:13:49 -07:00
Roy Han
3405165f46 [app-server] filter analytics before cloning payloads 2026-04-28 09:15:05 -07:00
Roy Han
0caa739982 [app-server] collapse client response analytics facts 2026-04-28 07:33:36 -07:00
Roy Han
114566ae9d Merge remote-tracking branch 'origin/pr17088' into pr18919-resolve 2026-04-27 22:40:20 -07:00
rhan-oai
b01cb5fb03 Merge branch 'main' into pr17088 2026-04-27 22:33:34 -07:00
Roy Han
47d32d5cd9 Merge remote-tracking branch 'origin/pr17088' into pr18919-resolve
# Conflicts:
#	codex-rs/analytics/src/client.rs
#	codex-rs/app-server/src/in_process.rs
#	codex-rs/app-server/src/lib.rs
#	codex-rs/app-server/src/message_processor.rs
#	codex-rs/app-server/src/message_processor/tracing_tests.rs
#	codex-rs/app-server/src/outgoing_message.rs
2026-04-27 22:32:00 -07:00
Roy Han
40bceeb2ce [codex-analytics] fix auth retention test lint 2026-04-27 17:03:17 -07:00
Roy Han
54591dac4e [codex-analytics] annotate analytics-enabled test args 2026-04-27 17:02:45 -07:00
Roy Han
a12c04ce37 Merge remote-tracking branch 'origin/main' into pr18919-resolve
# Conflicts:
#	codex-rs/app-server/src/codex_message_processor.rs
2026-04-27 16:55:46 -07:00
Roy Han
3be8cc539f revert tracing test stack wrapper 2026-04-27 16:54:31 -07:00
Roy Han
15e71997f3 [codex-analytics] make auth retention explicit 2026-04-27 16:39:16 -07:00
Roy Han
fe7e8e7a0c [codex-analytics] make auth retention explicit 2026-04-27 16:39:15 -07:00
Roy Han
1970393a6b Merge remote-tracking branch 'origin/main' into pr18919-resolve
# Conflicts:
#	codex-rs/app-server/src/codex_message_processor.rs
2026-04-27 15:17:29 -07:00
Roy Han
d3c7bf2609 Merge remote-tracking branch 'origin/main' into pr18919-resolve
# Conflicts:
#	codex-rs/app-server/src/bespoke_event_handling.rs
#	codex-rs/app-server/src/codex_message_processor.rs
#	codex-rs/app-server/src/codex_message_processor/plugins.rs
#	codex-rs/app-server/src/in_process.rs
#	codex-rs/app-server/src/lib.rs
#	codex-rs/app-server/src/message_processor.rs
2026-04-27 15:05:31 -07:00
Roy Han
98f89b7ea8 Merge remote-tracking branch 'origin/main' into pr17088
# Conflicts:
#	codex-rs/app-server/src/in_process.rs
#	codex-rs/app-server/src/lib.rs
#	codex-rs/app-server/src/message_processor/tracing_tests.rs
2026-04-27 13:50:03 -07:00
Roy Han
82c12ae6bf simplify server request tracking state 2026-04-27 12:50:55 -07:00
Roy Han
30c33d883e type client responses 2026-04-24 16:43:53 -04:00
Roy Han
c092aa338c ignore untracked broadcast server responses 2026-04-23 17:47:25 -04:00
Roy Han
34733b8723 avoid cloning server response payloads 2026-04-23 17:03:26 -04:00
Roy Han
5c30a31222 track broadcast server request responses 2026-04-23 16:55:36 -04:00
Roy Han
a89b81baaf track replayed server requests 2026-04-23 16:47:36 -04:00
Roy Han
ad5754b23d Merge remote-tracking branch 'origin/main' into pr17088
# Conflicts:
#	codex-rs/app-server/src/outgoing_message.rs
2026-04-23 16:39:02 -04:00
rhan-oai
840f2711df [codex-analytics] ingest server requests and responses 2026-04-22 12:58:36 -07:00
25 changed files with 1549 additions and 352 deletions

View File

@@ -1,4 +1,6 @@
use crate::client::AnalyticsEventsClient;
use crate::client::AnalyticsEventsQueue;
use crate::client::AuthManagerRetention;
use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
@@ -59,7 +61,7 @@ use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
@@ -71,6 +73,8 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -83,6 +87,8 @@ use codex_app_server_protocol::TurnStatus as AppServerTurnStatus;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::default_client::DEFAULT_ORIGINATOR;
use codex_login::default_client::originator;
use codex_plugin::AppConnectorId;
@@ -141,23 +147,24 @@ fn sample_thread_with_source(
}
}
fn sample_thread_start_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
ClientResponse::ThreadStart {
request_id: RequestId::Integer(1),
response: ThreadStartResponse {
thread: sample_thread(thread_id, ephemeral),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
permission_profile: Some(sample_permission_profile()),
reasoning_effort: None,
},
}
fn sample_thread_start_response(
thread_id: &str,
ephemeral: bool,
model: &str,
) -> ClientResponsePayload {
ClientResponsePayload::ThreadStart(ThreadStartResponse {
thread: sample_thread(thread_id, ephemeral),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
permission_profile: Some(sample_permission_profile()),
reasoning_effort: None,
})
}
fn sample_permission_profile() -> AppServerPermissionProfile {
@@ -183,7 +190,11 @@ fn sample_runtime_metadata() -> CodexRuntimeMetadata {
}
}
fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
fn sample_thread_resume_response(
thread_id: &str,
ephemeral: bool,
model: &str,
) -> ClientResponsePayload {
sample_thread_resume_response_with_source(
thread_id,
ephemeral,
@@ -197,23 +208,20 @@ fn sample_thread_resume_response_with_source(
ephemeral: bool,
model: &str,
source: AppServerSessionSource,
) -> ClientResponse {
ClientResponse::ThreadResume {
request_id: RequestId::Integer(2),
response: ThreadResumeResponse {
thread: sample_thread_with_source(thread_id, ephemeral, source),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
permission_profile: Some(sample_permission_profile()),
reasoning_effort: None,
},
}
) -> ClientResponsePayload {
ClientResponsePayload::ThreadResume(ThreadResumeResponse {
thread: sample_thread_with_source(thread_id, ephemeral, source),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
permission_profile: Some(sample_permission_profile()),
reasoning_effort: None,
})
}
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
@@ -235,21 +243,18 @@ fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest
}
}
fn sample_turn_start_response(turn_id: &str, request_id: i64) -> ClientResponse {
ClientResponse::TurnStart {
request_id: RequestId::Integer(request_id),
response: codex_app_server_protocol::TurnStartResponse {
turn: Turn {
id: turn_id.to_string(),
items: vec![],
status: AppServerTurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnStart(codex_app_server_protocol::TurnStartResponse {
turn: Turn {
id: turn_id.to_string(),
items: vec![],
status: AppServerTurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
}
})
}
fn sample_turn_started_notification(thread_id: &str, turn_id: &str) -> ServerNotification {
@@ -355,12 +360,21 @@ fn sample_turn_steer_request(
}
}
fn sample_turn_steer_response(turn_id: &str, request_id: i64) -> ClientResponse {
ClientResponse::TurnSteer {
fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnSteer(TurnSteerResponse {
turn_id: turn_id.to_string(),
})
}
fn client_response_fact(
connection_id: u64,
request_id: i64,
response: ClientResponsePayload,
) -> AnalyticsFact {
AnalyticsFact::ClientResponse {
connection_id,
request_id: RequestId::Integer(request_id),
response: TurnSteerResponse {
turn_id: turn_id.to_string(),
},
response: Box::new(response),
}
}
@@ -426,7 +440,7 @@ async fn ingest_rejected_turn_steer(
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -486,12 +500,11 @@ async fn ingest_turn_prerequisites(
ingest_initialize(reducer, out).await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 1,
sample_thread_start_response("thread-2", /*ephemeral*/ false, "gpt-5"),
),
out,
)
.await;
@@ -500,7 +513,7 @@ async fn ingest_turn_prerequisites(
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -510,10 +523,11 @@ async fn ingest_turn_prerequisites(
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 3,
sample_turn_start_response("turn-2"),
),
out,
)
.await;
@@ -793,6 +807,37 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true);
}
#[tokio::test]
async fn default_client_retains_auth_manager() {
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
let weak_auth_manager = Arc::downgrade(&auth_manager);
let _client = AnalyticsEventsClient::new(
auth_manager,
"http://localhost".to_string(),
/*analytics_enabled*/ None,
AuthManagerRetention::Strong,
);
assert!(weak_auth_manager.upgrade().is_some());
}
#[tokio::test]
async fn non_owning_client_does_not_retain_auth_manager() {
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
let weak_auth_manager = Arc::downgrade(&auth_manager);
let _client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
"http://localhost".to_string(),
/*analytics_enabled*/ None,
AuthManagerRetention::Weak,
);
drop(auth_manager);
assert!(weak_auth_manager.upgrade().is_none());
}
#[test]
fn thread_initialized_event_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(ThreadInitializedEvent {
@@ -862,14 +907,11 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-no-client",
/*ephemeral*/ false,
"gpt-5",
)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 1,
sample_thread_start_response("thread-no-client", /*ephemeral*/ false, "gpt-5"),
),
&mut events,
)
.await;
@@ -906,12 +948,11 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 2,
sample_thread_resume_response("thread-1", /*ephemeral*/ true, "gpt-5"),
),
&mut events,
)
.await;
@@ -954,6 +995,63 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
);
}
#[tokio::test]
async fn unrelated_client_requests_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(ClientRequest::ThreadArchive {
request_id: RequestId::Integer(3),
params: ThreadArchiveParams {
thread_id: "thread-2".to_string(),
},
}),
},
&mut events,
)
.await;
reducer
.ingest(
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 3,
sample_turn_start_response("turn-2"),
),
&mut events,
)
.await;
assert!(
events.is_empty(),
"unrelated requests must not create pending turn state"
);
}
#[tokio::test]
async fn unrelated_client_responses_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_initialize(&mut reducer, &mut events).await;
reducer
.ingest(
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 9,
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
),
&mut events,
)
.await;
assert!(events.is_empty());
}
#[tokio::test]
async fn compaction_event_ingests_custom_fact() {
let mut reducer = AnalyticsReducer::default();
@@ -986,9 +1084,10 @@ async fn compaction_event_ingests_custom_fact() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 2,
sample_thread_resume_response_with_source(
"thread-1",
/*ephemeral*/ false,
"gpt-5",
@@ -999,8 +1098,8 @@ async fn compaction_event_ingests_custom_fact() {
agent_nickname: None,
agent_role: None,
}),
)),
},
),
),
&mut events,
)
.await;
@@ -1097,14 +1196,11 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-guardian",
/*ephemeral*/ false,
"gpt-5",
)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 1,
sample_thread_start_response("thread-guardian", /*ephemeral*/ false, "gpt-5"),
),
&mut events,
)
.await;
@@ -1867,7 +1963,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -1879,10 +1975,11 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 4,
sample_turn_steer_response("turn-2"),
),
&mut out,
)
.await;
@@ -2021,7 +2118,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -2045,10 +2142,11 @@ async fn turn_start_error_response_discards_pending_start_request() {
// failed turn/start request and attach request-scoped connection metadata.
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 3,
sample_turn_start_response("turn-2"),
),
&mut out,
)
.await;
@@ -2162,7 +2260,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -2174,17 +2272,18 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 4,
sample_turn_steer_response("turn-2"),
),
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(5),
request: Box::new(sample_turn_steer_request(
@@ -2208,7 +2307,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(6),
request: Box::new(sample_turn_steer_request(
@@ -2220,10 +2319,11 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
},
client_response_fact(
/*connection_id*/ 7,
/*request_id*/ 6,
sample_turn_steer_response("turn-2"),
),
&mut out,
)
.await;

View File

@@ -22,11 +22,13 @@ use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
@@ -49,18 +51,34 @@ pub(crate) struct AnalyticsEventsQueue {
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
queue: Option<AnalyticsEventsQueue>,
}
#[derive(Debug, Clone, Copy)]
pub enum AuthManagerRetention {
Strong,
Weak,
}
impl AnalyticsEventsQueue {
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
pub(crate) fn new(
auth_manager: Arc<AuthManager>,
base_url: String,
retention: AuthManagerRetention,
) -> Self {
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
let auth_manager = match retention {
AuthManagerRetention::Strong => AuthManagerHandle::Strong(auth_manager),
AuthManagerRetention::Weak => AuthManagerHandle::Weak(Arc::downgrade(&auth_manager)),
};
tokio::spawn(async move {
let mut reducer = AnalyticsReducer::default();
while let Some(input) = receiver.recv().await {
let mut events = Vec::new();
reducer.ingest(input, &mut events).await;
let Some(auth_manager) = auth_manager.get() else {
break;
};
send_track_events(&auth_manager, &base_url, events).await;
}
});
@@ -112,18 +130,42 @@ impl AnalyticsEventsQueue {
}
}
enum AuthManagerHandle {
Strong(Arc<AuthManager>),
Weak(std::sync::Weak<AuthManager>),
}
impl AuthManagerHandle {
fn get(&self) -> Option<Arc<AuthManager>> {
match self {
Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)),
Self::Weak(auth_manager) => auth_manager.upgrade(),
}
}
}
impl AnalyticsEventsClient {
pub fn new(
auth_manager: Arc<AuthManager>,
base_url: String,
analytics_enabled: Option<bool>,
auth_manager_retention: AuthManagerRetention,
) -> Self {
Self {
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
analytics_enabled,
queue: (analytics_enabled != Some(false)).then(|| {
AnalyticsEventsQueue::new(
Arc::clone(&auth_manager),
base_url,
auth_manager_retention,
)
}),
}
}
pub fn disabled() -> Self {
Self { queue: None }
}
pub fn track_skill_invocations(
&self,
tracking: TrackEventsContext,
@@ -181,16 +223,30 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
self.record_fact(AnalyticsFact::Request {
pub fn track_request(
&self,
connection_id: u64,
request_id: RequestId,
request: &ClientRequest,
) {
if !matches!(
request,
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
) {
return;
}
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request),
request: Box::new(request.clone()),
});
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
@@ -205,7 +261,10 @@ impl AnalyticsEventsClient {
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
@@ -268,15 +327,30 @@ impl AnalyticsEventsClient {
}
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
if self.analytics_enabled == Some(false) {
return;
if let Some(queue) = self.queue.as_ref() {
queue.try_send(input);
}
self.queue.try_send(input);
}
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
self.record_fact(AnalyticsFact::Response {
pub fn track_response(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
) {
if !matches!(
response,
ClientResponsePayload::ThreadStart(_)
| ClientResponsePayload::ThreadResume(_)
| ClientResponsePayload::ThreadFork(_)
| ClientResponsePayload::TurnStart(_)
| ClientResponsePayload::TurnSteer(_)
) {
return;
}
self.record_fact(AnalyticsFact::ClientResponse {
connection_id,
request_id,
response: Box::new(response),
});
}
@@ -299,6 +373,20 @@ impl AnalyticsEventsClient {
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}
pub fn track_server_response(&self, connection_id: u64, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
connection_id,
response: Box::new(response),
});
}
}
async fn send_track_events(
@@ -341,3 +429,7 @@ async fn send_track_events(
}
}
}
#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;

View File

@@ -0,0 +1,92 @@
use super::AnalyticsEventsClient;
use super::AnalyticsEventsQueue;
use crate::facts::AnalyticsFact;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
fn client_with_receiver() -> (AnalyticsEventsClient, mpsc::Receiver<AnalyticsFact>) {
let (sender, receiver) = mpsc::channel(4);
let queue = AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
};
(AnalyticsEventsClient { queue: Some(queue) }, receiver)
}
fn sample_turn_steer_request() -> ClientRequest {
ClientRequest::TurnSteer {
request_id: RequestId::Integer(1),
params: TurnSteerParams {
thread_id: "thread-1".to_string(),
expected_turn_id: "turn-1".to_string(),
input: Vec::new(),
responsesapi_client_metadata: None,
},
}
}
fn sample_thread_archive_request() -> ClientRequest {
ClientRequest::ThreadArchive {
request_id: RequestId::Integer(2),
params: ThreadArchiveParams {
thread_id: "thread-1".to_string(),
},
}
}
#[test]
fn track_request_only_enqueues_analytics_relevant_requests() {
let (client, mut receiver) = client_with_receiver();
client.track_request(
/*connection_id*/ 7,
RequestId::Integer(1),
&sample_turn_steer_request(),
);
assert!(matches!(
receiver.try_recv(),
Ok(AnalyticsFact::ClientRequest { .. })
));
client.track_request(
/*connection_id*/ 7,
RequestId::Integer(2),
&sample_thread_archive_request(),
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
}
#[test]
fn track_response_only_enqueues_analytics_relevant_responses() {
let (client, mut receiver) = client_with_receiver();
client.track_response(
/*connection_id*/ 7,
RequestId::Integer(1),
ClientResponsePayload::TurnSteer(TurnSteerResponse {
turn_id: "turn-1".to_string(),
}),
);
assert!(matches!(
receiver.try_recv(),
Ok(AnalyticsFact::ClientResponse { .. })
));
client.track_response(
/*connection_id*/ 7,
RequestId::Integer(2),
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
}

View File

@@ -2,11 +2,13 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexRuntimeMetadata;
use crate::events::GuardianReviewEventParams;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
@@ -272,14 +274,15 @@ pub(crate) enum AnalyticsFact {
runtime: CodexRuntimeMetadata,
rpc_transport: AppServerRpcTransport,
},
Request {
ClientRequest {
connection_id: u64,
request_id: RequestId,
request: Box<ClientRequest>,
},
Response {
ClientResponse {
connection_id: u64,
response: Box<ClientResponse>,
request_id: RequestId,
response: Box<ClientResponsePayload>,
},
ErrorResponse {
connection_id: u64,
@@ -287,6 +290,14 @@ pub(crate) enum AnalyticsFact {
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
},
ServerRequest {
connection_id: u64,
request: Box<ServerRequest>,
},
ServerResponse {
connection_id: u64,
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.

View File

@@ -7,6 +7,7 @@ use std::time::SystemTime;
use std::time::UNIX_EPOCH;
pub use client::AnalyticsEventsClient;
pub use client::AuthManagerRetention;
pub use events::AppServerRpcTransport;
pub use events::GuardianApprovalRequestSource;
pub use events::GuardianReviewAnalyticsResult;

View File

@@ -171,18 +171,21 @@ impl AnalyticsReducer {
rpc_transport,
);
}
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id,
request_id,
response,
} => {
self.ingest_response(connection_id, *response, out);
if let Some(response) = response.into_client_response(request_id) {
self.ingest_response(connection_id, response, out);
}
}
AnalyticsFact::ErrorResponse {
connection_id,
@@ -195,6 +198,14 @@ impl AnalyticsReducer {
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::ServerRequest {
connection_id: _connection_id,
request: _request,
} => {}
AnalyticsFact::ServerResponse {
connection_id: _connection_id,
response: _response,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);

View File

@@ -158,6 +158,84 @@ macro_rules! client_request_definitions {
})
.unwrap_or_else(|| "<unknown>".to_string())
}
pub fn into_jsonrpc_parts(
self,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant { request_id, response } => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
}
}
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ClientResponsePayload {
$( $variant($response), )*
InterruptConversation(v1::InterruptConversationResponse),
}
impl ClientResponsePayload {
pub fn into_jsonrpc_parts_and_payload(
self,
request_id: RequestId,
) -> std::result::Result<
(RequestId, crate::Result, Option<ClientResponsePayload>),
serde_json::Error,
> {
match self {
$(
Self::$variant(response) => {
let result = serde_json::to_value(&response)?;
Ok((request_id, result, Some(Self::$variant(response))))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result, None))
}
}
}
pub fn into_client_response(self, request_id: RequestId) -> Option<ClientResponse> {
match self {
$(
Self::$variant(response) => {
Some(ClientResponse::$variant {
request_id,
response,
})
}
)*
Self::InterruptConversation(_) => None,
}
}
pub fn into_jsonrpc_parts(
self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
self.to_jsonrpc_parts(request_id)
}
pub fn to_jsonrpc_parts(
&self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
}
}
}
impl crate::experimental_api::ExperimentalApi for ClientRequest {
@@ -701,6 +779,24 @@ macro_rules! server_request_definitions {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
pub fn response_from_result(
&self,
result: &crate::Result,
) -> serde_json::Result<ServerResponse> {
match self {
$(
Self::$variant { request_id, .. } => {
let response =
<$response as serde::Deserialize>::deserialize(result)?;
Ok(ServerResponse::$variant {
request_id: request_id.clone(),
response,
})
}
)*
}
}
}
/// Typed response from the client to the server.
@@ -2197,3 +2293,7 @@ mod tests {
);
}
}
#[cfg(test)]
#[path = "common_tests.rs"]
mod common_tests;

View File

@@ -0,0 +1,44 @@
use super::*;
use anyhow::Result;
use codex_protocol::protocol::TurnAbortReason;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn client_response_payload_returns_jsonrpc_parts_and_client_response() -> Result<()> {
let (request_id, result, payload) =
ClientResponsePayload::ThreadArchive(v2::ThreadArchiveResponse {})
.into_jsonrpc_parts_and_payload(RequestId::Integer(7))?;
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(result, json!({}));
let Some(ClientResponse::ThreadArchive {
request_id,
response: _,
}) = payload.and_then(|payload| payload.into_client_response(RequestId::Integer(7)))
else {
panic!("expected thread/archive client response");
};
assert_eq!(request_id, RequestId::Integer(7));
Ok(())
}
#[test]
fn interrupt_conversation_payload_stays_jsonrpc_only() -> Result<()> {
let (request_id, result, payload) =
ClientResponsePayload::InterruptConversation(v1::InterruptConversationResponse {
abort_reason: TurnAbortReason::Interrupted,
})
.into_jsonrpc_parts_and_payload(RequestId::Integer(8))?;
assert_eq!(request_id, RequestId::Integer(8));
assert_eq!(
result,
json!({
"abortReason": "interrupted",
})
);
assert!(payload.is_none());
Ok(())
}

View File

@@ -0,0 +1,18 @@
use std::sync::Arc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AuthManagerRetention;
use codex_core::config::Config;
use codex_login::AuthManager;
pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Weak,
)
}

View File

@@ -1938,7 +1938,12 @@ pub(crate) async fn apply_bespoke_event_handling(
}
};
outgoing.send_response(request_id, response).await;
outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadRollback(response),
)
.await;
}
}
EventMsg::ThreadNameUpdated(thread_name_event) => {
@@ -2354,10 +2359,24 @@ async fn respond_to_pending_interrupts(
continue;
};
let response = InterruptConversationResponse { abort_reason };
outgoing.send_response(rid, response).await;
outgoing
.send_response(
rid,
codex_app_server_protocol::ClientResponsePayload::InterruptConversation(
response,
),
)
.await;
}
ApiVersion::V2 => {
outgoing.send_response(rid, TurnInterruptResponse {}).await;
outgoing
.send_response(
rid,
codex_app_server_protocol::ClientResponsePayload::TurnInterrupt(
TurnInterruptResponse {},
),
)
.await;
}
}
}
@@ -3110,6 +3129,7 @@ mod tests {
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use codex_analytics::AuthManagerRetention;
use codex_app_server_protocol::AutoReviewDecisionSource;
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::JSONRPCErrorError;
@@ -3420,7 +3440,10 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3489,7 +3512,10 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3579,7 +3605,10 @@ mod tests {
let thread_state = new_thread_state();
let thread_watch_manager = ThreadWatchManager::new();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3598,6 +3627,7 @@ mod tests {
),
"http://localhost".to_string(),
Some(false),
AuthManagerRetention::Strong,
),
codex_home: codex_home.path().to_path_buf(),
};
@@ -4205,7 +4235,10 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4271,7 +4304,10 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4319,7 +4355,10 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4361,7 +4400,10 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4415,7 +4457,10 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-123".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4504,7 +4549,10 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-456".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4577,7 +4625,10 @@ mod tests {
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4839,7 +4890,10 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4877,7 +4931,10 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4903,7 +4960,10 @@ mod tests {
#[tokio::test]
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let conversation_id = ThreadId::new();
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,

View File

@@ -42,7 +42,7 @@ use codex_app_server_protocol::CancelLoginAccountParams;
use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
@@ -1375,7 +1375,9 @@ impl CodexMessageProcessor {
.await
.map(|()| LoginAccountResponse::ApiKey {});
let logged_in = result.is_ok();
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
.await;
if logged_in {
self.send_login_success_notifications(/*login_id*/ None)
@@ -1443,7 +1445,9 @@ impl CodexMessageProcessor {
async fn login_chatgpt_v2(&self, request_id: ConnectionRequestId) {
let result = self.login_chatgpt_response().await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
.await;
}
async fn login_chatgpt_response(&self) -> Result<LoginAccountResponse, JSONRPCErrorError> {
@@ -1512,7 +1516,9 @@ impl CodexMessageProcessor {
async fn login_chatgpt_device_code_v2(&self, request_id: ConnectionRequestId) {
let result = self.login_chatgpt_device_code_response().await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
.await;
}
async fn login_chatgpt_device_code_response(
@@ -1602,7 +1608,13 @@ impl CodexMessageProcessor {
params: CancelLoginAccountParams,
) {
let result = self.cancel_login_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::CancelLoginAccount,
)
.await;
}
async fn cancel_login_response(
@@ -1630,7 +1642,9 @@ impl CodexMessageProcessor {
.login_chatgpt_auth_tokens_response(access_token, chatgpt_account_id, chatgpt_plan_type)
.await;
let logged_in = result.is_ok();
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
.await;
if logged_in {
self.send_login_success_notifications(/*login_id*/ None)
@@ -1784,7 +1798,11 @@ impl CodexMessageProcessor {
plan_type: None,
});
self.outgoing
.send_result(request_id, result.map(|_| LogoutAccountResponse {}))
.send_result(
request_id,
result.map(|_| LogoutAccountResponse {}),
ClientResponsePayload::LogoutAccount,
)
.await;
if let Some(payload) = account_updated {
@@ -1869,12 +1887,19 @@ impl CodexMessageProcessor {
}
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::GetAuthStatus(response),
)
.await;
}
async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) {
let result = self.get_account_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::GetAccount)
.await;
}
async fn get_account_response(
@@ -1920,7 +1945,13 @@ impl CodexMessageProcessor {
),
},
);
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::GetAccountRateLimits,
)
.await;
}
async fn send_add_credits_nudge_email(
@@ -1932,7 +1963,13 @@ impl CodexMessageProcessor {
.send_add_credits_nudge_email_inner(params)
.await
.map(|status| SendAddCreditsNudgeEmailResponse { status });
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::SendAddCreditsNudgeEmail,
)
.await;
}
async fn send_add_credits_nudge_email_inner(
@@ -2063,7 +2100,7 @@ impl CodexMessageProcessor {
let result = self
.exec_one_off_command_inner(request_id.clone(), params)
.await
.map(|()| None::<serde_json::Value>);
.map(|()| None);
self.send_optional_result(request_id, result).await;
}
@@ -2311,7 +2348,9 @@ impl CodexMessageProcessor {
.command_exec_manager
.write(request_id.clone(), params)
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::CommandExecWrite)
.await;
}
async fn command_exec_resize(
@@ -2323,7 +2362,9 @@ impl CodexMessageProcessor {
.command_exec_manager
.resize(request_id.clone(), params)
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::CommandExecResize)
.await;
}
async fn command_exec_terminate(
@@ -2335,7 +2376,13 @@ impl CodexMessageProcessor {
.command_exec_manager
.terminate(request_id.clone(), params)
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::CommandExecTerminate,
)
.await;
}
async fn thread_start(
@@ -2738,19 +2785,12 @@ impl CodexMessageProcessor {
match result {
Ok((response, notif)) => {
listener_task_context
.analytics_events_client
.track_response(
request_id.connection_id.0,
ClientResponse::ThreadStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
listener_task_context
.outgoing
.send_response(request_id, response)
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadStart(response),
)
.instrument(tracing::info_span!(
"app_server.thread_start.send_response",
otel.name = "app_server.thread_start.send_response",
@@ -2817,7 +2857,11 @@ impl CodexMessageProcessor {
.ok()
.map(|(_, thread_ids)| thread_ids.clone());
self.outgoing
.send_result(request_id, result.map(|(response, _)| response))
.send_result(
request_id,
result.map(|(response, _)| response),
ClientResponsePayload::ThreadArchive,
)
.await;
if let Some(archived_thread_ids) = archived_thread_ids {
@@ -2959,7 +3003,13 @@ impl CodexMessageProcessor {
})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadIncrementElicitation,
)
.await;
}
async fn thread_decrement_elicitation(
@@ -2984,7 +3034,13 @@ impl CodexMessageProcessor {
})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadDecrementElicitation,
)
.await;
}
async fn thread_set_name(&self, request_id: ConnectionRequestId, params: ThreadSetNameParams) {
@@ -2994,7 +3050,11 @@ impl CodexMessageProcessor {
.ok()
.and_then(|(_, notification)| notification.clone());
self.outgoing
.send_result(request_id, result.map(|(response, _)| response))
.send_result(
request_id,
result.map(|(response, _)| response),
ClientResponsePayload::ThreadSetName,
)
.await;
if let Some(notification) = notification {
@@ -3051,7 +3111,13 @@ impl CodexMessageProcessor {
params: ThreadMemoryModeSetParams,
) {
let result = self.thread_memory_mode_set_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadMemoryModeSet,
)
.await;
}
async fn thread_memory_mode_set_response(
@@ -3095,7 +3161,9 @@ impl CodexMessageProcessor {
async fn memory_reset(&self, request_id: ConnectionRequestId, _params: Option<()>) {
let result = self.memory_reset_response().await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::MemoryReset)
.await;
}
async fn memory_reset_response(&self) -> Result<MemoryResetResponse, JSONRPCErrorError> {
@@ -3130,7 +3198,13 @@ impl CodexMessageProcessor {
params: ThreadMetadataUpdateParams,
) {
let result = self.thread_metadata_update_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadMetadataUpdate,
)
.await;
}
async fn thread_metadata_update_response(
@@ -3363,7 +3437,11 @@ impl CodexMessageProcessor {
thread_id: thread_id.clone(),
});
self.outgoing
.send_result(request_id, result.map(|(response, _)| response))
.send_result(
request_id,
result.map(|(response, _)| response),
ClientResponsePayload::ThreadUnarchive,
)
.await;
if let Some(notification) = notification {
@@ -3411,7 +3489,7 @@ impl CodexMessageProcessor {
let result = self
.thread_rollback_start(&request_id, params)
.await
.map(|()| None::<serde_json::Value>);
.map(|()| None);
self.send_optional_result(request_id, result).await;
}
@@ -3482,7 +3560,13 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(ThreadCompactStartResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadCompactStart,
)
.await;
}
async fn thread_background_terminals_clean(
@@ -3502,7 +3586,13 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(ThreadBackgroundTerminalsCleanResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadBackgroundTerminalsClean,
)
.await;
}
async fn thread_shell_command(
@@ -3528,7 +3618,13 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(ThreadShellCommandResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadShellCommand,
)
.await;
}
async fn thread_approve_guardian_denied_action(
@@ -3552,12 +3648,20 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(ThreadApproveGuardianDeniedActionResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ThreadApproveGuardianDeniedAction,
)
.await;
}
async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) {
let result = self.thread_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadList)
.await;
}
async fn thread_list_response(
@@ -3651,7 +3755,9 @@ impl CodexMessageProcessor {
params: ThreadLoadedListParams,
) {
let result = self.thread_loaded_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadLoadedList)
.await;
}
async fn thread_loaded_list_response(
@@ -3703,7 +3809,9 @@ impl CodexMessageProcessor {
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
let result = self.thread_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadRead)
.await;
}
async fn thread_read_response(
@@ -3881,7 +3989,9 @@ impl CodexMessageProcessor {
params: ThreadTurnsListParams,
) {
let result = self.thread_turns_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadTurnsList)
.await;
}
async fn thread_turns_list_response(
@@ -4260,17 +4370,15 @@ impl CodexMessageProcessor {
permission_profile,
reasoning_effort: session_configured.reasoning_effort,
};
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadResume {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
let connection_id = request_id.connection_id;
let token_usage_thread = include_turns.then(|| response.thread.clone());
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadResume(response),
)
.await;
// `excludeTurns` is explicitly the cheap resume path, so avoid
// rebuilding history only to attribute a replayed usage update.
if let Some(token_usage_thread) = token_usage_thread {
@@ -4879,17 +4987,14 @@ impl CodexMessageProcessor {
}
};
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadFork {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
let connection_id = request_id.connection_id;
let token_usage_thread = include_turns.then(|| response.thread.clone());
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadFork(response),
)
.await;
// `excludeTurns` is the cheap fork path, so skip restored usage replay
// instead of rebuilding history only to attribute a historical update.
if let Some(token_usage_thread) = token_usage_thread {
@@ -4927,7 +5032,13 @@ impl CodexMessageProcessor {
params: GetConversationSummaryParams,
) {
let result = self.get_thread_summary_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::GetConversationSummary,
)
.await;
}
async fn get_thread_summary_response(
@@ -5131,7 +5242,9 @@ impl CodexMessageProcessor {
})
}
.await;
outgoing.send_result(request_id, result).await;
outgoing
.send_result(request_id, result, ClientResponsePayload::ModelList)
.await;
}
async fn list_collaboration_modes(
@@ -5147,7 +5260,12 @@ impl CodexMessageProcessor {
.map(Into::into)
.collect();
let response = CollaborationModeListResponse { data: items };
outgoing.send_response(request_id, response).await;
outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::CollaborationModeList(response),
)
.await;
}
async fn experimental_feature_list(
@@ -5156,7 +5274,13 @@ impl CodexMessageProcessor {
params: ExperimentalFeatureListParams,
) {
let result = self.experimental_feature_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::ExperimentalFeatureList,
)
.await;
}
async fn experimental_feature_list_response(
@@ -5254,7 +5378,12 @@ impl CodexMessageProcessor {
) {
let MockExperimentalMethodParams { value } = params;
let response = MockExperimentalMethodResponse { echoed: value };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::MockExperimentalMethod(response),
)
.await;
}
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
@@ -5264,7 +5393,9 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::McpServerRefresh)
.await;
}
async fn queue_mcp_server_refresh_for_config(
@@ -5319,7 +5450,13 @@ impl CodexMessageProcessor {
params: McpServerOauthLoginParams,
) {
let result = self.mcp_server_oauth_login_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::McpServerOauthLogin,
)
.await;
}
async fn mcp_server_oauth_login_response(
@@ -5467,7 +5604,13 @@ impl CodexMessageProcessor {
runtime_environment,
)
.await;
outgoing.send_result(request_id, result).await;
outgoing
.send_result(
request_id,
result,
ClientResponsePayload::McpServerStatusList,
)
.await;
}
async fn list_mcp_server_status_response(
@@ -5638,7 +5781,9 @@ impl CodexMessageProcessor {
))
})
});
outgoing.send_result(request_id, result).await;
outgoing
.send_result(request_id, result, ClientResponsePayload::McpResourceRead)
.await;
}
async fn call_mcp_server_tool(
@@ -5663,17 +5808,17 @@ impl CodexMessageProcessor {
.await
.map(McpServerToolCallResponse::from)
.map_err(|error| internal_error(format!("{error:#}")));
outgoing.send_result(request_id, result).await;
outgoing
.send_result(request_id, result, ClientResponsePayload::McpServerToolCall)
.await;
});
}
async fn send_optional_result<T>(
async fn send_optional_result(
&self,
request_id: ConnectionRequestId,
result: Result<Option<T>, JSONRPCErrorError>,
) where
T: serde::Serialize,
{
result: Result<Option<ClientResponsePayload>, JSONRPCErrorError>,
) {
match result {
Ok(Some(response)) => self.outgoing.send_response(request_id, response).await,
Ok(None) => {}
@@ -5786,7 +5931,9 @@ impl CodexMessageProcessor {
let result = self
.thread_unsubscribe_response(params, request_id.connection_id)
.await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadUnsubscribe)
.await;
}
async fn thread_unsubscribe_response(
@@ -5871,10 +6018,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
AppsListResponse {
codex_app_server_protocol::ClientResponsePayload::AppsList(AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
}),
)
.await;
return;
@@ -5887,10 +6034,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
AppsListResponse {
codex_app_server_protocol::ClientResponsePayload::AppsList(AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
}),
)
.await;
return;
@@ -5912,7 +6059,9 @@ impl CodexMessageProcessor {
environment_manager: Arc<EnvironmentManager>,
) {
let result = Self::apps_list_response(&outgoing, params, config, environment_manager).await;
outgoing.send_result(request_id, result).await;
outgoing
.send_result(request_id, result, ClientResponsePayload::AppsList)
.await;
}
async fn apps_list_response(
@@ -6060,7 +6209,9 @@ impl CodexMessageProcessor {
async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) {
let result = self.skills_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::SkillsList)
.await;
}
async fn skills_list_response(
@@ -6207,7 +6358,9 @@ impl CodexMessageProcessor {
MarketplaceRemoveError::InvalidRequest(message) => invalid_request(message),
MarketplaceRemoveError::Internal(message) => internal_error(message),
});
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::MarketplaceRemove)
.await;
}
async fn marketplace_upgrade(
@@ -6216,7 +6369,13 @@ impl CodexMessageProcessor {
params: MarketplaceUpgradeParams,
) {
let result = self.marketplace_upgrade_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::MarketplaceUpgrade,
)
.await;
}
async fn marketplace_upgrade_response(
@@ -6268,7 +6427,9 @@ impl CodexMessageProcessor {
MarketplaceAddError::InvalidRequest(message) => invalid_request(message),
MarketplaceAddError::Internal(message) => internal_error(message),
});
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::MarketplaceAdd)
.await;
}
async fn skills_config_write(
@@ -6277,7 +6438,9 @@ impl CodexMessageProcessor {
params: SkillsConfigWriteParams,
) {
let result = self.skills_config_write_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::SkillsConfigWrite)
.await;
}
async fn skills_config_write_response(
@@ -6492,14 +6655,12 @@ impl CodexMessageProcessor {
match result {
Ok(response) => {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::TurnStart(response),
)
.await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -6513,7 +6674,9 @@ impl CodexMessageProcessor {
params: ThreadInjectItemsParams,
) {
let result = self.thread_inject_items_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::ThreadInjectItems)
.await;
}
async fn thread_inject_items_response(
@@ -6669,14 +6832,12 @@ impl CodexMessageProcessor {
match result {
Ok(response) => {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnSteer {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::TurnSteer(response),
)
.await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -6753,7 +6914,11 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStartResponse::default()))
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeStart)),
)
.await;
}
async fn thread_realtime_append_audio(
@@ -6784,7 +6949,11 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendAudioResponse::default()))
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeAppendAudio)),
)
.await;
}
async fn thread_realtime_append_text(
@@ -6813,7 +6982,11 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendTextResponse::default()))
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeAppendText)),
)
.await;
}
async fn thread_realtime_stop(
@@ -6836,7 +7009,11 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStopResponse::default()))
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeStop)),
)
.await;
}
async fn thread_realtime_list_voices(
@@ -6847,9 +7024,11 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ThreadRealtimeListVoicesResponse {
voices: RealtimeVoicesList::builtin(),
},
codex_app_server_protocol::ClientResponsePayload::ThreadRealtimeListVoices(
ThreadRealtimeListVoicesResponse {
voices: RealtimeVoicesList::builtin(),
},
),
)
.await;
}
@@ -6890,7 +7069,10 @@ impl CodexMessageProcessor {
review_thread_id,
};
self.outgoing
.send_response(request_id.clone(), response)
.send_response(
request_id.clone(),
codex_app_server_protocol::ClientResponsePayload::ReviewStart(response),
)
.await;
}
@@ -7078,7 +7260,11 @@ impl CodexMessageProcessor {
Ok::<_, JSONRPCErrorError>(None::<ReviewStartResponse>)
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::ReviewStart)),
)
.await;
}
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
@@ -7147,7 +7333,11 @@ impl CodexMessageProcessor {
}
}
.await;
self.send_optional_result(request_id, result).await;
self.send_optional_result(
request_id,
result.map(|response| response.map(ClientResponsePayload::TurnInterrupt)),
)
.await;
}
async fn ensure_conversation_listener(
@@ -7469,7 +7659,9 @@ impl CodexMessageProcessor {
"failed to compute git diff to remote for cwd: {cwd:?}"
))
});
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::GitDiffToRemote)
.await;
}
async fn fuzzy_file_search(
@@ -7513,7 +7705,12 @@ impl CodexMessageProcessor {
}
let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::FuzzyFileSearch(response),
)
.await;
}
async fn fuzzy_file_search_session_start(
@@ -7522,7 +7719,13 @@ impl CodexMessageProcessor {
params: FuzzyFileSearchSessionStartParams,
) {
let result = self.fuzzy_file_search_session_start_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::FuzzyFileSearchSessionStart,
)
.await;
}
async fn fuzzy_file_search_session_start_response(
@@ -7552,7 +7755,13 @@ impl CodexMessageProcessor {
params: FuzzyFileSearchSessionUpdateParams,
) {
let result = self.fuzzy_file_search_session_update_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(
request_id,
result,
ClientResponsePayload::FuzzyFileSearchSessionUpdate,
)
.await;
}
async fn fuzzy_file_search_session_update_response(
@@ -7590,13 +7799,20 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::FuzzyFileSearchSessionStop(
FuzzyFileSearchSessionStopResponse {},
),
)
.await;
}
async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
let result = self.upload_feedback_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::FeedbackUpload)
.await;
}
async fn upload_feedback_response(
@@ -7768,7 +7984,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id.clone(),
WindowsSandboxSetupStartResponse { started: true },
codex_app_server_protocol::ClientResponsePayload::WindowsSandboxSetupStart(
WindowsSandboxSetupStartResponse { started: true },
),
)
.await;
@@ -8126,7 +8344,12 @@ async fn handle_pending_thread_resume_request(
reasoning_effort,
};
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
outgoing.send_response(request_id, response).await;
outgoing
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadResume(response),
)
.await;
// Match cold resume: metadata-only resume should attach the listener without
// paying the cost of turn reconstruction for historical usage replay.
if let Some(token_usage_thread) = token_usage_thread {
@@ -10469,7 +10692,10 @@ mod tests {
let connection_id = ConnectionId(7);
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![connection_id],

View File

@@ -10,7 +10,9 @@ impl CodexMessageProcessor {
params: PluginListParams,
) {
let result = self.plugin_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::PluginList)
.await;
}
async fn plugin_list_response(
@@ -169,7 +171,9 @@ impl CodexMessageProcessor {
params: PluginReadParams,
) {
let result = self.plugin_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::PluginRead)
.await;
}
async fn plugin_read_response(
@@ -304,7 +308,9 @@ impl CodexMessageProcessor {
params: PluginInstallParams,
) {
let result = self.plugin_install_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::PluginInstall)
.await;
}
async fn plugin_install_response(
@@ -538,7 +544,9 @@ impl CodexMessageProcessor {
params: PluginUninstallParams,
) {
let result = self.plugin_uninstall_response(params).await;
self.outgoing.send_result(request_id, result).await;
self.outgoing
.send_result(request_id, result, ClientResponsePayload::PluginUninstall)
.await;
}
async fn plugin_uninstall_response(

View File

@@ -171,7 +171,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id.clone(),
ThreadGoalSetResponse { goal: goal.clone() },
codex_app_server_protocol::ClientResponsePayload::ThreadGoalSet(
ThreadGoalSetResponse { goal: goal.clone() },
),
)
.await;
self.emit_thread_goal_updated_ordered(thread_id, goal, listener_command_tx)
@@ -215,7 +217,12 @@ impl CodexMessageProcessor {
}
};
self.outgoing
.send_response(request_id, ThreadGoalGetResponse { goal })
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadGoalGet(
ThreadGoalGetResponse { goal },
),
)
.await;
}
@@ -315,7 +322,12 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadGoalClearResponse { cleared })
.send_response(
request_id,
codex_app_server_protocol::ClientResponsePayload::ThreadGoalClear(
ThreadGoalClearResponse { cleared },
),
)
.await;
if cleared {
self.emit_thread_goal_cleared_ordered(thread_id, listener_command_tx)

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CommandExecOutputDeltaNotification;
use codex_app_server_protocol::CommandExecOutputStream;
use codex_app_server_protocol::CommandExecResizeParams;
@@ -209,11 +210,11 @@ impl CommandExecManager {
outgoing
.send_response(
request_id,
CommandExecResponse {
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
exit_code: output.exit_code,
stdout: output.stdout.text,
stderr: output.stderr.text,
},
}),
)
.await;
}
@@ -554,11 +555,11 @@ async fn run_command(params: RunCommandParams) {
outgoing
.send_response(
request_id,
CommandExecResponse {
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
exit_code,
stdout,
stderr,
},
}),
)
.await;
}
@@ -726,7 +727,10 @@ mod tests {
let manager = CommandExecManager::default();
let err = manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)),
request_id: ConnectionRequestId {
connection_id: ConnectionId(1),
request_id: codex_app_server_protocol::RequestId::Integer(42),
@@ -762,7 +766,10 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)),
request_id: request_id.clone(),
process_id: Some("proc-99".to_string()),
exec_request: windows_sandbox_exec_request(),
@@ -809,7 +816,10 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)),
request_id: request_id.clone(),
process_id: Some("proc-100".to_string()),
exec_request: ExecRequest::new(

View File

@@ -467,6 +467,7 @@ mod tests {
use super::*;
use crate::config_manager::apply_runtime_feature_enablement;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AuthManagerRetention;
use codex_arg0::Arg0DispatchPaths;
use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
@@ -832,6 +833,7 @@ mod tests {
.trim_end_matches('/')
.to_string(),
analytics_config.analytics_enabled,
AuthManagerRetention::Strong,
),
);

View File

@@ -234,7 +234,10 @@ mod tests {
const OUTGOING_BUFFER: usize = 1;
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
FsWatchManager::new_with_file_watcher(
Arc::new(OutgoingMessageSender::new(tx)),
Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)),
Arc::new(FileWatcher::noop()),
)
}

View File

@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
@@ -365,7 +366,15 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -390,9 +399,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
});
let processor_outgoing = Arc::clone(&outgoing_message_sender);
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let config_manager = ConfigManager::new(
args.config.codex_home.to_path_buf(),
args.cli_overrides,
@@ -405,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let mut processor_handle = tokio::spawn(async move {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
arg0_paths: args.arg0_paths,
config: args.config,
config_manager,
@@ -563,7 +570,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
}
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
outgoing_message_sender
.notify_client_response(request_id, result)
.notify_client_response(
IN_PROCESS_CONNECTION_ID,
request_id,
result,
)
.await;
}
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
@@ -67,6 +68,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::Registry;
use tracing_subscriber::util::SubscriberInitExt;
mod analytics_events;
mod app_server_tracing;
mod bespoke_event_handling;
mod codex_message_processor;
@@ -709,12 +711,18 @@ pub async fn run_main_with_transport_options(
});
let processor_handle = tokio::spawn({
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outbound_control_tx = outbound_control_tx;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
analytics_events_client,
arg0_paths,
config: Arc::new(config),
config_manager,
@@ -881,7 +889,7 @@ pub async fn run_main_with_transport_options(
warn!("dropping response from unknown connection: {connection_id:?}");
continue;
}
processor.process_response(response).await;
processor.process_response(connection_id, response).await;
}
JSONRPCMessage::Notification(notification) => {
if !connections.contains_key(&connection_id) {

View File

@@ -33,6 +33,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigValueWriteParams;
use codex_app_server_protocol::ConfigWarningNotification;
@@ -237,6 +238,7 @@ impl ConnectionSessionState {
pub(crate) struct MessageProcessorArgs {
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) arg0_paths: Arg0DispatchPaths,
pub(crate) config: Arc<Config>,
pub(crate) config_manager: ConfigManager,
@@ -257,6 +259,7 @@ impl MessageProcessor {
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
let MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths,
config,
config_manager,
@@ -273,11 +276,6 @@ impl MessageProcessor {
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
@@ -554,10 +552,16 @@ impl MessageProcessor {
}
/// Handle a standalone JSON-RPC response originating from the peer.
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
pub(crate) async fn process_response(
&self,
connection_id: ConnectionId,
response: JSONRPCResponse,
) {
tracing::info!("<- response: {:?}", response);
let JSONRPCResponse { id, result, .. } = response;
self.outgoing.notify_client_response(id, result).await
self.outgoing
.notify_client_response(connection_id, id, result)
.await
}
/// Handle an error object received from the peer.
@@ -673,7 +677,10 @@ impl MessageProcessor {
};
self.outgoing
.send_response(connection_request_id, response)
.send_response(
connection_request_id,
ClientResponsePayload::Initialize(response),
)
.await;
if let Some(outbound_initialized) = outbound_initialized {
@@ -714,15 +721,11 @@ impl MessageProcessor {
return Err(invalid_request(experimental_required_message(reason)));
}
let connection_id = connection_request_id.connection_id;
if let ClientRequest::TurnStart { request_id, .. }
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
{
self.analytics_events_client.track_request(
connection_id.0,
request_id.clone(),
codex_request.clone(),
);
}
self.analytics_events_client.track_request(
connection_id.0,
connection_request_id.request_id.clone(),
&codex_request,
);
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
@@ -760,6 +763,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.config_api.read(params).await,
ClientResponsePayload::ConfigRead,
)
.await;
}
@@ -768,6 +772,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.external_agent_config_api.detect(params).await,
ClientResponsePayload::ExternalAgentConfigDetect,
)
.await;
}
@@ -801,6 +806,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.config_api.config_requirements_read().await,
ClientResponsePayload::ConfigRequirementsRead,
)
.await;
}
@@ -830,6 +836,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_file(params).await,
ClientResponsePayload::FsReadFile,
)
.await;
}
@@ -838,6 +845,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.write_file(params).await,
ClientResponsePayload::FsWriteFile,
)
.await;
}
@@ -846,6 +854,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.create_directory(params).await,
ClientResponsePayload::FsCreateDirectory,
)
.await;
}
@@ -854,6 +863,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.get_metadata(params).await,
ClientResponsePayload::FsGetMetadata,
)
.await;
}
@@ -862,6 +872,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_directory(params).await,
ClientResponsePayload::FsReadDirectory,
)
.await;
}
@@ -870,6 +881,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.remove(params).await,
ClientResponsePayload::FsRemove,
)
.await;
}
@@ -878,6 +890,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_api.copy(params).await,
ClientResponsePayload::FsCopy,
)
.await;
}
@@ -886,6 +899,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.watch(connection_id, params).await,
ClientResponsePayload::FsWatch,
)
.await;
}
@@ -894,6 +908,7 @@ impl MessageProcessor {
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.unwatch(connection_id, params).await,
ClientResponsePayload::FsUnwatch,
)
.await;
}
@@ -922,7 +937,12 @@ impl MessageProcessor {
params: ConfigValueWriteParams,
) {
let result = self.config_api.write_value(params).await;
self.handle_config_mutation_result(request_id, result).await
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ConfigValueWrite,
)
.await
}
async fn handle_config_batch_write(
@@ -931,7 +951,12 @@ impl MessageProcessor {
params: ConfigBatchWriteParams,
) {
let result = self.config_api.batch_write(params).await;
self.handle_config_mutation_result(request_id, result).await;
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ConfigBatchWrite,
)
.await;
}
async fn handle_experimental_feature_enablement_set(
@@ -945,7 +970,12 @@ impl MessageProcessor {
.set_experimental_feature_enablement(params)
.await;
let is_ok = result.is_ok();
self.handle_config_mutation_result(request_id, result).await;
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ExperimentalFeatureEnablementSet,
)
.await;
if should_refresh_apps_list && is_ok {
self.refresh_apps_list_after_experimental_feature_enablement_set()
.await;
@@ -1021,15 +1051,18 @@ impl MessageProcessor {
});
}
async fn handle_config_mutation_result<T: serde::Serialize>(
async fn handle_config_mutation_result<T>(
&self,
request_id: ConnectionRequestId,
result: std::result::Result<T, JSONRPCErrorError>,
wrap_success: impl FnOnce(T) -> ClientResponsePayload,
) {
match result {
Ok(response) => {
self.handle_config_mutation().await;
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, wrap_success(response))
.await;
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
@@ -1068,6 +1101,7 @@ impl MessageProcessor {
request_id,
"device/key/create",
device_key_requests_allowed,
ClientResponsePayload::DeviceKeyCreate,
move |device_key_api| async move { device_key_api.create(params).await },
);
}
@@ -1082,6 +1116,7 @@ impl MessageProcessor {
request_id,
"device/key/public",
device_key_requests_allowed,
ClientResponsePayload::DeviceKeyPublic,
move |device_key_api| async move { device_key_api.public(params).await },
);
}
@@ -1096,6 +1131,7 @@ impl MessageProcessor {
request_id,
"device/key/sign",
device_key_requests_allowed,
ClientResponsePayload::DeviceKeySign,
move |device_key_api| async move { device_key_api.sign(params).await },
);
}
@@ -1105,6 +1141,7 @@ impl MessageProcessor {
request_id: ConnectionRequestId,
method: &'static str,
device_key_requests_allowed: bool,
wrap_success: fn(R) -> ClientResponsePayload,
run_request: F,
) where
R: serde::Serialize + Send + 'static,
@@ -1123,7 +1160,7 @@ impl MessageProcessor {
run_request(device_key_api).await
}
.await;
outgoing.send_result(request_id, result).await;
outgoing.send_result(request_id, result, wrap_success).await;
});
}
@@ -1144,7 +1181,12 @@ impl MessageProcessor {
self.handle_config_mutation().await;
}
self.outgoing
.send_response(request_id, ExternalAgentConfigImportResponse {})
.send_response(
request_id,
ClientResponsePayload::ExternalAgentConfigImport(
ExternalAgentConfigImportResponse {},
),
)
.await;
if !has_plugin_imports {

View File

@@ -1,6 +1,7 @@
use super::ConnectionSessionState;
use super::MessageProcessor;
use super::MessageProcessorArgs;
use crate::analytics_events::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingMessageSender;
@@ -264,9 +265,14 @@ async fn build_test_processor(
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let config_manager = ConfigManager::new(
config.codex_home.to_path_buf(),
Vec::new(),
@@ -277,6 +283,7 @@ async fn build_test_processor(
);
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths: Arg0DispatchPaths::default(),
config,
config_manager,

View File

@@ -4,6 +4,8 @@ use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
@@ -118,6 +120,7 @@ pub(crate) struct OutgoingMessageSender {
/// We keep them here because this is where responses, errors, and
/// disconnect cleanup all get handled.
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
analytics_events_client: AnalyticsEventsClient,
}
#[derive(Clone)]
@@ -131,6 +134,7 @@ struct PendingCallbackEntry {
callback: oneshot::Sender<ClientRequestResult>,
thread_id: Option<ThreadId>,
request: ServerRequest,
track_server_response: bool,
}
impl ThreadScopedOutgoingMessageSender {
@@ -186,10 +190,10 @@ impl ThreadScopedOutgoingMessageSender {
.await
}
pub(crate) async fn send_response<T: Serialize>(
pub(crate) async fn send_response(
&self,
request_id: ConnectionRequestId,
response: T,
response: ClientResponsePayload,
) {
self.outgoing.send_response(request_id, response).await;
}
@@ -204,12 +208,16 @@ impl ThreadScopedOutgoingMessageSender {
}
impl OutgoingMessageSender {
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
pub(crate) fn new(
sender: mpsc::Sender<OutgoingEnvelope>,
analytics_events_client: AnalyticsEventsClient,
) -> Self {
Self {
next_server_request_id: AtomicI64::new(0),
sender,
request_id_to_callback: Mutex::new(HashMap::new()),
request_contexts: Mutex::new(HashMap::new()),
analytics_events_client,
}
}
@@ -295,11 +303,12 @@ impl OutgoingMessageSender {
callback: tx_approve,
thread_id,
request: request.clone(),
track_server_response: connection_ids.is_some(),
},
);
}
let outgoing_message = OutgoingMessage::Request(request);
let outgoing_message = OutgoingMessage::Request(request.clone());
let send_result = match connection_ids {
None => {
self.sender
@@ -322,6 +331,9 @@ impl OutgoingMessageSender {
{
send_error = Some(err);
break;
} else {
self.analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
match send_error {
@@ -350,21 +362,35 @@ impl OutgoingMessageSender {
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id,
message: OutgoingMessage::Request(request),
message: OutgoingMessage::Request(request.clone()),
write_complete_tx: None,
})
.await
{
warn!("failed to resend request to client: {err:?}");
} else {
self.analytics_events_client
.track_server_request(connection_id.0, request);
}
}
}
pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) {
pub(crate) async fn notify_client_response(
&self,
connection_id: ConnectionId,
id: RequestId,
result: Result,
) {
let entry = self.take_request_callback(&id).await;
match entry {
Some((id, entry)) => {
if entry.track_server_response
&& let Ok(response) = entry.request.response_from_result(&result)
{
self.analytics_events_client
.track_server_response(connection_id.0, response);
}
if let Err(err) = entry.callback.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
@@ -470,21 +496,33 @@ impl OutgoingMessageSender {
}
}
pub(crate) async fn send_response<T: Serialize>(
pub(crate) async fn send_response(
&self,
request_id: ConnectionRequestId,
response: T,
response: ClientResponsePayload,
) {
let connection_id = request_id.connection_id;
let request_id_for_analytics = request_id.request_id.clone();
let serialized_response = response
.into_jsonrpc_parts_and_payload(request_id.request_id.clone())
.map(|(id, result, response)| {
if let Some(response) = response {
self.analytics_events_client.track_response(
connection_id.0,
request_id_for_analytics,
response,
);
}
(id, result)
});
let request_context = self.take_request_context(&request_id).await;
match serde_json::to_value(response) {
Ok(result) => {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
id: request_id.request_id.clone(),
result,
});
match serialized_response {
Ok((id, result)) => {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
self.send_outgoing_message_to_connection(
request_context,
request_id.connection_id,
connection_id,
outgoing_message,
"response",
)
@@ -575,16 +613,19 @@ impl OutgoingMessageSender {
.await;
}
pub(crate) async fn send_result<T, E>(
pub(crate) async fn send_result<T, E, F>(
&self,
request_id: ConnectionRequestId,
result: std::result::Result<T, E>,
wrap_success: F,
) where
T: Serialize,
F: FnOnce(T) -> ClientResponsePayload,
E: Into<JSONRPCErrorError>,
{
match result {
Ok(response) => self.send_response(request_id, response).await,
Ok(response) => {
self.send_response(request_id, wrap_success(response)).await;
}
Err(error) => self.send_error(request_id, error).await,
}
}
@@ -660,31 +701,179 @@ pub(crate) struct OutgoingError {
mod tests {
use std::time::Duration;
use codex_analytics::AppServerRpcTransport;
use codex_analytics::AuthManagerRetention;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApprovalsReviewer;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ModelRerouteReason;
use codex_app_server_protocol::ModelReroutedNotification;
use codex_app_server_protocol::ModelVerification;
use codex_app_server_protocol::ModelVerificationNotification;
use codex_app_server_protocol::PermissionProfile;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_app_server_protocol::SandboxPolicy;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::ThreadId;
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
use tokio::time::timeout;
use uuid::Uuid;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
fn sample_thread_start_response(thread_id: &str) -> ClientResponsePayload {
ClientResponsePayload::ThreadStart(ThreadStartResponse {
thread: Thread {
id: thread_id.to_string(),
forked_from_id: None,
preview: "first prompt".to_string(),
ephemeral: false,
model_provider: "openai".to_string(),
created_at: 1,
updated_at: 2,
status: ThreadStatus::Idle,
path: None,
cwd: test_path_buf("/tmp").abs(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Exec,
agent_nickname: None,
agent_role: None,
git_info: None,
name: None,
turns: Vec::new(),
},
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AskForApproval::OnFailure,
approvals_reviewer: ApprovalsReviewer::User,
sandbox: SandboxPolicy::DangerFullAccess,
permission_profile: Some(PermissionProfile::from(
codex_protocol::models::PermissionProfile::Disabled,
)),
reasoning_effort: None,
})
}
fn sample_turn_start_request(thread_id: &str) -> ClientRequest {
ClientRequest::TurnStart {
request_id: RequestId::Integer(2),
params: TurnStartParams {
thread_id: thread_id.to_string(),
input: Vec::new(),
..Default::default()
},
}
}
fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnStart(TurnStartResponse {
turn: Turn {
id: turn_id.to_string(),
items: Vec::new(),
status: TurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
})
}
fn sample_turn_steer_request(thread_id: &str, turn_id: &str) -> ClientRequest {
ClientRequest::TurnSteer {
request_id: RequestId::Integer(3),
params: TurnSteerParams {
thread_id: thread_id.to_string(),
expected_turn_id: turn_id.to_string(),
input: vec![UserInput::Text {
text: "more".to_string(),
text_elements: Vec::new(),
}],
responsesapi_client_metadata: None,
},
}
}
fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnSteer(TurnSteerResponse {
turn_id: turn_id.to_string(),
})
}
async fn wait_for_analytics_event(server: &MockServer, event_type: &str) -> Value {
timeout(Duration::from_secs(1), async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
for request in &requests {
if request.method != "POST"
|| request.url.path() != "/codex/analytics-events/events"
{
continue;
}
let payload: Value =
serde_json::from_slice(&request.body).expect("valid analytics payload");
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
return event.clone();
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("analytics event before timeout")
}
#[test]
fn verify_server_notification_serialization() {
let notification =
@@ -876,6 +1065,47 @@ mod tests {
);
}
#[test]
fn server_request_response_from_result_decodes_typed_response() {
let request = ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(7),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: None,
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
};
let response = request
.response_from_result(&json!({
"decision": "acceptForSession",
}))
.expect("decode typed server response");
let ServerResponse::CommandExecutionRequestApproval {
request_id,
response,
} = response
else {
panic!("expected command execution approval response");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(
response.decision,
CommandExecutionApprovalDecision::AcceptForSession
);
}
#[test]
fn verify_model_verification_notification_serialization() {
let notification = ServerNotification::ModelVerification(ModelVerificationNotification {
@@ -903,14 +1133,20 @@ mod tests {
#[tokio::test]
async fn send_response_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
};
outgoing
.send_response(request_id.clone(), json!({ "ok": true }))
.send_response(
request_id.clone(),
ClientResponsePayload::ThreadArchive(
codex_app_server_protocol::ThreadArchiveResponse {},
),
)
.await;
let envelope = timeout(Duration::from_secs(1), rx.recv())
@@ -929,7 +1165,7 @@ mod tests {
panic!("expected response message");
};
assert_eq!(response.id, request_id.request_id);
assert_eq!(response.result, json!({ "ok": true }));
assert_eq!(response.result, json!({}));
}
other => panic!("expected targeted response envelope, got: {other:?}"),
}
@@ -938,7 +1174,8 @@ mod tests {
#[tokio::test]
async fn send_response_clears_registered_request_context() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
@@ -954,16 +1191,103 @@ mod tests {
assert_eq!(outgoing.request_context_count().await, 1);
outgoing
.send_response(request_id, json!({ "ok": true }))
.send_response(
request_id,
ClientResponsePayload::ThreadArchive(
codex_app_server_protocol::ThreadArchiveResponse {},
),
)
.await;
assert_eq!(outgoing.request_context_count().await, 0);
}
#[tokio::test]
async fn send_response_tracks_accepted_turn_steer_analytics() {
let analytics_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200))
.mount(&analytics_server)
.await;
let analytics_events_client = codex_analytics::AnalyticsEventsClient::new(
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
analytics_server.uri(),
/*analytics_enabled*/ Some(true),
AuthManagerRetention::Strong,
);
analytics_events_client.track_initialize(
/*connection_id*/ 7,
InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
"codex-tui".to_string(),
AppServerRpcTransport::Stdio,
);
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx, analytics_events_client.clone());
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(1),
},
sample_thread_start_response("thread-1"),
)
.await;
analytics_events_client.track_request(
/*connection_id*/ 7,
RequestId::Integer(2),
&sample_turn_start_request("thread-1"),
);
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(2),
},
sample_turn_start_response("turn-1"),
)
.await;
analytics_events_client.track_request(
/*connection_id*/ 7,
RequestId::Integer(3),
&sample_turn_steer_request("thread-1", "turn-1"),
);
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(3),
},
sample_turn_steer_response("turn-1"),
)
.await;
let event = wait_for_analytics_event(&analytics_server, "codex_turn_steer_event").await;
assert_eq!(event["event_params"]["thread_id"], "thread-1");
assert_eq!(event["event_params"]["expected_turn_id"], "turn-1");
assert_eq!(event["event_params"]["accepted_turn_id"], "turn-1");
assert_eq!(event["event_params"]["result"], "accepted");
}
#[tokio::test]
async fn send_error_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let request_id = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -1001,7 +1325,8 @@ mod tests {
#[tokio::test]
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let send_task = tokio::spawn(async move {
outgoing
.send_server_notification_to_connection_and_wait(
@@ -1045,7 +1370,8 @@ mod tests {
#[tokio::test]
async fn connection_closed_clears_registered_request_contexts() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let closed_connection_request = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -1079,7 +1405,8 @@ mod tests {
#[tokio::test]
async fn notify_client_error_forwards_error_to_waiter() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing =
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
let (request_id, wait_for_result) = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(
@@ -1113,7 +1440,10 @@ mod tests {
#[tokio::test]
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
@@ -1171,7 +1501,10 @@ mod tests {
#[tokio::test]
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),

View File

@@ -722,6 +722,7 @@ mod tests {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)));
manager
@@ -764,6 +765,7 @@ mod tests {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)));
manager

View File

@@ -46,6 +46,7 @@ use async_channel::Sender;
use chrono::Local;
use chrono::Utc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AuthManagerRetention;
use codex_analytics::SubAgentThreadStartedInput;
use codex_app_server_protocol::McpServerElicitationRequest;
use codex_app_server_protocol::McpServerElicitationRequestParams;

View File

@@ -775,6 +775,7 @@ impl Session {
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Strong,
)
});
let services = SessionServices {

View File

@@ -3418,6 +3418,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Strong,
),
hooks: Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),
@@ -4812,6 +4813,7 @@ where
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
AuthManagerRetention::Strong,
),
hooks: Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),