Compare commits

...

1 Commits

Author SHA1 Message Date
Roy Han
746e6df696 type client responses 2026-04-22 13:54:17 -07:00
17 changed files with 941 additions and 235 deletions

View File

@@ -70,6 +70,8 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -943,6 +945,64 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
);
}
#[tokio::test]
async fn unrelated_client_requests_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
reducer
.ingest(
AnalyticsFact::Request {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(ClientRequest::ThreadArchive {
request_id: RequestId::Integer(3),
params: ThreadArchiveParams {
thread_id: "thread-2".to_string(),
},
}),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
&mut events,
)
.await;
assert!(
events.is_empty(),
"unrelated requests must not create pending turn state"
);
}
#[tokio::test]
async fn unrelated_client_responses_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_initialize(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(ClientResponse::ThreadArchive {
request_id: RequestId::Integer(9),
response: ThreadArchiveResponse {},
}),
},
&mut events,
)
.await;
assert!(events.is_empty());
}
#[tokio::test]
async fn compaction_event_ingests_custom_fact() {
let mut reducer = AnalyticsReducer::default();

View File

@@ -22,6 +22,7 @@ use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
@@ -123,6 +124,18 @@ impl AnalyticsEventsClient {
}
}
pub fn disabled() -> Self {
let (sender, _receiver) = mpsc::channel(1);
Self {
queue: AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
},
analytics_enabled: Some(false),
}
}
pub fn track_skill_invocations(
&self,
tracking: TrackEventsContext,
@@ -276,6 +289,19 @@ impl AnalyticsEventsClient {
});
}
pub fn track_response_payload(
&self,
connection_id: u64,
request_id: RequestId,
response: Box<ClientResponsePayload>,
) {
self.record_fact(AnalyticsFact::ResponsePayload {
connection_id,
request_id,
response,
});
}
pub fn track_error_response(
&self,
connection_id: u64,

View File

@@ -3,6 +3,7 @@ use crate::events::CodexRuntimeMetadata;
use crate::events::GuardianReviewEventParams;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
@@ -280,6 +281,11 @@ pub(crate) enum AnalyticsFact {
connection_id: u64,
response: Box<ClientResponse>,
},
ResponsePayload {
connection_id: u64,
request_id: RequestId,
response: Box<ClientResponsePayload>,
},
ErrorResponse {
connection_id: u64,
request_id: RequestId,

View File

@@ -184,6 +184,15 @@ impl AnalyticsReducer {
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::ResponsePayload {
connection_id,
request_id,
response,
} => {
if let Some(response) = response.into_client_response(request_id) {
self.ingest_response(connection_id, response, out);
}
}
AnalyticsFact::ErrorResponse {
connection_id,
request_id,

View File

@@ -153,6 +153,108 @@ macro_rules! client_request_definitions {
})
.unwrap_or_else(|| "<unknown>".to_string())
}
pub fn into_jsonrpc_parts(
self,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant { request_id, response } => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
}
}
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ClientResponsePayload {
$( $variant($response), )*
InterruptConversation(v1::InterruptConversationResponse),
}
impl ClientResponsePayload {
pub fn into_jsonrpc_parts_and_payload(
self,
request_id: RequestId,
) -> std::result::Result<
(RequestId, crate::Result, Option<Box<ClientResponsePayload>>),
serde_json::Error,
> {
match self {
$(
Self::$variant(response) => {
let result = serde_json::to_value(&response)?;
Ok((request_id, result, Some(Box::new(Self::$variant(response)))))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result, None))
}
}
}
pub fn into_client_response(self, request_id: RequestId) -> Option<ClientResponse> {
match self {
$(
Self::$variant(response) => {
Some(ClientResponse::$variant {
request_id,
response,
})
}
)*
Self::InterruptConversation(_) => None,
}
}
pub fn into_jsonrpc_parts_and_client_response(
self,
request_id: RequestId,
) -> std::result::Result<
(RequestId, crate::Result, Option<ClientResponse>),
serde_json::Error,
> {
match self {
$(
Self::$variant(response) => {
let result = serde_json::to_value(&response)?;
let client_response = ClientResponse::$variant {
request_id: request_id.clone(),
response,
};
Ok((request_id, result, Some(client_response)))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result, None))
}
}
}
pub fn into_jsonrpc_parts(
self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
self.to_jsonrpc_parts(request_id)
}
pub fn to_jsonrpc_parts(
&self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
}
}
}
impl crate::experimental_api::ExperimentalApi for ClientRequest {
@@ -1098,6 +1200,7 @@ mod tests {
use codex_protocol::protocol::RealtimeConversationVersion;
use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RealtimeVoice;
use codex_protocol::protocol::TurnAbortReason;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
@@ -1510,6 +1613,45 @@ mod tests {
Ok(())
}
#[test]
fn client_response_payload_returns_jsonrpc_parts_and_client_response() -> Result<()> {
let (request_id, result, client_response) =
ClientResponsePayload::ThreadArchive(v2::ThreadArchiveResponse {})
.into_jsonrpc_parts_and_client_response(RequestId::Integer(7))?;
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(result, json!({}));
let Some(ClientResponse::ThreadArchive {
request_id,
response: _,
}) = client_response
else {
panic!("expected thread/archive client response");
};
assert_eq!(request_id, RequestId::Integer(7));
Ok(())
}
#[test]
fn interrupt_conversation_payload_stays_jsonrpc_only() -> Result<()> {
let (request_id, result, client_response) =
ClientResponsePayload::InterruptConversation(v1::InterruptConversationResponse {
abort_reason: TurnAbortReason::Interrupted,
})
.into_jsonrpc_parts_and_client_response(RequestId::Integer(8))?;
assert_eq!(request_id, RequestId::Integer(8));
assert_eq!(
result,
json!({
"abortReason": "interrupted",
})
);
assert!(client_response.is_none());
Ok(())
}
#[test]
fn serialize_config_requirements_read() -> Result<()> {
let request = ClientRequest::ConfigRequirementsRead {

View File

@@ -0,0 +1,16 @@
use std::sync::Arc;
use codex_analytics::AnalyticsEventsClient;
use codex_core::config::Config;
use codex_login::AuthManager;
pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
)
}

View File

@@ -18,6 +18,7 @@ use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermis
use codex_app_server_protocol::AgentMessageDeltaNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApplyPatchApprovalResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
use codex_app_server_protocol::CollabAgentState as V2CollabAgentStatus;
use codex_app_server_protocol::CollabAgentTool;
@@ -1826,11 +1827,18 @@ pub(crate) async fn apply_bespoke_event_handling(
let response = InterruptConversationResponse {
abort_reason: turn_aborted_event.reason.clone(),
};
outgoing.send_response(rid, response).await;
outgoing
.send_response(
rid,
ClientResponsePayload::InterruptConversation(response),
)
.await;
}
ApiVersion::V2 => {
let response = TurnInterruptResponse {};
outgoing.send_response(rid, response).await;
outgoing
.send_response(rid, ClientResponsePayload::TurnInterrupt(response))
.await;
}
}
}
@@ -1920,7 +1928,9 @@ pub(crate) async fn apply_bespoke_event_handling(
}
};
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::ThreadRollback(response))
.await;
}
}
EventMsg::ThreadNameUpdated(thread_name_event) => {
@@ -3349,7 +3359,11 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3418,7 +3432,11 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3508,7 +3526,11 @@ mod tests {
let thread_state = new_thread_state();
let thread_watch_manager = ThreadWatchManager::new();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4079,7 +4101,11 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4143,7 +4169,11 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4191,7 +4221,11 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4233,7 +4267,11 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4287,7 +4325,11 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-123".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4376,7 +4418,11 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-456".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4449,7 +4495,11 @@ mod tests {
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4711,7 +4761,11 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4749,7 +4803,11 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4775,7 +4833,11 @@ mod tests {
#[tokio::test]
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let conversation_id = ThreadId::new();
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,

View File

@@ -42,7 +42,7 @@ use codex_app_server_protocol::CancelLoginAccountParams;
use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
@@ -1278,7 +1278,9 @@ impl CodexMessageProcessor {
match self.login_api_key_common(&params).await {
Ok(()) => {
let response = codex_app_server_protocol::LoginAccountResponse::ApiKey {};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
.await;
let payload_login_completed = AccountLoginCompletedNotification {
login_id: None,
@@ -1447,7 +1449,9 @@ impl CodexMessageProcessor {
login_id: login_id.to_string(),
auth_url,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -1490,7 +1494,9 @@ impl CodexMessageProcessor {
verification_url,
user_code,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
.await;
let outgoing_clone = self.outgoing.clone();
let active_login = self.active_login.clone();
@@ -1588,7 +1594,12 @@ impl CodexMessageProcessor {
Err(CancelLoginError::NotFound) => CancelLoginAccountStatus::NotFound,
};
let response = CancelLoginAccountResponse { status };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::CancelLoginAccount(response),
)
.await;
}
Err(_) => {
let error = JSONRPCErrorError {
@@ -1668,7 +1679,10 @@ impl CodexMessageProcessor {
.await;
self.outgoing
.send_response(request_id, LoginAccountResponse::ChatgptAuthTokens {})
.send_response(
request_id,
ClientResponsePayload::LoginAccount(LoginAccountResponse::ChatgptAuthTokens {}),
)
.await;
let payload_login_completed = AccountLoginCompletedNotification {
@@ -1721,7 +1735,10 @@ impl CodexMessageProcessor {
match self.logout_common().await {
Ok(current_auth_method) => {
self.outgoing
.send_response(request_id, LogoutAccountResponse {})
.send_response(
request_id,
ClientResponsePayload::LogoutAccount(LogoutAccountResponse {}),
)
.await;
let payload_v2 = AccountUpdatedNotification {
@@ -1811,7 +1828,9 @@ impl CodexMessageProcessor {
}
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::GetAuthStatus(response))
.await;
}
async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) {
@@ -1827,7 +1846,9 @@ impl CodexMessageProcessor {
account: None,
requires_openai_auth,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::GetAccount(response))
.await;
return;
}
@@ -1863,7 +1884,9 @@ impl CodexMessageProcessor {
account,
requires_openai_auth,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::GetAccount(response))
.await;
}
async fn get_account_rate_limits(&self, request_id: ConnectionRequestId) {
@@ -1878,7 +1901,12 @@ impl CodexMessageProcessor {
.collect(),
),
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::GetAccountRateLimits(response),
)
.await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -1894,7 +1922,12 @@ impl CodexMessageProcessor {
match self.send_add_credits_nudge_email_inner(params).await {
Ok(status) => {
self.outgoing
.send_response(request_id, SendAddCreditsNudgeEmailResponse { status })
.send_response(
request_id,
ClientResponsePayload::SendAddCreditsNudgeEmail(
SendAddCreditsNudgeEmailResponse { status },
),
)
.await;
}
Err(error) => {
@@ -2283,7 +2316,14 @@ impl CodexMessageProcessor {
.write(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::CommandExecWrite(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -2298,7 +2338,14 @@ impl CodexMessageProcessor {
.resize(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::CommandExecResize(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -2313,7 +2360,14 @@ impl CodexMessageProcessor {
.terminate(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::CommandExecTerminate(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -2699,21 +2753,9 @@ impl CodexMessageProcessor {
sandbox: config_snapshot.sandbox_policy.into(),
reasoning_effort: config_snapshot.reasoning_effort,
};
if listener_task_context.general_analytics_enabled {
listener_task_context
.analytics_events_client
.track_response(
request_id.connection_id.0,
ClientResponse::ThreadStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
listener_task_context
.outgoing
.send_response(request_id, response)
.send_response(request_id, ClientResponsePayload::ThreadStart(response))
.instrument(tracing::info_span!(
"app_server.thread_start.send_response",
otel.name = "app_server.thread_start.send_response",
@@ -2868,7 +2910,10 @@ impl CodexMessageProcessor {
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
else {
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
)
.await;
return;
};
@@ -2913,7 +2958,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
)
.await;
for thread_id in archived_thread_ids {
let notification = ThreadArchivedNotification { thread_id };
@@ -2941,10 +2989,12 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ThreadIncrementElicitationResponse {
count,
paused: count > 0,
},
ClientResponsePayload::ThreadIncrementElicitation(
ThreadIncrementElicitationResponse {
count,
paused: count > 0,
},
),
)
.await;
}
@@ -2976,10 +3026,12 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ThreadDecrementElicitationResponse {
count,
paused: count > 0,
},
ClientResponsePayload::ThreadDecrementElicitation(
ThreadDecrementElicitationResponse {
count,
paused: count > 0,
},
),
)
.await;
}
@@ -3026,7 +3078,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadSetNameResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadSetName(ThreadSetNameResponse {}),
)
.await;
return;
}
@@ -3050,7 +3105,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadSetNameResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadSetName(ThreadSetNameResponse {}),
)
.await;
let notification = ThreadNameUpdatedNotification {
thread_id: thread_id.to_string(),
@@ -3096,7 +3154,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadMemoryModeSetResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadMemoryModeSet(ThreadMemoryModeSetResponse {}),
)
.await;
return;
}
@@ -3123,7 +3184,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, ThreadMemoryModeSetResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadMemoryModeSet(ThreadMemoryModeSetResponse {}),
)
.await;
}
@@ -3167,7 +3231,10 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, MemoryResetResponse {})
.send_response(
request_id,
ClientResponsePayload::MemoryReset(MemoryResetResponse {}),
)
.await;
}
@@ -3333,7 +3400,12 @@ impl CodexMessageProcessor {
);
self.outgoing
.send_response(request_id, ThreadMetadataUpdateResponse { thread })
.send_response(
request_id,
ClientResponsePayload::ThreadMetadataUpdate(ThreadMetadataUpdateResponse {
thread,
}),
)
.await;
}
@@ -3514,7 +3586,9 @@ impl CodexMessageProcessor {
self.attach_thread_name(thread_id, &mut thread).await;
let thread_id = thread.id.clone();
let response = ThreadUnarchiveResponse { thread };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadUnarchive(response))
.await;
let notification = ThreadUnarchivedNotification { thread_id };
self.outgoing
.send_server_notification(ServerNotification::ThreadUnarchived(notification))
@@ -3606,7 +3680,10 @@ impl CodexMessageProcessor {
{
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadCompactStartResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadCompactStart(ThreadCompactStartResponse {}),
)
.await;
}
Err(err) => {
@@ -3637,7 +3714,12 @@ impl CodexMessageProcessor {
{
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadBackgroundTerminalsCleanResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadBackgroundTerminalsClean(
ThreadBackgroundTerminalsCleanResponse {},
),
)
.await;
}
Err(err) => {
@@ -3689,7 +3771,10 @@ impl CodexMessageProcessor {
{
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadShellCommandResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadShellCommand(ThreadShellCommandResponse {}),
)
.await;
}
Err(err) => {
@@ -3793,7 +3878,9 @@ impl CodexMessageProcessor {
next_cursor,
backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadList(response))
.await;
}
async fn thread_loaded_list(
@@ -3815,7 +3902,12 @@ impl CodexMessageProcessor {
data,
next_cursor: None,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::ThreadLoadedList(response),
)
.await;
return;
}
@@ -3852,7 +3944,12 @@ impl CodexMessageProcessor {
data: page,
next_cursor,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::ThreadLoadedList(response),
)
.await;
}
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
@@ -3882,7 +3979,9 @@ impl CodexMessageProcessor {
}
};
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadRead(response))
.await;
}
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
@@ -4162,7 +4261,9 @@ impl CodexMessageProcessor {
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadTurnsList(response))
.await;
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
@@ -4432,23 +4533,15 @@ impl CodexMessageProcessor {
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadResume {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
let connection_id = request_id.connection_id;
let token_usage_thread = response.thread.clone();
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
&response_history.get_rollout_items(),
&token_usage_thread,
);
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadResume(response))
.await;
// The client needs restored usage before it starts another turn.
// Sending after the response preserves JSON-RPC request ordering while
// still filling the status line before the next turn lifecycle begins.
@@ -5077,16 +5170,6 @@ impl CodexMessageProcessor {
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadFork {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
let connection_id = request_id.connection_id;
let token_usage_thread = response.thread.clone();
let token_usage_turn_id = if let Some(turn_id) =
@@ -5100,7 +5183,9 @@ impl CodexMessageProcessor {
)
.await
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::ThreadFork(response))
.await;
// Mirror the resume contract for forks: the new thread is usable as soon
// as the response arrives, so restored usage must follow immediately.
send_thread_token_usage_update_to_connection(
@@ -5177,7 +5262,12 @@ impl CodexMessageProcessor {
return;
};
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::GetConversationSummary(response),
)
.await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -5311,7 +5401,9 @@ impl CodexMessageProcessor {
data: Vec::new(),
next_cursor: None,
};
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::ModelList(response))
.await;
return;
}
@@ -5354,7 +5446,9 @@ impl CodexMessageProcessor {
data: items,
next_cursor,
};
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::ModelList(response))
.await;
}
async fn list_collaboration_modes(
@@ -5370,7 +5464,12 @@ impl CodexMessageProcessor {
.map(Into::into)
.collect();
let response = CollaborationModeListResponse { data: items };
outgoing.send_response(request_id, response).await;
outgoing
.send_response(
request_id,
ClientResponsePayload::CollaborationModeList(response),
)
.await;
}
async fn experimental_feature_list(
@@ -5431,10 +5530,12 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ExperimentalFeatureListResponse {
data: Vec::new(),
next_cursor: None,
},
ClientResponsePayload::ExperimentalFeatureList(
ExperimentalFeatureListResponse {
data: Vec::new(),
next_cursor: None,
},
),
)
.await;
return;
@@ -5478,7 +5579,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ExperimentalFeatureListResponse { data, next_cursor },
ClientResponsePayload::ExperimentalFeatureList(ExperimentalFeatureListResponse {
data,
next_cursor,
}),
)
.await;
}
@@ -5490,7 +5594,12 @@ impl CodexMessageProcessor {
) {
let MockExperimentalMethodParams { value } = params;
let response = MockExperimentalMethodResponse { echoed: value };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::MockExperimentalMethod(response),
)
.await;
}
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
@@ -5508,7 +5617,12 @@ impl CodexMessageProcessor {
}
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::McpServerRefresh(response),
)
.await;
}
async fn queue_mcp_server_refresh_for_config(
@@ -5654,7 +5768,12 @@ impl CodexMessageProcessor {
});
let response = McpServerOauthLoginResponse { authorization_url };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(
request_id,
ClientResponsePayload::McpServerOauthLogin(response),
)
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -5813,7 +5932,12 @@ impl CodexMessageProcessor {
let response = ListMcpServerStatusResponse { data, next_cursor };
outgoing.send_response(request_id, response).await;
outgoing
.send_response(
request_id,
ClientResponsePayload::McpServerStatusList(response),
)
.await;
}
async fn read_mcp_resource(
@@ -5890,7 +6014,9 @@ impl CodexMessageProcessor {
match result {
Ok(result) => match serde_json::from_value::<McpResourceReadResponse>(result) {
Ok(response) => {
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::McpResourceRead(response))
.await;
}
Err(error) => {
outgoing
@@ -5945,7 +6071,12 @@ impl CodexMessageProcessor {
match result {
Ok(result) => {
outgoing
.send_response(request_id, McpServerToolCallResponse::from(result))
.send_response(
request_id,
ClientResponsePayload::McpServerToolCall(
McpServerToolCallResponse::from(result),
),
)
.await;
}
Err(error) => {
@@ -6125,9 +6256,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ThreadUnsubscribeResponse {
ClientResponsePayload::ThreadUnsubscribe(ThreadUnsubscribeResponse {
status: ThreadUnsubscribeStatus::NotLoaded,
},
}),
)
.await;
return;
@@ -6144,7 +6275,10 @@ impl CodexMessageProcessor {
ThreadUnsubscribeStatus::NotSubscribed
};
self.outgoing
.send_response(request_id, ThreadUnsubscribeResponse { status })
.send_response(
request_id,
ClientResponsePayload::ThreadUnsubscribe(ThreadUnsubscribeResponse { status }),
)
.await;
}
@@ -6199,10 +6333,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
AppsListResponse {
ClientResponsePayload::AppsList(AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
}),
)
.await;
return;
@@ -6389,7 +6523,9 @@ impl CodexMessageProcessor {
if accessible_loaded && all_loaded {
match apps_list_helpers::paginate_apps(merged.as_slice(), start, limit) {
Ok(response) => {
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::AppsList(response))
.await;
return;
}
Err(error) => {
@@ -6527,7 +6663,10 @@ impl CodexMessageProcessor {
});
}
self.outgoing
.send_response(request_id, SkillsListResponse { data })
.send_response(
request_id,
ClientResponsePayload::SkillsList(SkillsListResponse { data }),
)
.await;
}
async fn marketplace_remove(
@@ -6548,10 +6687,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
MarketplaceRemoveResponse {
ClientResponsePayload::MarketplaceRemove(MarketplaceRemoveResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.removed_installed_root,
},
}),
)
.await;
}
@@ -6579,11 +6718,11 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
MarketplaceAddResponse {
ClientResponsePayload::MarketplaceAdd(MarketplaceAddResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.installed_root,
already_added: outcome.already_added,
},
}),
)
.await;
}
@@ -6637,9 +6776,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
SkillsConfigWriteResponse {
ClientResponsePayload::SkillsConfigWrite(SkillsConfigWriteResponse {
effective_enabled: enabled,
},
}),
)
.await;
}
@@ -6779,16 +6918,9 @@ impl CodexMessageProcessor {
};
let response = TurnStartResponse { turn };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::TurnStart(response))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -6835,7 +6967,10 @@ impl CodexMessageProcessor {
match thread.inject_response_items(items).await {
Ok(()) => {
self.outgoing
.send_response(request_id, ThreadInjectItemsResponse {})
.send_response(
request_id,
ClientResponsePayload::ThreadInjectItems(ThreadInjectItemsResponse {}),
)
.await;
}
Err(CodexErr::InvalidRequest(message)) => {
@@ -6913,16 +7048,9 @@ impl CodexMessageProcessor {
{
Ok(turn_id) => {
let response = TurnSteerResponse { turn_id };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnSteer {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::TurnSteer(response))
.await;
}
Err(err) => {
let (code, message, data, error_type) = match err {
@@ -7075,7 +7203,12 @@ impl CodexMessageProcessor {
match submit {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadRealtimeStartResponse::default())
.send_response(
request_id,
ClientResponsePayload::ThreadRealtimeStart(
ThreadRealtimeStartResponse::default(),
),
)
.await;
}
Err(err) => {
@@ -7113,7 +7246,12 @@ impl CodexMessageProcessor {
match submit {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadRealtimeAppendAudioResponse::default())
.send_response(
request_id,
ClientResponsePayload::ThreadRealtimeAppendAudio(
ThreadRealtimeAppendAudioResponse::default(),
),
)
.await;
}
Err(err) => {
@@ -7149,7 +7287,12 @@ impl CodexMessageProcessor {
match submit {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadRealtimeAppendTextResponse::default())
.send_response(
request_id,
ClientResponsePayload::ThreadRealtimeAppendText(
ThreadRealtimeAppendTextResponse::default(),
),
)
.await;
}
Err(err) => {
@@ -7181,7 +7324,12 @@ impl CodexMessageProcessor {
match submit {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadRealtimeStopResponse::default())
.send_response(
request_id,
ClientResponsePayload::ThreadRealtimeStop(
ThreadRealtimeStopResponse::default(),
),
)
.await;
}
Err(err) => {
@@ -7202,9 +7350,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
ThreadRealtimeListVoicesResponse {
ClientResponsePayload::ThreadRealtimeListVoices(ThreadRealtimeListVoicesResponse {
voices: RealtimeVoicesList::builtin(),
},
}),
)
.await;
}
@@ -7245,7 +7393,10 @@ impl CodexMessageProcessor {
review_thread_id,
};
self.outgoing
.send_response(request_id.clone(), response)
.send_response(
request_id.clone(),
ClientResponsePayload::ReviewStart(response),
)
.await;
}
@@ -7489,7 +7640,10 @@ impl CodexMessageProcessor {
match submit_result {
Ok(_) if is_startup_interrupt => {
self.outgoing
.send_response(request_id, TurnInterruptResponse {})
.send_response(
request_id,
ClientResponsePayload::TurnInterrupt(TurnInterruptResponse {}),
)
.await;
}
Ok(_) => {}
@@ -7835,7 +7989,9 @@ impl CodexMessageProcessor {
sha: value.sha,
diff: value.diff,
};
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::GitDiffToRemote(response))
.await;
}
None => {
let error = JSONRPCErrorError {
@@ -7889,7 +8045,9 @@ impl CodexMessageProcessor {
}
let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::FuzzyFileSearch(response))
.await;
}
async fn fuzzy_file_search_session_start(
@@ -7917,7 +8075,12 @@ impl CodexMessageProcessor {
.await
.insert(session_id, session);
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
.send_response(
request_id,
ClientResponsePayload::FuzzyFileSearchSessionStart(
FuzzyFileSearchSessionStartResponse {},
),
)
.await;
}
Err(err) => {
@@ -7957,7 +8120,12 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
.send_response(
request_id,
ClientResponsePayload::FuzzyFileSearchSessionUpdate(
FuzzyFileSearchSessionUpdateResponse {},
),
)
.await;
}
@@ -7973,7 +8141,12 @@ impl CodexMessageProcessor {
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
.send_response(
request_id,
ClientResponsePayload::FuzzyFileSearchSessionStop(
FuzzyFileSearchSessionStopResponse {},
),
)
.await;
}
@@ -8150,7 +8323,9 @@ impl CodexMessageProcessor {
match upload_result {
Ok(()) => {
let response = FeedbackUploadResponse { thread_id };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id, ClientResponsePayload::FeedbackUpload(response))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -8171,7 +8346,9 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id.clone(),
WindowsSandboxSetupStartResponse { started: true },
ClientResponsePayload::WindowsSandboxSetupStart(WindowsSandboxSetupStartResponse {
started: true,
}),
)
.await;
@@ -8475,7 +8652,9 @@ async fn handle_pending_thread_resume_request(
&token_usage_thread,
)
.await;
outgoing.send_response(request_id, response).await;
outgoing
.send_response(request_id, ClientResponsePayload::ThreadResume(response))
.await;
// Rejoining a loaded thread has the same UI contract as a cold resume, but
// uses the live conversation state instead of reconstructing a new session.
send_thread_token_usage_update_to_connection(
@@ -10619,7 +10798,11 @@ mod tests {
let connection_id = ConnectionId(7);
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![connection_id],

View File

@@ -1,4 +1,5 @@
use super::*;
use codex_app_server_protocol::ClientResponsePayload;
use codex_plugin::validate_plugin_segment;
impl CodexMessageProcessor {
@@ -22,11 +23,11 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
PluginListResponse {
ClientResponsePayload::PluginList(PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
},
}),
)
.await;
return;
@@ -161,11 +162,11 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
PluginListResponse {
ClientResponsePayload::PluginList(PluginListResponse {
marketplaces: data,
marketplace_load_errors,
featured_plugin_ids,
},
}),
)
.await;
}
@@ -338,7 +339,10 @@ impl CodexMessageProcessor {
};
self.outgoing
.send_response(request_id, PluginReadResponse { plugin })
.send_response(
request_id,
ClientResponsePayload::PluginRead(PluginReadResponse { plugin }),
)
.await;
}
@@ -487,10 +491,10 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
PluginInstallResponse {
ClientResponsePayload::PluginInstall(PluginInstallResponse {
auth_policy: result.auth_policy.into(),
apps_needing_auth,
},
}),
)
.await;
}
@@ -553,7 +557,10 @@ impl CodexMessageProcessor {
Ok(()) => {
self.clear_plugin_related_caches();
self.outgoing
.send_response(request_id, PluginUninstallResponse {})
.send_response(
request_id,
ClientResponsePayload::PluginUninstall(PluginUninstallResponse {}),
)
.await;
}
Err(err) => {

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CommandExecOutputDeltaNotification;
use codex_app_server_protocol::CommandExecOutputStream;
use codex_app_server_protocol::CommandExecResizeParams;
@@ -209,11 +210,11 @@ impl CommandExecManager {
outgoing
.send_response(
request_id,
CommandExecResponse {
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
exit_code: output.exit_code,
stdout: output.stdout.text,
stderr: output.stderr.text,
},
}),
)
.await;
}
@@ -554,11 +555,11 @@ async fn run_command(params: RunCommandParams) {
outgoing
.send_response(
request_id,
CommandExecResponse {
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
exit_code,
stdout,
stderr,
},
}),
)
.await;
}
@@ -757,7 +758,11 @@ mod tests {
let manager = CommandExecManager::default();
let err = manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)),
request_id: ConnectionRequestId {
connection_id: ConnectionId(1),
request_id: codex_app_server_protocol::RequestId::Integer(42),
@@ -793,7 +798,11 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)),
request_id: request_id.clone(),
process_id: Some("proc-99".to_string()),
exec_request: windows_sandbox_exec_request(),
@@ -843,7 +852,11 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)),
request_id: request_id.clone(),
process_id: Some("proc-100".to_string()),
exec_request: ExecRequest::new(

View File

@@ -234,7 +234,11 @@ mod tests {
const OUTGOING_BUFFER: usize = 1;
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
FsWatchManager::new_with_file_watcher(
Arc::new(OutgoingMessageSender::new(tx)),
Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)),
Arc::new(FileWatcher::noop()),
)
}

View File

@@ -82,6 +82,7 @@ use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_exec_server::EnvironmentManager;
use codex_features::Feature;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::protocol::SessionSource;
@@ -365,7 +366,18 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
let analytics_events_client = crate::analytics::analytics_events_client_from_config(
auth_manager.clone(),
args.config.as_ref(),
);
let general_analytics_enabled = args.config.features.enabled(Feature::GeneralAnalytics);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
general_analytics_enabled,
));
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -390,8 +402,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
});
let processor_outgoing = Arc::clone(&outgoing_message_sender);
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
let config_manager = ConfigManager::new(
args.config.codex_home.to_path_buf(),
args.cli_overrides,
@@ -404,6 +414,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let mut processor_handle = tokio::spawn(async move {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
arg0_paths: args.arg0_paths,
config: args.config,
config_manager,

View File

@@ -65,6 +65,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::Registry;
use tracing_subscriber::util::SubscriberInitExt;
mod analytics;
mod app_server_tracing;
mod bespoke_event_handling;
mod codex_message_processor;
@@ -645,12 +646,20 @@ pub async fn run_main_with_transport(
});
let processor_handle = tokio::spawn({
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outbound_control_tx = outbound_control_tx;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
let analytics_events_client =
crate::analytics::analytics_events_client_from_config(auth_manager.clone(), &config);
let general_analytics_enabled = config.features.enabled(Feature::GeneralAnalytics);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
general_analytics_enabled,
));
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
analytics_events_client,
arg0_paths,
config: Arc::new(config),
config_manager,

View File

@@ -33,6 +33,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
use codex_app_server_protocol::ConfigValueWriteParams;
@@ -249,6 +250,7 @@ impl ConnectionSessionState {
pub(crate) struct MessageProcessorArgs {
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) arg0_paths: Arg0DispatchPaths,
pub(crate) config: Arc<Config>,
pub(crate) config_manager: ConfigManager,
@@ -268,6 +270,7 @@ impl MessageProcessor {
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
let MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths,
config,
config_manager,
@@ -283,11 +286,6 @@ impl MessageProcessor {
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
@@ -713,7 +711,10 @@ impl MessageProcessor {
};
self.outgoing
.send_response(connection_request_id, response)
.send_response(
connection_request_id,
ClientResponsePayload::Initialize(response),
)
.await;
if let Some(outbound_initialized) = outbound_initialized {
@@ -765,14 +766,10 @@ impl MessageProcessor {
self.outgoing.send_error(connection_request_id, error).await;
return;
}
let connection_id = connection_request_id.connection_id;
if self.config.features.enabled(Feature::GeneralAnalytics)
&& let ClientRequest::TurnStart { request_id, .. }
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
{
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_request(
connection_id.0,
request_id.clone(),
connection_request_id.connection_id.0,
connection_request_id.request_id.clone(),
codex_request.clone(),
);
}
@@ -1019,7 +1016,11 @@ impl MessageProcessor {
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::ConfigRead(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1029,7 +1030,11 @@ impl MessageProcessor {
request_id: ConnectionRequestId,
params: ConfigValueWriteParams,
) {
let result = self.config_api.write_value(params).await;
let result = self
.config_api
.write_value(params)
.await
.map(ClientResponsePayload::ConfigValueWrite);
self.handle_config_mutation_result(request_id, result).await
}
@@ -1038,7 +1043,11 @@ impl MessageProcessor {
request_id: ConnectionRequestId,
params: ConfigBatchWriteParams,
) {
let result = self.config_api.batch_write(params).await;
let result = self
.config_api
.batch_write(params)
.await
.map(ClientResponsePayload::ConfigBatchWrite);
self.handle_config_mutation_result(request_id, result).await;
}
@@ -1051,7 +1060,8 @@ impl MessageProcessor {
let result = self
.config_api
.set_experimental_feature_enablement(params)
.await;
.await
.map(ClientResponsePayload::ExperimentalFeatureEnablementSet);
let is_ok = result.is_ok();
self.handle_config_mutation_result(request_id, result).await;
if should_refresh_apps_list && is_ok {
@@ -1129,10 +1139,10 @@ impl MessageProcessor {
});
}
async fn handle_config_mutation_result<T: serde::Serialize>(
async fn handle_config_mutation_result(
&self,
request_id: ConnectionRequestId,
result: std::result::Result<T, JSONRPCErrorError>,
result: std::result::Result<ClientResponsePayload, JSONRPCErrorError>,
) {
match result {
Ok(response) => {
@@ -1168,7 +1178,14 @@ impl MessageProcessor {
async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) {
match self.config_api.config_requirements_read().await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::ConfigRequirementsRead(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1191,7 +1208,11 @@ impl MessageProcessor {
}
match self.device_key_api.create(params) {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::DeviceKeyCreate(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1214,7 +1235,11 @@ impl MessageProcessor {
}
match self.device_key_api.public(params) {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::DeviceKeyPublic(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1237,7 +1262,11 @@ impl MessageProcessor {
}
match self.device_key_api.sign(params) {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::DeviceKeySign(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1271,7 +1300,14 @@ impl MessageProcessor {
params: ExternalAgentConfigDetectParams,
) {
match self.external_agent_config_api.detect(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::ExternalAgentConfigDetect(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1293,7 +1329,12 @@ impl MessageProcessor {
self.handle_config_mutation().await;
}
self.outgoing
.send_response(request_id, ExternalAgentConfigImportResponse {})
.send_response(
request_id,
ClientResponsePayload::ExternalAgentConfigImport(
ExternalAgentConfigImportResponse {},
),
)
.await;
if !has_plugin_imports {
@@ -1346,7 +1387,11 @@ impl MessageProcessor {
async fn handle_fs_read_file(&self, request_id: ConnectionRequestId, params: FsReadFileParams) {
match self.fs_api.read_file(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsReadFile(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1357,7 +1402,11 @@ impl MessageProcessor {
params: FsWriteFileParams,
) {
match self.fs_api.write_file(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsWriteFile(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1368,7 +1417,14 @@ impl MessageProcessor {
params: FsCreateDirectoryParams,
) {
match self.fs_api.create_directory(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(
request_id,
ClientResponsePayload::FsCreateDirectory(response),
)
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1379,7 +1435,11 @@ impl MessageProcessor {
params: FsGetMetadataParams,
) {
match self.fs_api.get_metadata(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsGetMetadata(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1390,21 +1450,33 @@ impl MessageProcessor {
params: FsReadDirectoryParams,
) {
match self.fs_api.read_directory(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsReadDirectory(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_remove(&self, request_id: ConnectionRequestId, params: FsRemoveParams) {
match self.fs_api.remove(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsRemove(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_copy(&self, request_id: ConnectionRequestId, params: FsCopyParams) {
match self.fs_api.copy(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsCopy(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1416,7 +1488,11 @@ impl MessageProcessor {
params: FsWatchParams,
) {
match self.fs_watch_manager.watch(connection_id, params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsWatch(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
@@ -1428,7 +1504,11 @@ impl MessageProcessor {
params: FsUnwatchParams,
) {
match self.fs_watch_manager.unwatch(connection_id, params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Ok(response) => {
self.outgoing
.send_response(request_id, ClientResponsePayload::FsUnwatch(response))
.await
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}

View File

@@ -263,7 +263,12 @@ fn build_test_processor(
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let analytics_events_client = codex_analytics::AnalyticsEventsClient::disabled();
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
false,
));
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
let config_manager = ConfigManager::new(
@@ -276,6 +281,7 @@ fn build_test_processor(
);
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths: Arg0DispatchPaths::default(),
config,
config_manager,

View File

@@ -4,6 +4,8 @@ use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
@@ -117,6 +119,8 @@ pub(crate) struct OutgoingMessageSender {
/// We keep them here because this is where responses, errors, and
/// disconnect cleanup all get handled.
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
analytics_events_client: AnalyticsEventsClient,
general_analytics_enabled: bool,
}
#[derive(Clone)]
@@ -185,10 +189,10 @@ impl ThreadScopedOutgoingMessageSender {
.await
}
pub(crate) async fn send_response<T: Serialize>(
pub(crate) async fn send_response(
&self,
request_id: ConnectionRequestId,
response: T,
response: ClientResponsePayload,
) {
self.outgoing.send_response(request_id, response).await;
}
@@ -203,12 +207,18 @@ impl ThreadScopedOutgoingMessageSender {
}
impl OutgoingMessageSender {
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
pub(crate) fn new(
sender: mpsc::Sender<OutgoingEnvelope>,
analytics_events_client: AnalyticsEventsClient,
general_analytics_enabled: bool,
) -> Self {
Self {
next_server_request_id: AtomicI64::new(0),
sender,
request_id_to_callback: Mutex::new(HashMap::new()),
request_contexts: Mutex::new(HashMap::new()),
analytics_events_client,
general_analytics_enabled,
}
}
@@ -469,21 +479,37 @@ impl OutgoingMessageSender {
}
}
pub(crate) async fn send_response<T: Serialize>(
pub(crate) async fn send_response(
&self,
request_id: ConnectionRequestId,
response: T,
response: ClientResponsePayload,
) {
let connection_id = request_id.connection_id;
let request_id_for_response = request_id.request_id.clone();
let serialized = if self.general_analytics_enabled {
response
.into_jsonrpc_parts_and_payload(request_id_for_response.clone())
.map(|(id, result, response)| {
if let Some(response) = response {
self.analytics_events_client.track_response_payload(
connection_id.0,
request_id_for_response,
response,
);
}
(id, result)
})
} else {
response.into_jsonrpc_parts(request_id_for_response)
};
let request_context = self.take_request_context(&request_id).await;
match serde_json::to_value(response) {
Ok(result) => {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
id: request_id.request_id.clone(),
result,
});
match serialized {
Ok((id, result)) => {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
self.send_outgoing_message_to_connection(
request_context,
request_id.connection_id,
connection_id,
outgoing_message,
"response",
)
@@ -843,14 +869,23 @@ mod tests {
#[tokio::test]
async fn send_response_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
};
outgoing
.send_response(request_id.clone(), json!({ "ok": true }))
.send_response(
request_id.clone(),
ClientResponsePayload::ThreadArchive(
codex_app_server_protocol::ThreadArchiveResponse {},
),
)
.await;
let envelope = timeout(Duration::from_secs(1), rx.recv())
@@ -869,7 +904,7 @@ mod tests {
panic!("expected response message");
};
assert_eq!(response.id, request_id.request_id);
assert_eq!(response.result, json!({ "ok": true }));
assert_eq!(response.result, json!({}));
}
other => panic!("expected targeted response envelope, got: {other:?}"),
}
@@ -878,7 +913,11 @@ mod tests {
#[tokio::test]
async fn send_response_clears_registered_request_context() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
@@ -894,7 +933,12 @@ mod tests {
assert_eq!(outgoing.request_context_count().await, 1);
outgoing
.send_response(request_id, json!({ "ok": true }))
.send_response(
request_id,
ClientResponsePayload::ThreadArchive(
codex_app_server_protocol::ThreadArchiveResponse {},
),
)
.await;
assert_eq!(outgoing.request_context_count().await, 0);
@@ -903,7 +947,11 @@ mod tests {
#[tokio::test]
async fn send_error_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -941,7 +989,11 @@ mod tests {
#[tokio::test]
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let send_task = tokio::spawn(async move {
outgoing
.send_server_notification_to_connection_and_wait(
@@ -985,7 +1037,11 @@ mod tests {
#[tokio::test]
async fn connection_closed_clears_registered_request_contexts() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let closed_connection_request = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -1019,7 +1075,11 @@ mod tests {
#[tokio::test]
async fn notify_client_error_forwards_error_to_waiter() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
);
let (request_id, wait_for_result) = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(
@@ -1053,7 +1113,11 @@ mod tests {
#[tokio::test]
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
@@ -1111,7 +1175,11 @@ mod tests {
#[tokio::test]
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),

View File

@@ -722,6 +722,8 @@ mod tests {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)));
manager
@@ -764,6 +766,8 @@ mod tests {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
codex_analytics::AnalyticsEventsClient::disabled(),
false,
)));
manager