mirror of
https://github.com/openai/codex.git
synced 2026-05-11 23:02:39 +00:00
Compare commits
25 Commits
jif/sqlite
...
rhan/analy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
122a4f62e1 | ||
|
|
42cb76b669 | ||
|
|
a283b3e1d1 | ||
|
|
3405165f46 | ||
|
|
0caa739982 | ||
|
|
114566ae9d | ||
|
|
b01cb5fb03 | ||
|
|
47d32d5cd9 | ||
|
|
40bceeb2ce | ||
|
|
54591dac4e | ||
|
|
a12c04ce37 | ||
|
|
3be8cc539f | ||
|
|
15e71997f3 | ||
|
|
fe7e8e7a0c | ||
|
|
1970393a6b | ||
|
|
d3c7bf2609 | ||
|
|
98f89b7ea8 | ||
|
|
82c12ae6bf | ||
|
|
30c33d883e | ||
|
|
c092aa338c | ||
|
|
34733b8723 | ||
|
|
5c30a31222 | ||
|
|
a89b81baaf | ||
|
|
ad5754b23d | ||
|
|
840f2711df |
@@ -1,4 +1,6 @@
|
||||
use crate::client::AnalyticsEventsClient;
|
||||
use crate::client::AnalyticsEventsQueue;
|
||||
use crate::client::AuthManagerRetention;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
@@ -59,7 +61,7 @@ use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
|
||||
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
@@ -71,6 +73,8 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
@@ -83,6 +87,8 @@ use codex_app_server_protocol::TurnStatus as AppServerTurnStatus;
|
||||
use codex_app_server_protocol::TurnSteerParams;
|
||||
use codex_app_server_protocol::TurnSteerResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::default_client::DEFAULT_ORIGINATOR;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::AppConnectorId;
|
||||
@@ -141,23 +147,24 @@ fn sample_thread_with_source(
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_start_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
|
||||
ClientResponse::ThreadStart {
|
||||
request_id: RequestId::Integer(1),
|
||||
response: ThreadStartResponse {
|
||||
thread: sample_thread(thread_id, ephemeral),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
permission_profile: Some(sample_permission_profile()),
|
||||
reasoning_effort: None,
|
||||
},
|
||||
}
|
||||
fn sample_thread_start_response(
|
||||
thread_id: &str,
|
||||
ephemeral: bool,
|
||||
model: &str,
|
||||
) -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadStart(ThreadStartResponse {
|
||||
thread: sample_thread(thread_id, ephemeral),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
permission_profile: Some(sample_permission_profile()),
|
||||
reasoning_effort: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_permission_profile() -> AppServerPermissionProfile {
|
||||
@@ -183,7 +190,11 @@ fn sample_runtime_metadata() -> CodexRuntimeMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
|
||||
fn sample_thread_resume_response(
|
||||
thread_id: &str,
|
||||
ephemeral: bool,
|
||||
model: &str,
|
||||
) -> ClientResponsePayload {
|
||||
sample_thread_resume_response_with_source(
|
||||
thread_id,
|
||||
ephemeral,
|
||||
@@ -197,23 +208,20 @@ fn sample_thread_resume_response_with_source(
|
||||
ephemeral: bool,
|
||||
model: &str,
|
||||
source: AppServerSessionSource,
|
||||
) -> ClientResponse {
|
||||
ClientResponse::ThreadResume {
|
||||
request_id: RequestId::Integer(2),
|
||||
response: ThreadResumeResponse {
|
||||
thread: sample_thread_with_source(thread_id, ephemeral, source),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
permission_profile: Some(sample_permission_profile()),
|
||||
reasoning_effort: None,
|
||||
},
|
||||
}
|
||||
) -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadResume(ThreadResumeResponse {
|
||||
thread: sample_thread_with_source(thread_id, ephemeral, source),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
permission_profile: Some(sample_permission_profile()),
|
||||
reasoning_effort: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
@@ -235,21 +243,18 @@ fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_start_response(turn_id: &str, request_id: i64) -> ClientResponse {
|
||||
ClientResponse::TurnStart {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
response: codex_app_server_protocol::TurnStartResponse {
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
|
||||
ClientResponsePayload::TurnStart(codex_app_server_protocol::TurnStartResponse {
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_started_notification(thread_id: &str, turn_id: &str) -> ServerNotification {
|
||||
@@ -355,12 +360,21 @@ fn sample_turn_steer_request(
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_steer_response(turn_id: &str, request_id: i64) -> ClientResponse {
|
||||
ClientResponse::TurnSteer {
|
||||
fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload {
|
||||
ClientResponsePayload::TurnSteer(TurnSteerResponse {
|
||||
turn_id: turn_id.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn client_response_fact(
|
||||
connection_id: u64,
|
||||
request_id: i64,
|
||||
response: ClientResponsePayload,
|
||||
) -> AnalyticsFact {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
request_id: RequestId::Integer(request_id),
|
||||
response: TurnSteerResponse {
|
||||
turn_id: turn_id.to_string(),
|
||||
},
|
||||
response: Box::new(response),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -426,7 +440,7 @@ async fn ingest_rejected_turn_steer(
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -486,12 +500,11 @@ async fn ingest_turn_prerequisites(
|
||||
ingest_initialize(reducer, out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 1,
|
||||
sample_thread_start_response("thread-2", /*ephemeral*/ false, "gpt-5"),
|
||||
),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
@@ -500,7 +513,7 @@ async fn ingest_turn_prerequisites(
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
@@ -510,10 +523,11 @@ async fn ingest_turn_prerequisites(
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 3,
|
||||
sample_turn_start_response("turn-2"),
|
||||
),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
@@ -793,6 +807,37 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
|
||||
assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_client_retains_auth_manager() {
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
|
||||
let weak_auth_manager = Arc::downgrade(&auth_manager);
|
||||
|
||||
let _client = AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
"http://localhost".to_string(),
|
||||
/*analytics_enabled*/ None,
|
||||
AuthManagerRetention::Strong,
|
||||
);
|
||||
|
||||
assert!(weak_auth_manager.upgrade().is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn non_owning_client_does_not_retain_auth_manager() {
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
|
||||
let weak_auth_manager = Arc::downgrade(&auth_manager);
|
||||
|
||||
let _client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
"http://localhost".to_string(),
|
||||
/*analytics_enabled*/ None,
|
||||
AuthManagerRetention::Weak,
|
||||
);
|
||||
drop(auth_manager);
|
||||
|
||||
assert!(weak_auth_manager.upgrade().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_initialized_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ThreadInitialized(ThreadInitializedEvent {
|
||||
@@ -862,14 +907,11 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-no-client",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 1,
|
||||
sample_thread_start_response("thread-no-client", /*ephemeral*/ false, "gpt-5"),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
@@ -906,12 +948,11 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 2,
|
||||
sample_thread_resume_response("thread-1", /*ephemeral*/ true, "gpt-5"),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
@@ -954,6 +995,63 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_client_requests_are_ignored_by_reducer() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(ClientRequest::ThreadArchive {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: ThreadArchiveParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
},
|
||||
}),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 3,
|
||||
sample_turn_start_response("turn-2"),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
events.is_empty(),
|
||||
"unrelated requests must not create pending turn state"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_client_responses_are_ignored_by_reducer() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut reducer, &mut events).await;
|
||||
reducer
|
||||
.ingest(
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 9,
|
||||
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(events.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compaction_event_ingests_custom_fact() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -986,9 +1084,10 @@ async fn compaction_event_ingests_custom_fact() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 2,
|
||||
sample_thread_resume_response_with_source(
|
||||
"thread-1",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
@@ -999,8 +1098,8 @@ async fn compaction_event_ingests_custom_fact() {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
@@ -1097,14 +1196,11 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-guardian",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 1,
|
||||
sample_thread_start_response("thread-guardian", /*ephemeral*/ false, "gpt-5"),
|
||||
),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
@@ -1867,7 +1963,7 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -1879,10 +1975,11 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 4,
|
||||
sample_turn_steer_response("turn-2"),
|
||||
),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
@@ -2021,7 +2118,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
|
||||
ingest_initialize(&mut reducer, &mut out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
@@ -2045,10 +2142,11 @@ async fn turn_start_error_response_discards_pending_start_request() {
|
||||
// failed turn/start request and attach request-scoped connection metadata.
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 3,
|
||||
sample_turn_start_response("turn-2"),
|
||||
),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
@@ -2162,7 +2260,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2174,17 +2272,18 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 4,
|
||||
sample_turn_steer_response("turn-2"),
|
||||
),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(5),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2208,7 +2307,7 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(6),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
@@ -2220,10 +2319,11 @@ async fn accepted_steers_increment_turn_steer_count() {
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
|
||||
},
|
||||
client_response_fact(
|
||||
/*connection_id*/ 7,
|
||||
/*request_id*/ 6,
|
||||
sample_turn_steer_response("turn-2"),
|
||||
),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -22,11 +22,13 @@ use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
@@ -49,18 +51,34 @@ pub(crate) struct AnalyticsEventsQueue {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsEventsClient {
|
||||
queue: AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
queue: Option<AnalyticsEventsQueue>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum AuthManagerRetention {
|
||||
Strong,
|
||||
Weak,
|
||||
}
|
||||
|
||||
impl AnalyticsEventsQueue {
|
||||
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
|
||||
pub(crate) fn new(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
base_url: String,
|
||||
retention: AuthManagerRetention,
|
||||
) -> Self {
|
||||
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
|
||||
let auth_manager = match retention {
|
||||
AuthManagerRetention::Strong => AuthManagerHandle::Strong(auth_manager),
|
||||
AuthManagerRetention::Weak => AuthManagerHandle::Weak(Arc::downgrade(&auth_manager)),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
while let Some(input) = receiver.recv().await {
|
||||
let mut events = Vec::new();
|
||||
reducer.ingest(input, &mut events).await;
|
||||
let Some(auth_manager) = auth_manager.get() else {
|
||||
break;
|
||||
};
|
||||
send_track_events(&auth_manager, &base_url, events).await;
|
||||
}
|
||||
});
|
||||
@@ -112,18 +130,42 @@ impl AnalyticsEventsQueue {
|
||||
}
|
||||
}
|
||||
|
||||
enum AuthManagerHandle {
|
||||
Strong(Arc<AuthManager>),
|
||||
Weak(std::sync::Weak<AuthManager>),
|
||||
}
|
||||
|
||||
impl AuthManagerHandle {
|
||||
fn get(&self) -> Option<Arc<AuthManager>> {
|
||||
match self {
|
||||
Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)),
|
||||
Self::Weak(auth_manager) => auth_manager.upgrade(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyticsEventsClient {
|
||||
pub fn new(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
base_url: String,
|
||||
analytics_enabled: Option<bool>,
|
||||
auth_manager_retention: AuthManagerRetention,
|
||||
) -> Self {
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
|
||||
analytics_enabled,
|
||||
queue: (analytics_enabled != Some(false)).then(|| {
|
||||
AnalyticsEventsQueue::new(
|
||||
Arc::clone(&auth_manager),
|
||||
base_url,
|
||||
auth_manager_retention,
|
||||
)
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disabled() -> Self {
|
||||
Self { queue: None }
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
@@ -181,16 +223,30 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
|
||||
self.record_fact(AnalyticsFact::Request {
|
||||
pub fn track_request(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: &ClientRequest,
|
||||
) {
|
||||
if !matches!(
|
||||
request,
|
||||
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
|
||||
) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::ClientRequest {
|
||||
connection_id,
|
||||
request_id,
|
||||
request: Box::new(request),
|
||||
request: Box::new(request.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
@@ -205,7 +261,10 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
@@ -268,15 +327,30 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
if let Some(queue) = self.queue.as_ref() {
|
||||
queue.try_send(input);
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
|
||||
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
|
||||
self.record_fact(AnalyticsFact::Response {
|
||||
pub fn track_response(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
if !matches!(
|
||||
response,
|
||||
ClientResponsePayload::ThreadStart(_)
|
||||
| ClientResponsePayload::ThreadResume(_)
|
||||
| ClientResponsePayload::ThreadFork(_)
|
||||
| ClientResponsePayload::TurnStart(_)
|
||||
| ClientResponsePayload::TurnSteer(_)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
request_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
@@ -299,6 +373,20 @@ impl AnalyticsEventsClient {
|
||||
pub fn track_notification(&self, notification: ServerNotification) {
|
||||
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
|
||||
}
|
||||
|
||||
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
|
||||
self.record_fact(AnalyticsFact::ServerRequest {
|
||||
connection_id,
|
||||
request: Box::new(request),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_response(&self, connection_id: u64, response: ServerResponse) {
|
||||
self.record_fact(AnalyticsFact::ServerResponse {
|
||||
connection_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
@@ -341,3 +429,7 @@ async fn send_track_events(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "client_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
92
codex-rs/analytics/src/client_tests.rs
Normal file
92
codex-rs/analytics/src/client_tests.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use super::AnalyticsEventsClient;
|
||||
use super::AnalyticsEventsQueue;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::TurnSteerParams;
|
||||
use codex_app_server_protocol::TurnSteerResponse;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
|
||||
fn client_with_receiver() -> (AnalyticsEventsClient, mpsc::Receiver<AnalyticsFact>) {
|
||||
let (sender, receiver) = mpsc::channel(4);
|
||||
let queue = AnalyticsEventsQueue {
|
||||
sender,
|
||||
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
};
|
||||
(AnalyticsEventsClient { queue: Some(queue) }, receiver)
|
||||
}
|
||||
|
||||
fn sample_turn_steer_request() -> ClientRequest {
|
||||
ClientRequest::TurnSteer {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: TurnSteerParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
expected_turn_id: "turn-1".to_string(),
|
||||
input: Vec::new(),
|
||||
responsesapi_client_metadata: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_archive_request() -> ClientRequest {
|
||||
ClientRequest::ThreadArchive {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: ThreadArchiveParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn track_request_only_enqueues_analytics_relevant_requests() {
|
||||
let (client, mut receiver) = client_with_receiver();
|
||||
|
||||
client.track_request(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(1),
|
||||
&sample_turn_steer_request(),
|
||||
);
|
||||
assert!(matches!(
|
||||
receiver.try_recv(),
|
||||
Ok(AnalyticsFact::ClientRequest { .. })
|
||||
));
|
||||
|
||||
client.track_request(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(2),
|
||||
&sample_thread_archive_request(),
|
||||
);
|
||||
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn track_response_only_enqueues_analytics_relevant_responses() {
|
||||
let (client, mut receiver) = client_with_receiver();
|
||||
|
||||
client.track_response(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(1),
|
||||
ClientResponsePayload::TurnSteer(TurnSteerResponse {
|
||||
turn_id: "turn-1".to_string(),
|
||||
}),
|
||||
);
|
||||
assert!(matches!(
|
||||
receiver.try_recv(),
|
||||
Ok(AnalyticsFact::ClientResponse { .. })
|
||||
));
|
||||
|
||||
client.track_response(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(2),
|
||||
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
|
||||
);
|
||||
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
@@ -2,11 +2,13 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::GuardianReviewEventParams;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
@@ -272,14 +274,15 @@ pub(crate) enum AnalyticsFact {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
},
|
||||
Request {
|
||||
ClientRequest {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
ClientResponse {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
request_id: RequestId,
|
||||
response: Box<ClientResponsePayload>,
|
||||
},
|
||||
ErrorResponse {
|
||||
connection_id: u64,
|
||||
@@ -287,6 +290,14 @@ pub(crate) enum AnalyticsFact {
|
||||
error: JSONRPCErrorError,
|
||||
error_type: Option<AnalyticsJsonRpcError>,
|
||||
},
|
||||
ServerRequest {
|
||||
connection_id: u64,
|
||||
request: Box<ServerRequest>,
|
||||
},
|
||||
ServerResponse {
|
||||
connection_id: u64,
|
||||
response: Box<ServerResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
pub use client::AnalyticsEventsClient;
|
||||
pub use client::AuthManagerRetention;
|
||||
pub use events::AppServerRpcTransport;
|
||||
pub use events::GuardianApprovalRequestSource;
|
||||
pub use events::GuardianReviewAnalyticsResult;
|
||||
|
||||
@@ -171,18 +171,21 @@ impl AnalyticsReducer {
|
||||
rpc_transport,
|
||||
);
|
||||
}
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id,
|
||||
request_id,
|
||||
request,
|
||||
} => {
|
||||
self.ingest_request(connection_id, request_id, *request);
|
||||
}
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
if let Some(response) = response.into_client_response(request_id) {
|
||||
self.ingest_response(connection_id, response, out);
|
||||
}
|
||||
}
|
||||
AnalyticsFact::ErrorResponse {
|
||||
connection_id,
|
||||
@@ -195,6 +198,14 @@ impl AnalyticsReducer {
|
||||
AnalyticsFact::Notification(notification) => {
|
||||
self.ingest_notification(*notification, out);
|
||||
}
|
||||
AnalyticsFact::ServerRequest {
|
||||
connection_id: _connection_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::ServerResponse {
|
||||
connection_id: _connection_id,
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
|
||||
@@ -158,6 +158,84 @@ macro_rules! client_request_definitions {
|
||||
})
|
||||
.unwrap_or_else(|| "<unknown>".to_string())
|
||||
}
|
||||
|
||||
pub fn into_jsonrpc_parts(
|
||||
self,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant { request_id, response } => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum ClientResponsePayload {
|
||||
$( $variant($response), )*
|
||||
InterruptConversation(v1::InterruptConversationResponse),
|
||||
}
|
||||
|
||||
impl ClientResponsePayload {
|
||||
pub fn into_jsonrpc_parts_and_payload(
|
||||
self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<
|
||||
(RequestId, crate::Result, Option<ClientResponsePayload>),
|
||||
serde_json::Error,
|
||||
> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
let result = serde_json::to_value(&response)?;
|
||||
Ok((request_id, result, Some(Self::$variant(response))))
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_client_response(self, request_id: RequestId) -> Option<ClientResponse> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
Some(ClientResponse::$variant {
|
||||
request_id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_jsonrpc_parts(
|
||||
self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
self.to_jsonrpc_parts(request_id)
|
||||
}
|
||||
|
||||
pub fn to_jsonrpc_parts(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::experimental_api::ExperimentalApi for ClientRequest {
|
||||
@@ -701,6 +779,24 @@ macro_rules! server_request_definitions {
|
||||
$(Self::$variant { request_id, .. } => request_id,)*
|
||||
}
|
||||
}
|
||||
|
||||
pub fn response_from_result(
|
||||
&self,
|
||||
result: &crate::Result,
|
||||
) -> serde_json::Result<ServerResponse> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant { request_id, .. } => {
|
||||
let response =
|
||||
<$response as serde::Deserialize>::deserialize(result)?;
|
||||
Ok(ServerResponse::$variant {
|
||||
request_id: request_id.clone(),
|
||||
response,
|
||||
})
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Typed response from the client to the server.
|
||||
@@ -2197,3 +2293,7 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "common_tests.rs"]
|
||||
mod common_tests;
|
||||
|
||||
44
codex-rs/app-server-protocol/src/protocol/common_tests.rs
Normal file
44
codex-rs/app-server-protocol/src/protocol/common_tests.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use super::*;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn client_response_payload_returns_jsonrpc_parts_and_client_response() -> Result<()> {
|
||||
let (request_id, result, payload) =
|
||||
ClientResponsePayload::ThreadArchive(v2::ThreadArchiveResponse {})
|
||||
.into_jsonrpc_parts_and_payload(RequestId::Integer(7))?;
|
||||
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(result, json!({}));
|
||||
|
||||
let Some(ClientResponse::ThreadArchive {
|
||||
request_id,
|
||||
response: _,
|
||||
}) = payload.and_then(|payload| payload.into_client_response(RequestId::Integer(7)))
|
||||
else {
|
||||
panic!("expected thread/archive client response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interrupt_conversation_payload_stays_jsonrpc_only() -> Result<()> {
|
||||
let (request_id, result, payload) =
|
||||
ClientResponsePayload::InterruptConversation(v1::InterruptConversationResponse {
|
||||
abort_reason: TurnAbortReason::Interrupted,
|
||||
})
|
||||
.into_jsonrpc_parts_and_payload(RequestId::Integer(8))?;
|
||||
|
||||
assert_eq!(request_id, RequestId::Integer(8));
|
||||
assert_eq!(
|
||||
result,
|
||||
json!({
|
||||
"abortReason": "interrupted",
|
||||
})
|
||||
);
|
||||
assert!(payload.is_none());
|
||||
Ok(())
|
||||
}
|
||||
18
codex-rs/app-server/src/analytics_events.rs
Normal file
18
codex-rs/app-server/src/analytics_events.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AuthManagerRetention;
|
||||
use codex_core::config::Config;
|
||||
use codex_login::AuthManager;
|
||||
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
AuthManagerRetention::Weak,
|
||||
)
|
||||
}
|
||||
@@ -1938,7 +1938,12 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
}
|
||||
};
|
||||
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadRollback(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
EventMsg::ThreadNameUpdated(thread_name_event) => {
|
||||
@@ -2354,10 +2359,24 @@ async fn respond_to_pending_interrupts(
|
||||
continue;
|
||||
};
|
||||
let response = InterruptConversationResponse { abort_reason };
|
||||
outgoing.send_response(rid, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
rid,
|
||||
codex_app_server_protocol::ClientResponsePayload::InterruptConversation(
|
||||
response,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ApiVersion::V2 => {
|
||||
outgoing.send_response(rid, TurnInterruptResponse {}).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
rid,
|
||||
codex_app_server_protocol::ClientResponsePayload::TurnInterrupt(
|
||||
TurnInterruptResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3110,6 +3129,7 @@ mod tests {
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use codex_analytics::AuthManagerRetention;
|
||||
use codex_app_server_protocol::AutoReviewDecisionSource;
|
||||
use codex_app_server_protocol::GuardianApprovalReviewStatus;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
@@ -3420,7 +3440,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3489,7 +3512,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3579,7 +3605,10 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
let thread_watch_manager = ThreadWatchManager::new();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3598,6 +3627,7 @@ mod tests {
|
||||
),
|
||||
"http://localhost".to_string(),
|
||||
Some(false),
|
||||
AuthManagerRetention::Strong,
|
||||
),
|
||||
codex_home: codex_home.path().to_path_buf(),
|
||||
};
|
||||
@@ -4205,7 +4235,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let event_turn_id = "complete1".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4271,7 +4304,10 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4319,7 +4355,10 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4361,7 +4400,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4415,7 +4457,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-123".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4504,7 +4549,10 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-456".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4577,7 +4625,10 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4839,7 +4890,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4877,7 +4931,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4903,7 +4960,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let conversation_id = ThreadId::new();
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
|
||||
@@ -42,7 +42,7 @@ use codex_app_server_protocol::CancelLoginAccountParams;
|
||||
use codex_app_server_protocol::CancelLoginAccountResponse;
|
||||
use codex_app_server_protocol::CancelLoginAccountStatus;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::CollaborationModeListParams;
|
||||
use codex_app_server_protocol::CollaborationModeListResponse;
|
||||
@@ -1375,7 +1375,9 @@ impl CodexMessageProcessor {
|
||||
.await
|
||||
.map(|()| LoginAccountResponse::ApiKey {});
|
||||
let logged_in = result.is_ok();
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
|
||||
.await;
|
||||
|
||||
if logged_in {
|
||||
self.send_login_success_notifications(/*login_id*/ None)
|
||||
@@ -1443,7 +1445,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn login_chatgpt_v2(&self, request_id: ConnectionRequestId) {
|
||||
let result = self.login_chatgpt_response().await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn login_chatgpt_response(&self) -> Result<LoginAccountResponse, JSONRPCErrorError> {
|
||||
@@ -1512,7 +1516,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn login_chatgpt_device_code_v2(&self, request_id: ConnectionRequestId) {
|
||||
let result = self.login_chatgpt_device_code_response().await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn login_chatgpt_device_code_response(
|
||||
@@ -1602,7 +1608,13 @@ impl CodexMessageProcessor {
|
||||
params: CancelLoginAccountParams,
|
||||
) {
|
||||
let result = self.cancel_login_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::CancelLoginAccount,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn cancel_login_response(
|
||||
@@ -1630,7 +1642,9 @@ impl CodexMessageProcessor {
|
||||
.login_chatgpt_auth_tokens_response(access_token, chatgpt_account_id, chatgpt_plan_type)
|
||||
.await;
|
||||
let logged_in = result.is_ok();
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::LoginAccount)
|
||||
.await;
|
||||
|
||||
if logged_in {
|
||||
self.send_login_success_notifications(/*login_id*/ None)
|
||||
@@ -1784,7 +1798,11 @@ impl CodexMessageProcessor {
|
||||
plan_type: None,
|
||||
});
|
||||
self.outgoing
|
||||
.send_result(request_id, result.map(|_| LogoutAccountResponse {}))
|
||||
.send_result(
|
||||
request_id,
|
||||
result.map(|_| LogoutAccountResponse {}),
|
||||
ClientResponsePayload::LogoutAccount,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(payload) = account_updated {
|
||||
@@ -1869,12 +1887,19 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::GetAuthStatus(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) {
|
||||
let result = self.get_account_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::GetAccount)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_account_response(
|
||||
@@ -1920,7 +1945,13 @@ impl CodexMessageProcessor {
|
||||
),
|
||||
},
|
||||
);
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::GetAccountRateLimits,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_add_credits_nudge_email(
|
||||
@@ -1932,7 +1963,13 @@ impl CodexMessageProcessor {
|
||||
.send_add_credits_nudge_email_inner(params)
|
||||
.await
|
||||
.map(|status| SendAddCreditsNudgeEmailResponse { status });
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::SendAddCreditsNudgeEmail,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_add_credits_nudge_email_inner(
|
||||
@@ -2063,7 +2100,7 @@ impl CodexMessageProcessor {
|
||||
let result = self
|
||||
.exec_one_off_command_inner(request_id.clone(), params)
|
||||
.await
|
||||
.map(|()| None::<serde_json::Value>);
|
||||
.map(|()| None);
|
||||
self.send_optional_result(request_id, result).await;
|
||||
}
|
||||
|
||||
@@ -2311,7 +2348,9 @@ impl CodexMessageProcessor {
|
||||
.command_exec_manager
|
||||
.write(request_id.clone(), params)
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::CommandExecWrite)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn command_exec_resize(
|
||||
@@ -2323,7 +2362,9 @@ impl CodexMessageProcessor {
|
||||
.command_exec_manager
|
||||
.resize(request_id.clone(), params)
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::CommandExecResize)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn command_exec_terminate(
|
||||
@@ -2335,7 +2376,13 @@ impl CodexMessageProcessor {
|
||||
.command_exec_manager
|
||||
.terminate(request_id.clone(), params)
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::CommandExecTerminate,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_start(
|
||||
@@ -2738,19 +2785,12 @@ impl CodexMessageProcessor {
|
||||
|
||||
match result {
|
||||
Ok((response, notif)) => {
|
||||
listener_task_context
|
||||
.analytics_events_client
|
||||
.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadStart {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_response(request_id, response)
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadStart(response),
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.send_response",
|
||||
otel.name = "app_server.thread_start.send_response",
|
||||
@@ -2817,7 +2857,11 @@ impl CodexMessageProcessor {
|
||||
.ok()
|
||||
.map(|(_, thread_ids)| thread_ids.clone());
|
||||
self.outgoing
|
||||
.send_result(request_id, result.map(|(response, _)| response))
|
||||
.send_result(
|
||||
request_id,
|
||||
result.map(|(response, _)| response),
|
||||
ClientResponsePayload::ThreadArchive,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(archived_thread_ids) = archived_thread_ids {
|
||||
@@ -2959,7 +3003,13 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadIncrementElicitation,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_decrement_elicitation(
|
||||
@@ -2984,7 +3034,13 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadDecrementElicitation,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_set_name(&self, request_id: ConnectionRequestId, params: ThreadSetNameParams) {
|
||||
@@ -2994,7 +3050,11 @@ impl CodexMessageProcessor {
|
||||
.ok()
|
||||
.and_then(|(_, notification)| notification.clone());
|
||||
self.outgoing
|
||||
.send_result(request_id, result.map(|(response, _)| response))
|
||||
.send_result(
|
||||
request_id,
|
||||
result.map(|(response, _)| response),
|
||||
ClientResponsePayload::ThreadSetName,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(notification) = notification {
|
||||
@@ -3051,7 +3111,13 @@ impl CodexMessageProcessor {
|
||||
params: ThreadMemoryModeSetParams,
|
||||
) {
|
||||
let result = self.thread_memory_mode_set_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadMemoryModeSet,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_memory_mode_set_response(
|
||||
@@ -3095,7 +3161,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn memory_reset(&self, request_id: ConnectionRequestId, _params: Option<()>) {
|
||||
let result = self.memory_reset_response().await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::MemoryReset)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn memory_reset_response(&self) -> Result<MemoryResetResponse, JSONRPCErrorError> {
|
||||
@@ -3130,7 +3198,13 @@ impl CodexMessageProcessor {
|
||||
params: ThreadMetadataUpdateParams,
|
||||
) {
|
||||
let result = self.thread_metadata_update_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadMetadataUpdate,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_metadata_update_response(
|
||||
@@ -3363,7 +3437,11 @@ impl CodexMessageProcessor {
|
||||
thread_id: thread_id.clone(),
|
||||
});
|
||||
self.outgoing
|
||||
.send_result(request_id, result.map(|(response, _)| response))
|
||||
.send_result(
|
||||
request_id,
|
||||
result.map(|(response, _)| response),
|
||||
ClientResponsePayload::ThreadUnarchive,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(notification) = notification {
|
||||
@@ -3411,7 +3489,7 @@ impl CodexMessageProcessor {
|
||||
let result = self
|
||||
.thread_rollback_start(&request_id, params)
|
||||
.await
|
||||
.map(|()| None::<serde_json::Value>);
|
||||
.map(|()| None);
|
||||
self.send_optional_result(request_id, result).await;
|
||||
}
|
||||
|
||||
@@ -3482,7 +3560,13 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(ThreadCompactStartResponse {})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadCompactStart,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_background_terminals_clean(
|
||||
@@ -3502,7 +3586,13 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(ThreadBackgroundTerminalsCleanResponse {})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadBackgroundTerminalsClean,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_shell_command(
|
||||
@@ -3528,7 +3618,13 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(ThreadShellCommandResponse {})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadShellCommand,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_approve_guardian_denied_action(
|
||||
@@ -3552,12 +3648,20 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(ThreadApproveGuardianDeniedActionResponse {})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ThreadApproveGuardianDeniedAction,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) {
|
||||
let result = self.thread_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_list_response(
|
||||
@@ -3651,7 +3755,9 @@ impl CodexMessageProcessor {
|
||||
params: ThreadLoadedListParams,
|
||||
) {
|
||||
let result = self.thread_loaded_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadLoadedList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_loaded_list_response(
|
||||
@@ -3703,7 +3809,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
|
||||
let result = self.thread_read_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadRead)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_read_response(
|
||||
@@ -3881,7 +3989,9 @@ impl CodexMessageProcessor {
|
||||
params: ThreadTurnsListParams,
|
||||
) {
|
||||
let result = self.thread_turns_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadTurnsList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_turns_list_response(
|
||||
@@ -4260,17 +4370,15 @@ impl CodexMessageProcessor {
|
||||
permission_profile,
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadResume {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = include_turns.then(|| response.thread.clone());
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadResume(response),
|
||||
)
|
||||
.await;
|
||||
// `excludeTurns` is explicitly the cheap resume path, so avoid
|
||||
// rebuilding history only to attribute a replayed usage update.
|
||||
if let Some(token_usage_thread) = token_usage_thread {
|
||||
@@ -4879,17 +4987,14 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadFork {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = include_turns.then(|| response.thread.clone());
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadFork(response),
|
||||
)
|
||||
.await;
|
||||
// `excludeTurns` is the cheap fork path, so skip restored usage replay
|
||||
// instead of rebuilding history only to attribute a historical update.
|
||||
if let Some(token_usage_thread) = token_usage_thread {
|
||||
@@ -4927,7 +5032,13 @@ impl CodexMessageProcessor {
|
||||
params: GetConversationSummaryParams,
|
||||
) {
|
||||
let result = self.get_thread_summary_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::GetConversationSummary,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_thread_summary_response(
|
||||
@@ -5131,7 +5242,9 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
}
|
||||
.await;
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ModelList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn list_collaboration_modes(
|
||||
@@ -5147,7 +5260,12 @@ impl CodexMessageProcessor {
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
let response = CollaborationModeListResponse { data: items };
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::CollaborationModeList(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn experimental_feature_list(
|
||||
@@ -5156,7 +5274,13 @@ impl CodexMessageProcessor {
|
||||
params: ExperimentalFeatureListParams,
|
||||
) {
|
||||
let result = self.experimental_feature_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ExperimentalFeatureList,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn experimental_feature_list_response(
|
||||
@@ -5254,7 +5378,12 @@ impl CodexMessageProcessor {
|
||||
) {
|
||||
let MockExperimentalMethodParams { value } = params;
|
||||
let response = MockExperimentalMethodResponse { echoed: value };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::MockExperimentalMethod(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
|
||||
@@ -5264,7 +5393,9 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {})
|
||||
}
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::McpServerRefresh)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn queue_mcp_server_refresh_for_config(
|
||||
@@ -5319,7 +5450,13 @@ impl CodexMessageProcessor {
|
||||
params: McpServerOauthLoginParams,
|
||||
) {
|
||||
let result = self.mcp_server_oauth_login_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::McpServerOauthLogin,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn mcp_server_oauth_login_response(
|
||||
@@ -5467,7 +5604,13 @@ impl CodexMessageProcessor {
|
||||
runtime_environment,
|
||||
)
|
||||
.await;
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::McpServerStatusList,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn list_mcp_server_status_response(
|
||||
@@ -5638,7 +5781,9 @@ impl CodexMessageProcessor {
|
||||
))
|
||||
})
|
||||
});
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::McpResourceRead)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn call_mcp_server_tool(
|
||||
@@ -5663,17 +5808,17 @@ impl CodexMessageProcessor {
|
||||
.await
|
||||
.map(McpServerToolCallResponse::from)
|
||||
.map_err(|error| internal_error(format!("{error:#}")));
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::McpServerToolCall)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_optional_result<T>(
|
||||
async fn send_optional_result(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
result: Result<Option<T>, JSONRPCErrorError>,
|
||||
) where
|
||||
T: serde::Serialize,
|
||||
{
|
||||
result: Result<Option<ClientResponsePayload>, JSONRPCErrorError>,
|
||||
) {
|
||||
match result {
|
||||
Ok(Some(response)) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(None) => {}
|
||||
@@ -5786,7 +5931,9 @@ impl CodexMessageProcessor {
|
||||
let result = self
|
||||
.thread_unsubscribe_response(params, request_id.connection_id)
|
||||
.await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadUnsubscribe)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_unsubscribe_response(
|
||||
@@ -5871,10 +6018,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
AppsListResponse {
|
||||
codex_app_server_protocol::ClientResponsePayload::AppsList(AppsListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -5887,10 +6034,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
AppsListResponse {
|
||||
codex_app_server_protocol::ClientResponsePayload::AppsList(AppsListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -5912,7 +6059,9 @@ impl CodexMessageProcessor {
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
) {
|
||||
let result = Self::apps_list_response(&outgoing, params, config, environment_manager).await;
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::AppsList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn apps_list_response(
|
||||
@@ -6060,7 +6209,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) {
|
||||
let result = self.skills_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::SkillsList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn skills_list_response(
|
||||
@@ -6207,7 +6358,9 @@ impl CodexMessageProcessor {
|
||||
MarketplaceRemoveError::InvalidRequest(message) => invalid_request(message),
|
||||
MarketplaceRemoveError::Internal(message) => internal_error(message),
|
||||
});
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::MarketplaceRemove)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn marketplace_upgrade(
|
||||
@@ -6216,7 +6369,13 @@ impl CodexMessageProcessor {
|
||||
params: MarketplaceUpgradeParams,
|
||||
) {
|
||||
let result = self.marketplace_upgrade_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::MarketplaceUpgrade,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn marketplace_upgrade_response(
|
||||
@@ -6268,7 +6427,9 @@ impl CodexMessageProcessor {
|
||||
MarketplaceAddError::InvalidRequest(message) => invalid_request(message),
|
||||
MarketplaceAddError::Internal(message) => internal_error(message),
|
||||
});
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::MarketplaceAdd)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn skills_config_write(
|
||||
@@ -6277,7 +6438,9 @@ impl CodexMessageProcessor {
|
||||
params: SkillsConfigWriteParams,
|
||||
) {
|
||||
let result = self.skills_config_write_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::SkillsConfigWrite)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn skills_config_write_response(
|
||||
@@ -6492,14 +6655,12 @@ impl CodexMessageProcessor {
|
||||
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::TurnStart {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::TurnStart(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -6513,7 +6674,9 @@ impl CodexMessageProcessor {
|
||||
params: ThreadInjectItemsParams,
|
||||
) {
|
||||
let result = self.thread_inject_items_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::ThreadInjectItems)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_inject_items_response(
|
||||
@@ -6669,14 +6832,12 @@ impl CodexMessageProcessor {
|
||||
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::TurnSteer {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::TurnSteer(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -6753,7 +6914,11 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStartResponse::default()))
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeStart)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_realtime_append_audio(
|
||||
@@ -6784,7 +6949,11 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendAudioResponse::default()))
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeAppendAudio)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_realtime_append_text(
|
||||
@@ -6813,7 +6982,11 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendTextResponse::default()))
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeAppendText)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_realtime_stop(
|
||||
@@ -6836,7 +7009,11 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStopResponse::default()))
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::ThreadRealtimeStop)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_realtime_list_voices(
|
||||
@@ -6847,9 +7024,11 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadRealtimeListVoicesResponse {
|
||||
voices: RealtimeVoicesList::builtin(),
|
||||
},
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadRealtimeListVoices(
|
||||
ThreadRealtimeListVoicesResponse {
|
||||
voices: RealtimeVoicesList::builtin(),
|
||||
},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -6890,7 +7069,10 @@ impl CodexMessageProcessor {
|
||||
review_thread_id,
|
||||
};
|
||||
self.outgoing
|
||||
.send_response(request_id.clone(), response)
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
codex_app_server_protocol::ClientResponsePayload::ReviewStart(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -7078,7 +7260,11 @@ impl CodexMessageProcessor {
|
||||
Ok::<_, JSONRPCErrorError>(None::<ReviewStartResponse>)
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::ReviewStart)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
|
||||
@@ -7147,7 +7333,11 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
.await;
|
||||
self.send_optional_result(request_id, result).await;
|
||||
self.send_optional_result(
|
||||
request_id,
|
||||
result.map(|response| response.map(ClientResponsePayload::TurnInterrupt)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn ensure_conversation_listener(
|
||||
@@ -7469,7 +7659,9 @@ impl CodexMessageProcessor {
|
||||
"failed to compute git diff to remote for cwd: {cwd:?}"
|
||||
))
|
||||
});
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::GitDiffToRemote)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn fuzzy_file_search(
|
||||
@@ -7513,7 +7705,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let response = FuzzyFileSearchResponse { files: results };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::FuzzyFileSearch(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn fuzzy_file_search_session_start(
|
||||
@@ -7522,7 +7719,13 @@ impl CodexMessageProcessor {
|
||||
params: FuzzyFileSearchSessionStartParams,
|
||||
) {
|
||||
let result = self.fuzzy_file_search_session_start_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::FuzzyFileSearchSessionStart,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn fuzzy_file_search_session_start_response(
|
||||
@@ -7552,7 +7755,13 @@ impl CodexMessageProcessor {
|
||||
params: FuzzyFileSearchSessionUpdateParams,
|
||||
) {
|
||||
let result = self.fuzzy_file_search_session_update_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::FuzzyFileSearchSessionUpdate,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn fuzzy_file_search_session_update_response(
|
||||
@@ -7590,13 +7799,20 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::FuzzyFileSearchSessionStop(
|
||||
FuzzyFileSearchSessionStopResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
|
||||
let result = self.upload_feedback_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::FeedbackUpload)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn upload_feedback_response(
|
||||
@@ -7768,7 +7984,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
WindowsSandboxSetupStartResponse { started: true },
|
||||
codex_app_server_protocol::ClientResponsePayload::WindowsSandboxSetupStart(
|
||||
WindowsSandboxSetupStartResponse { started: true },
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -8126,7 +8344,12 @@ async fn handle_pending_thread_resume_request(
|
||||
reasoning_effort,
|
||||
};
|
||||
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadResume(response),
|
||||
)
|
||||
.await;
|
||||
// Match cold resume: metadata-only resume should attach the listener without
|
||||
// paying the cost of turn reconstruction for historical usage replay.
|
||||
if let Some(token_usage_thread) = token_usage_thread {
|
||||
@@ -10469,7 +10692,10 @@ mod tests {
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
vec![connection_id],
|
||||
|
||||
@@ -10,7 +10,9 @@ impl CodexMessageProcessor {
|
||||
params: PluginListParams,
|
||||
) {
|
||||
let result = self.plugin_list_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::PluginList)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn plugin_list_response(
|
||||
@@ -169,7 +171,9 @@ impl CodexMessageProcessor {
|
||||
params: PluginReadParams,
|
||||
) {
|
||||
let result = self.plugin_read_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::PluginRead)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn plugin_read_response(
|
||||
@@ -304,7 +308,9 @@ impl CodexMessageProcessor {
|
||||
params: PluginInstallParams,
|
||||
) {
|
||||
let result = self.plugin_install_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::PluginInstall)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn plugin_install_response(
|
||||
@@ -538,7 +544,9 @@ impl CodexMessageProcessor {
|
||||
params: PluginUninstallParams,
|
||||
) {
|
||||
let result = self.plugin_uninstall_response(params).await;
|
||||
self.outgoing.send_result(request_id, result).await;
|
||||
self.outgoing
|
||||
.send_result(request_id, result, ClientResponsePayload::PluginUninstall)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn plugin_uninstall_response(
|
||||
|
||||
@@ -171,7 +171,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
ThreadGoalSetResponse { goal: goal.clone() },
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadGoalSet(
|
||||
ThreadGoalSetResponse { goal: goal.clone() },
|
||||
),
|
||||
)
|
||||
.await;
|
||||
self.emit_thread_goal_updated_ordered(thread_id, goal, listener_command_tx)
|
||||
@@ -215,7 +217,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadGoalGetResponse { goal })
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadGoalGet(
|
||||
ThreadGoalGetResponse { goal },
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -315,7 +322,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadGoalClearResponse { cleared })
|
||||
.send_response(
|
||||
request_id,
|
||||
codex_app_server_protocol::ClientResponsePayload::ThreadGoalClear(
|
||||
ThreadGoalClearResponse { cleared },
|
||||
),
|
||||
)
|
||||
.await;
|
||||
if cleared {
|
||||
self.emit_thread_goal_cleared_ordered(thread_id, listener_command_tx)
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::time::Duration;
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CommandExecOutputDeltaNotification;
|
||||
use codex_app_server_protocol::CommandExecOutputStream;
|
||||
use codex_app_server_protocol::CommandExecResizeParams;
|
||||
@@ -209,11 +210,11 @@ impl CommandExecManager {
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
CommandExecResponse {
|
||||
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
|
||||
exit_code: output.exit_code,
|
||||
stdout: output.stdout.text,
|
||||
stderr: output.stderr.text,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -554,11 +555,11 @@ async fn run_command(params: RunCommandParams) {
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
CommandExecResponse {
|
||||
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
|
||||
exit_code,
|
||||
stdout,
|
||||
stderr,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -726,7 +727,10 @@ mod tests {
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(1),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(42),
|
||||
@@ -762,7 +766,10 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-99".to_string()),
|
||||
exec_request: windows_sandbox_exec_request(),
|
||||
@@ -809,7 +816,10 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-100".to_string()),
|
||||
exec_request: ExecRequest::new(
|
||||
|
||||
@@ -467,6 +467,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::config_manager::apply_runtime_feature_enablement;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AuthManagerRetention;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::CloudRequirementsLoader;
|
||||
use codex_config::LoaderOverrides;
|
||||
@@ -832,6 +833,7 @@ mod tests {
|
||||
.trim_end_matches('/')
|
||||
.to_string(),
|
||||
analytics_config.analytics_enabled,
|
||||
AuthManagerRetention::Strong,
|
||||
),
|
||||
);
|
||||
|
||||
|
||||
@@ -234,7 +234,10 @@ mod tests {
|
||||
const OUTGOING_BUFFER: usize = 1;
|
||||
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
|
||||
FsWatchManager::new_with_file_watcher(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
@@ -365,7 +366,15 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
|
||||
.await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -390,9 +399,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
|
||||
.await;
|
||||
let config_manager = ConfigManager::new(
|
||||
args.config.codex_home.to_path_buf(),
|
||||
args.cli_overrides,
|
||||
@@ -405,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
config_manager,
|
||||
@@ -563,7 +570,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
|
||||
outgoing_message_sender
|
||||
.notify_client_response(request_id, result)
|
||||
.notify_client_response(
|
||||
IN_PROCESS_CONNECTION_ID,
|
||||
request_id,
|
||||
result,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
@@ -67,6 +68,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod analytics_events;
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
@@ -709,12 +711,18 @@ pub async fn run_main_with_transport_options(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
config_manager,
|
||||
@@ -881,7 +889,7 @@ pub async fn run_main_with_transport_options(
|
||||
warn!("dropping response from unknown connection: {connection_id:?}");
|
||||
continue;
|
||||
}
|
||||
processor.process_response(response).await;
|
||||
processor.process_response(connection_id, response).await;
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
if !connections.contains_key(&connection_id) {
|
||||
|
||||
@@ -33,6 +33,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
@@ -237,6 +238,7 @@ impl ConnectionSessionState {
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) arg0_paths: Arg0DispatchPaths,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) config_manager: ConfigManager,
|
||||
@@ -257,6 +259,7 @@ impl MessageProcessor {
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
config_manager,
|
||||
@@ -273,11 +276,6 @@ impl MessageProcessor {
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
@@ -554,10 +552,16 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
/// Handle a standalone JSON-RPC response originating from the peer.
|
||||
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
|
||||
pub(crate) async fn process_response(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
response: JSONRPCResponse,
|
||||
) {
|
||||
tracing::info!("<- response: {:?}", response);
|
||||
let JSONRPCResponse { id, result, .. } = response;
|
||||
self.outgoing.notify_client_response(id, result).await
|
||||
self.outgoing
|
||||
.notify_client_response(connection_id, id, result)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Handle an error object received from the peer.
|
||||
@@ -673,7 +677,10 @@ impl MessageProcessor {
|
||||
};
|
||||
|
||||
self.outgoing
|
||||
.send_response(connection_request_id, response)
|
||||
.send_response(
|
||||
connection_request_id,
|
||||
ClientResponsePayload::Initialize(response),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(outbound_initialized) = outbound_initialized {
|
||||
@@ -714,15 +721,11 @@ impl MessageProcessor {
|
||||
return Err(invalid_request(experimental_required_message(reason)));
|
||||
}
|
||||
let connection_id = connection_request_id.connection_id;
|
||||
if let ClientRequest::TurnStart { request_id, .. }
|
||||
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
|
||||
{
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
request_id.clone(),
|
||||
codex_request.clone(),
|
||||
);
|
||||
}
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
connection_request_id.request_id.clone(),
|
||||
&codex_request,
|
||||
);
|
||||
|
||||
let app_server_client_name = session.app_server_client_name().map(str::to_string);
|
||||
let client_version = session.client_version().map(str::to_string);
|
||||
@@ -760,6 +763,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.config_api.read(params).await,
|
||||
ClientResponsePayload::ConfigRead,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -768,6 +772,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.external_agent_config_api.detect(params).await,
|
||||
ClientResponsePayload::ExternalAgentConfigDetect,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -801,6 +806,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.config_api.config_requirements_read().await,
|
||||
ClientResponsePayload::ConfigRequirementsRead,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -830,6 +836,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.read_file(params).await,
|
||||
ClientResponsePayload::FsReadFile,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -838,6 +845,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.write_file(params).await,
|
||||
ClientResponsePayload::FsWriteFile,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -846,6 +854,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.create_directory(params).await,
|
||||
ClientResponsePayload::FsCreateDirectory,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -854,6 +863,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.get_metadata(params).await,
|
||||
ClientResponsePayload::FsGetMetadata,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -862,6 +872,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.read_directory(params).await,
|
||||
ClientResponsePayload::FsReadDirectory,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -870,6 +881,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.remove(params).await,
|
||||
ClientResponsePayload::FsRemove,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -878,6 +890,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_api.copy(params).await,
|
||||
ClientResponsePayload::FsCopy,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -886,6 +899,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_watch_manager.watch(connection_id, params).await,
|
||||
ClientResponsePayload::FsWatch,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -894,6 +908,7 @@ impl MessageProcessor {
|
||||
.send_result(
|
||||
request_id_for_connection(request_id),
|
||||
self.fs_watch_manager.unwatch(connection_id, params).await,
|
||||
ClientResponsePayload::FsUnwatch,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -922,7 +937,12 @@ impl MessageProcessor {
|
||||
params: ConfigValueWriteParams,
|
||||
) {
|
||||
let result = self.config_api.write_value(params).await;
|
||||
self.handle_config_mutation_result(request_id, result).await
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ConfigValueWrite,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_config_batch_write(
|
||||
@@ -931,7 +951,12 @@ impl MessageProcessor {
|
||||
params: ConfigBatchWriteParams,
|
||||
) {
|
||||
let result = self.config_api.batch_write(params).await;
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ConfigBatchWrite,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_experimental_feature_enablement_set(
|
||||
@@ -945,7 +970,12 @@ impl MessageProcessor {
|
||||
.set_experimental_feature_enablement(params)
|
||||
.await;
|
||||
let is_ok = result.is_ok();
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ExperimentalFeatureEnablementSet,
|
||||
)
|
||||
.await;
|
||||
if should_refresh_apps_list && is_ok {
|
||||
self.refresh_apps_list_after_experimental_feature_enablement_set()
|
||||
.await;
|
||||
@@ -1021,15 +1051,18 @@ impl MessageProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_config_mutation_result<T: serde::Serialize>(
|
||||
async fn handle_config_mutation_result<T>(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
result: std::result::Result<T, JSONRPCErrorError>,
|
||||
wrap_success: impl FnOnce(T) -> ClientResponsePayload,
|
||||
) {
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.handle_config_mutation().await;
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, wrap_success(response))
|
||||
.await;
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
@@ -1068,6 +1101,7 @@ impl MessageProcessor {
|
||||
request_id,
|
||||
"device/key/create",
|
||||
device_key_requests_allowed,
|
||||
ClientResponsePayload::DeviceKeyCreate,
|
||||
move |device_key_api| async move { device_key_api.create(params).await },
|
||||
);
|
||||
}
|
||||
@@ -1082,6 +1116,7 @@ impl MessageProcessor {
|
||||
request_id,
|
||||
"device/key/public",
|
||||
device_key_requests_allowed,
|
||||
ClientResponsePayload::DeviceKeyPublic,
|
||||
move |device_key_api| async move { device_key_api.public(params).await },
|
||||
);
|
||||
}
|
||||
@@ -1096,6 +1131,7 @@ impl MessageProcessor {
|
||||
request_id,
|
||||
"device/key/sign",
|
||||
device_key_requests_allowed,
|
||||
ClientResponsePayload::DeviceKeySign,
|
||||
move |device_key_api| async move { device_key_api.sign(params).await },
|
||||
);
|
||||
}
|
||||
@@ -1105,6 +1141,7 @@ impl MessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
method: &'static str,
|
||||
device_key_requests_allowed: bool,
|
||||
wrap_success: fn(R) -> ClientResponsePayload,
|
||||
run_request: F,
|
||||
) where
|
||||
R: serde::Serialize + Send + 'static,
|
||||
@@ -1123,7 +1160,7 @@ impl MessageProcessor {
|
||||
run_request(device_key_api).await
|
||||
}
|
||||
.await;
|
||||
outgoing.send_result(request_id, result).await;
|
||||
outgoing.send_result(request_id, result, wrap_success).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1144,7 +1181,12 @@ impl MessageProcessor {
|
||||
self.handle_config_mutation().await;
|
||||
}
|
||||
self.outgoing
|
||||
.send_response(request_id, ExternalAgentConfigImportResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ExternalAgentConfigImport(
|
||||
ExternalAgentConfigImportResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
if !has_plugin_imports {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::ConnectionSessionState;
|
||||
use super::MessageProcessor;
|
||||
use super::MessageProcessorArgs;
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
@@ -264,9 +265,14 @@ async fn build_test_processor(
|
||||
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let config_manager = ConfigManager::new(
|
||||
config.codex_home.to_path_buf(),
|
||||
Vec::new(),
|
||||
@@ -277,6 +283,7 @@ async fn build_test_processor(
|
||||
);
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
config_manager,
|
||||
|
||||
@@ -4,6 +4,8 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
@@ -118,6 +120,7 @@ pub(crate) struct OutgoingMessageSender {
|
||||
/// We keep them here because this is where responses, errors, and
|
||||
/// disconnect cleanup all get handled.
|
||||
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -131,6 +134,7 @@ struct PendingCallbackEntry {
|
||||
callback: oneshot::Sender<ClientRequestResult>,
|
||||
thread_id: Option<ThreadId>,
|
||||
request: ServerRequest,
|
||||
track_server_response: bool,
|
||||
}
|
||||
|
||||
impl ThreadScopedOutgoingMessageSender {
|
||||
@@ -186,10 +190,10 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
pub(crate) async fn send_response(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
@@ -204,12 +208,16 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
}
|
||||
|
||||
impl OutgoingMessageSender {
|
||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
) -> Self {
|
||||
Self {
|
||||
next_server_request_id: AtomicI64::new(0),
|
||||
sender,
|
||||
request_id_to_callback: Mutex::new(HashMap::new()),
|
||||
request_contexts: Mutex::new(HashMap::new()),
|
||||
analytics_events_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -295,11 +303,12 @@ impl OutgoingMessageSender {
|
||||
callback: tx_approve,
|
||||
thread_id,
|
||||
request: request.clone(),
|
||||
track_server_response: connection_ids.is_some(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let outgoing_message = OutgoingMessage::Request(request);
|
||||
let outgoing_message = OutgoingMessage::Request(request.clone());
|
||||
let send_result = match connection_ids {
|
||||
None => {
|
||||
self.sender
|
||||
@@ -322,6 +331,9 @@ impl OutgoingMessageSender {
|
||||
{
|
||||
send_error = Some(err);
|
||||
break;
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_server_request(connection_id.0, request.clone());
|
||||
}
|
||||
}
|
||||
match send_error {
|
||||
@@ -350,21 +362,35 @@ impl OutgoingMessageSender {
|
||||
.sender
|
||||
.send(OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message: OutgoingMessage::Request(request),
|
||||
message: OutgoingMessage::Request(request.clone()),
|
||||
write_complete_tx: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to resend request to client: {err:?}");
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_server_request(connection_id.0, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) {
|
||||
pub(crate) async fn notify_client_response(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
id: RequestId,
|
||||
result: Result,
|
||||
) {
|
||||
let entry = self.take_request_callback(&id).await;
|
||||
|
||||
match entry {
|
||||
Some((id, entry)) => {
|
||||
if entry.track_server_response
|
||||
&& let Ok(response) = entry.request.response_from_result(&result)
|
||||
{
|
||||
self.analytics_events_client
|
||||
.track_server_response(connection_id.0, response);
|
||||
}
|
||||
if let Err(err) = entry.callback.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
@@ -470,21 +496,33 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
pub(crate) async fn send_response(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
let connection_id = request_id.connection_id;
|
||||
let request_id_for_analytics = request_id.request_id.clone();
|
||||
let serialized_response = response
|
||||
.into_jsonrpc_parts_and_payload(request_id.request_id.clone())
|
||||
.map(|(id, result, response)| {
|
||||
if let Some(response) = response {
|
||||
self.analytics_events_client.track_response(
|
||||
connection_id.0,
|
||||
request_id_for_analytics,
|
||||
response,
|
||||
);
|
||||
}
|
||||
(id, result)
|
||||
});
|
||||
let request_context = self.take_request_context(&request_id).await;
|
||||
match serde_json::to_value(response) {
|
||||
Ok(result) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
|
||||
id: request_id.request_id.clone(),
|
||||
result,
|
||||
});
|
||||
|
||||
match serialized_response {
|
||||
Ok((id, result)) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
|
||||
self.send_outgoing_message_to_connection(
|
||||
request_context,
|
||||
request_id.connection_id,
|
||||
connection_id,
|
||||
outgoing_message,
|
||||
"response",
|
||||
)
|
||||
@@ -575,16 +613,19 @@ impl OutgoingMessageSender {
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_result<T, E>(
|
||||
pub(crate) async fn send_result<T, E, F>(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
result: std::result::Result<T, E>,
|
||||
wrap_success: F,
|
||||
) where
|
||||
T: Serialize,
|
||||
F: FnOnce(T) -> ClientResponsePayload,
|
||||
E: Into<JSONRPCErrorError>,
|
||||
{
|
||||
match result {
|
||||
Ok(response) => self.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.send_response(request_id, wrap_success(response)).await;
|
||||
}
|
||||
Err(error) => self.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -660,31 +701,179 @@ pub(crate) struct OutgoingError {
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_analytics::AppServerRpcTransport;
|
||||
use codex_analytics::AuthManagerRetention;
|
||||
use codex_app_server_protocol::AccountLoginCompletedNotification;
|
||||
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
|
||||
use codex_app_server_protocol::AccountUpdatedNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::ApprovalsReviewer;
|
||||
use codex_app_server_protocol::AskForApproval;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
use codex_app_server_protocol::GuardianWarningNotification;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::ModelRerouteReason;
|
||||
use codex_app_server_protocol::ModelReroutedNotification;
|
||||
use codex_app_server_protocol::ModelVerification;
|
||||
use codex_app_server_protocol::ModelVerificationNotification;
|
||||
use codex_app_server_protocol::PermissionProfile;
|
||||
use codex_app_server_protocol::RateLimitSnapshot;
|
||||
use codex_app_server_protocol::RateLimitWindow;
|
||||
use codex_app_server_protocol::SandboxPolicy;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::TurnSteerParams;
|
||||
use codex_app_server_protocol::TurnSteerResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_utils_absolute_path::test_support::PathBufExt;
|
||||
use codex_utils_absolute_path::test_support::test_path_buf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn sample_thread_start_response(thread_id: &str) -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadStart(ThreadStartResponse {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
forked_from_id: None,
|
||||
preview: "first prompt".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "openai".to_string(),
|
||||
created_at: 1,
|
||||
updated_at: 2,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Exec,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: None,
|
||||
turns: Vec::new(),
|
||||
},
|
||||
model: "gpt-5".to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AskForApproval::OnFailure,
|
||||
approvals_reviewer: ApprovalsReviewer::User,
|
||||
sandbox: SandboxPolicy::DangerFullAccess,
|
||||
permission_profile: Some(PermissionProfile::from(
|
||||
codex_protocol::models::PermissionProfile::Disabled,
|
||||
)),
|
||||
reasoning_effort: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_start_request(thread_id: &str) -> ClientRequest {
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: TurnStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: Vec::new(),
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
|
||||
ClientResponsePayload::TurnStart(TurnStartResponse {
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_steer_request(thread_id: &str, turn_id: &str) -> ClientRequest {
|
||||
ClientRequest::TurnSteer {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: TurnSteerParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
expected_turn_id: turn_id.to_string(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "more".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload {
|
||||
ClientResponsePayload::TurnSteer(TurnSteerResponse {
|
||||
turn_id: turn_id.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_analytics_event(server: &MockServer, event_type: &str) -> Value {
|
||||
timeout(Duration::from_secs(1), async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
continue;
|
||||
};
|
||||
for request in &requests {
|
||||
if request.method != "POST"
|
||||
|| request.url.path() != "/codex/analytics-events/events"
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let payload: Value =
|
||||
serde_json::from_slice(&request.body).expect("valid analytics payload");
|
||||
let Some(events) = payload["events"].as_array() else {
|
||||
continue;
|
||||
};
|
||||
if let Some(event) = events
|
||||
.iter()
|
||||
.find(|event| event["event_type"] == event_type)
|
||||
{
|
||||
return event.clone();
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("analytics event before timeout")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_server_notification_serialization() {
|
||||
let notification =
|
||||
@@ -876,6 +1065,47 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_request_response_from_result_decodes_typed_response() {
|
||||
let request = ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(7),
|
||||
params: CommandExecutionRequestApprovalParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
approval_id: None,
|
||||
reason: None,
|
||||
network_approval_context: None,
|
||||
command: Some("echo hi".to_string()),
|
||||
cwd: None,
|
||||
command_actions: None,
|
||||
additional_permissions: None,
|
||||
proposed_execpolicy_amendment: None,
|
||||
proposed_network_policy_amendments: None,
|
||||
available_decisions: None,
|
||||
},
|
||||
};
|
||||
|
||||
let response = request
|
||||
.response_from_result(&json!({
|
||||
"decision": "acceptForSession",
|
||||
}))
|
||||
.expect("decode typed server response");
|
||||
|
||||
let ServerResponse::CommandExecutionRequestApproval {
|
||||
request_id,
|
||||
response,
|
||||
} = response
|
||||
else {
|
||||
panic!("expected command execution approval response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(
|
||||
response.decision,
|
||||
CommandExecutionApprovalDecision::AcceptForSession
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_model_verification_notification_serialization() {
|
||||
let notification = ServerNotification::ModelVerification(ModelVerificationNotification {
|
||||
@@ -903,14 +1133,20 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
};
|
||||
|
||||
outgoing
|
||||
.send_response(request_id.clone(), json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let envelope = timeout(Duration::from_secs(1), rx.recv())
|
||||
@@ -929,7 +1165,7 @@ mod tests {
|
||||
panic!("expected response message");
|
||||
};
|
||||
assert_eq!(response.id, request_id.request_id);
|
||||
assert_eq!(response.result, json!({ "ok": true }));
|
||||
assert_eq!(response.result, json!({}));
|
||||
}
|
||||
other => panic!("expected targeted response envelope, got: {other:?}"),
|
||||
}
|
||||
@@ -938,7 +1174,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_clears_registered_request_context() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -954,16 +1191,103 @@ mod tests {
|
||||
assert_eq!(outgoing.request_context_count().await, 1);
|
||||
|
||||
outgoing
|
||||
.send_response(request_id, json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(outgoing.request_context_count().await, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_response_tracks_accepted_turn_steer_analytics() {
|
||||
let analytics_server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/codex/analytics-events/events"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(&analytics_server)
|
||||
.await;
|
||||
|
||||
let analytics_events_client = codex_analytics::AnalyticsEventsClient::new(
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
|
||||
analytics_server.uri(),
|
||||
/*analytics_enabled*/ Some(true),
|
||||
AuthManagerRetention::Strong,
|
||||
);
|
||||
analytics_events_client.track_initialize(
|
||||
/*connection_id*/ 7,
|
||||
InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: Some(InitializeCapabilities {
|
||||
experimental_api: false,
|
||||
opt_out_notification_methods: None,
|
||||
}),
|
||||
},
|
||||
"codex-tui".to_string(),
|
||||
AppServerRpcTransport::Stdio,
|
||||
);
|
||||
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx, analytics_events_client.clone());
|
||||
outgoing
|
||||
.send_response(
|
||||
ConnectionRequestId {
|
||||
connection_id: ConnectionId(7),
|
||||
request_id: RequestId::Integer(1),
|
||||
},
|
||||
sample_thread_start_response("thread-1"),
|
||||
)
|
||||
.await;
|
||||
|
||||
analytics_events_client.track_request(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(2),
|
||||
&sample_turn_start_request("thread-1"),
|
||||
);
|
||||
outgoing
|
||||
.send_response(
|
||||
ConnectionRequestId {
|
||||
connection_id: ConnectionId(7),
|
||||
request_id: RequestId::Integer(2),
|
||||
},
|
||||
sample_turn_start_response("turn-1"),
|
||||
)
|
||||
.await;
|
||||
|
||||
analytics_events_client.track_request(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(3),
|
||||
&sample_turn_steer_request("thread-1", "turn-1"),
|
||||
);
|
||||
outgoing
|
||||
.send_response(
|
||||
ConnectionRequestId {
|
||||
connection_id: ConnectionId(7),
|
||||
request_id: RequestId::Integer(3),
|
||||
},
|
||||
sample_turn_steer_response("turn-1"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let event = wait_for_analytics_event(&analytics_server, "codex_turn_steer_event").await;
|
||||
assert_eq!(event["event_params"]["thread_id"], "thread-1");
|
||||
assert_eq!(event["event_params"]["expected_turn_id"], "turn-1");
|
||||
assert_eq!(event["event_params"]["accepted_turn_id"], "turn-1");
|
||||
assert_eq!(event["event_params"]["result"], "accepted");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_error_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1001,7 +1325,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let send_task = tokio::spawn(async move {
|
||||
outgoing
|
||||
.send_server_notification_to_connection_and_wait(
|
||||
@@ -1045,7 +1370,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn connection_closed_clears_registered_request_contexts() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
let closed_connection_request = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1079,7 +1405,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing =
|
||||
OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled());
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request(ServerRequestPayload::ApplyPatchApproval(
|
||||
@@ -1113,7 +1440,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
@@ -1171,7 +1501,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
|
||||
@@ -722,6 +722,7 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)));
|
||||
|
||||
manager
|
||||
@@ -764,6 +765,7 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
)));
|
||||
|
||||
manager
|
||||
|
||||
@@ -46,6 +46,7 @@ use async_channel::Sender;
|
||||
use chrono::Local;
|
||||
use chrono::Utc;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AuthManagerRetention;
|
||||
use codex_analytics::SubAgentThreadStartedInput;
|
||||
use codex_app_server_protocol::McpServerElicitationRequest;
|
||||
use codex_app_server_protocol::McpServerElicitationRequestParams;
|
||||
|
||||
@@ -775,6 +775,7 @@ impl Session {
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
AuthManagerRetention::Strong,
|
||||
)
|
||||
});
|
||||
let services = SessionServices {
|
||||
|
||||
@@ -3418,6 +3418,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
AuthManagerRetention::Strong,
|
||||
),
|
||||
hooks: Hooks::new(HooksConfig {
|
||||
legacy_notify_argv: config.notify.clone(),
|
||||
@@ -4812,6 +4813,7 @@ where
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
AuthManagerRetention::Strong,
|
||||
),
|
||||
hooks: Hooks::new(HooksConfig {
|
||||
legacy_notify_argv: config.notify.clone(),
|
||||
|
||||
Reference in New Issue
Block a user