mirror of
https://github.com/openai/codex.git
synced 2026-05-07 21:06:39 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
746e6df696 |
@@ -70,6 +70,8 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
@@ -943,6 +945,64 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_client_requests_are_ignored_by_reducer() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(ClientRequest::ThreadArchive {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: ThreadArchiveParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
},
|
||||
}),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
events.is_empty(),
|
||||
"unrelated requests must not create pending turn state"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_client_responses_are_ignored_by_reducer() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut reducer, &mut events).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(ClientResponse::ThreadArchive {
|
||||
request_id: RequestId::Integer(9),
|
||||
response: ThreadArchiveResponse {},
|
||||
}),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(events.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compaction_event_ingests_custom_fact() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
|
||||
@@ -22,6 +22,7 @@ use crate::facts::TurnTokenUsageFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -123,6 +124,18 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disabled() -> Self {
|
||||
let (sender, _receiver) = mpsc::channel(1);
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue {
|
||||
sender,
|
||||
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
},
|
||||
analytics_enabled: Some(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
@@ -276,6 +289,19 @@ impl AnalyticsEventsClient {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_response_payload(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
response: Box<ClientResponsePayload>,
|
||||
) {
|
||||
self.record_fact(AnalyticsFact::ResponsePayload {
|
||||
connection_id,
|
||||
request_id,
|
||||
response,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_error_response(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::GuardianReviewEventParams;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -280,6 +281,11 @@ pub(crate) enum AnalyticsFact {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
ResponsePayload {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
response: Box<ClientResponsePayload>,
|
||||
},
|
||||
ErrorResponse {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
|
||||
@@ -184,6 +184,15 @@ impl AnalyticsReducer {
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
}
|
||||
AnalyticsFact::ResponsePayload {
|
||||
connection_id,
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
if let Some(response) = response.into_client_response(request_id) {
|
||||
self.ingest_response(connection_id, response, out);
|
||||
}
|
||||
}
|
||||
AnalyticsFact::ErrorResponse {
|
||||
connection_id,
|
||||
request_id,
|
||||
|
||||
@@ -153,6 +153,108 @@ macro_rules! client_request_definitions {
|
||||
})
|
||||
.unwrap_or_else(|| "<unknown>".to_string())
|
||||
}
|
||||
|
||||
pub fn into_jsonrpc_parts(
|
||||
self,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant { request_id, response } => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum ClientResponsePayload {
|
||||
$( $variant($response), )*
|
||||
InterruptConversation(v1::InterruptConversationResponse),
|
||||
}
|
||||
|
||||
impl ClientResponsePayload {
|
||||
pub fn into_jsonrpc_parts_and_payload(
|
||||
self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<
|
||||
(RequestId, crate::Result, Option<Box<ClientResponsePayload>>),
|
||||
serde_json::Error,
|
||||
> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
let result = serde_json::to_value(&response)?;
|
||||
Ok((request_id, result, Some(Box::new(Self::$variant(response)))))
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_client_response(self, request_id: RequestId) -> Option<ClientResponse> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
Some(ClientResponse::$variant {
|
||||
request_id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_jsonrpc_parts_and_client_response(
|
||||
self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<
|
||||
(RequestId, crate::Result, Option<ClientResponse>),
|
||||
serde_json::Error,
|
||||
> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
let result = serde_json::to_value(&response)?;
|
||||
let client_response = ClientResponse::$variant {
|
||||
request_id: request_id.clone(),
|
||||
response,
|
||||
};
|
||||
Ok((request_id, result, Some(client_response)))
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_jsonrpc_parts(
|
||||
self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
self.to_jsonrpc_parts(request_id)
|
||||
}
|
||||
|
||||
pub fn to_jsonrpc_parts(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
|
||||
match self {
|
||||
$(
|
||||
Self::$variant(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
)*
|
||||
Self::InterruptConversation(response) => {
|
||||
serde_json::to_value(response).map(|result| (request_id, result))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::experimental_api::ExperimentalApi for ClientRequest {
|
||||
@@ -1098,6 +1200,7 @@ mod tests {
|
||||
use codex_protocol::protocol::RealtimeConversationVersion;
|
||||
use codex_protocol::protocol::RealtimeOutputModality;
|
||||
use codex_protocol::protocol::RealtimeVoice;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_absolute_path::test_support::PathBufExt;
|
||||
use codex_utils_absolute_path::test_support::test_path_buf;
|
||||
@@ -1510,6 +1613,45 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_response_payload_returns_jsonrpc_parts_and_client_response() -> Result<()> {
|
||||
let (request_id, result, client_response) =
|
||||
ClientResponsePayload::ThreadArchive(v2::ThreadArchiveResponse {})
|
||||
.into_jsonrpc_parts_and_client_response(RequestId::Integer(7))?;
|
||||
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(result, json!({}));
|
||||
|
||||
let Some(ClientResponse::ThreadArchive {
|
||||
request_id,
|
||||
response: _,
|
||||
}) = client_response
|
||||
else {
|
||||
panic!("expected thread/archive client response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interrupt_conversation_payload_stays_jsonrpc_only() -> Result<()> {
|
||||
let (request_id, result, client_response) =
|
||||
ClientResponsePayload::InterruptConversation(v1::InterruptConversationResponse {
|
||||
abort_reason: TurnAbortReason::Interrupted,
|
||||
})
|
||||
.into_jsonrpc_parts_and_client_response(RequestId::Integer(8))?;
|
||||
|
||||
assert_eq!(request_id, RequestId::Integer(8));
|
||||
assert_eq!(
|
||||
result,
|
||||
json!({
|
||||
"abortReason": "interrupted",
|
||||
})
|
||||
);
|
||||
assert!(client_response.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_config_requirements_read() -> Result<()> {
|
||||
let request = ClientRequest::ConfigRequirementsRead {
|
||||
|
||||
16
codex-rs/app-server/src/analytics.rs
Normal file
16
codex-rs/app-server/src/analytics.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_core::config::Config;
|
||||
use codex_login::AuthManager;
|
||||
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
}
|
||||
@@ -18,6 +18,7 @@ use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermis
|
||||
use codex_app_server_protocol::AgentMessageDeltaNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
|
||||
use codex_app_server_protocol::CollabAgentState as V2CollabAgentStatus;
|
||||
use codex_app_server_protocol::CollabAgentTool;
|
||||
@@ -1826,11 +1827,18 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
let response = InterruptConversationResponse {
|
||||
abort_reason: turn_aborted_event.reason.clone(),
|
||||
};
|
||||
outgoing.send_response(rid, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
rid,
|
||||
ClientResponsePayload::InterruptConversation(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ApiVersion::V2 => {
|
||||
let response = TurnInterruptResponse {};
|
||||
outgoing.send_response(rid, response).await;
|
||||
outgoing
|
||||
.send_response(rid, ClientResponsePayload::TurnInterrupt(response))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1920,7 +1928,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
}
|
||||
};
|
||||
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadRollback(response))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
EventMsg::ThreadNameUpdated(thread_name_event) => {
|
||||
@@ -3349,7 +3359,11 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3418,7 +3432,11 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3508,7 +3526,11 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
let thread_watch_manager = ThreadWatchManager::new();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4079,7 +4101,11 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let event_turn_id = "complete1".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4143,7 +4169,11 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4191,7 +4221,11 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4233,7 +4267,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4287,7 +4325,11 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-123".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4376,7 +4418,11 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-456".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4449,7 +4495,11 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4711,7 +4761,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4749,7 +4803,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4775,7 +4833,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let conversation_id = ThreadId::new();
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
|
||||
@@ -42,7 +42,7 @@ use codex_app_server_protocol::CancelLoginAccountParams;
|
||||
use codex_app_server_protocol::CancelLoginAccountResponse;
|
||||
use codex_app_server_protocol::CancelLoginAccountStatus;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::CollaborationModeListParams;
|
||||
use codex_app_server_protocol::CollaborationModeListResponse;
|
||||
@@ -1278,7 +1278,9 @@ impl CodexMessageProcessor {
|
||||
match self.login_api_key_common(¶ms).await {
|
||||
Ok(()) => {
|
||||
let response = codex_app_server_protocol::LoginAccountResponse::ApiKey {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
|
||||
.await;
|
||||
|
||||
let payload_login_completed = AccountLoginCompletedNotification {
|
||||
login_id: None,
|
||||
@@ -1447,7 +1449,9 @@ impl CodexMessageProcessor {
|
||||
login_id: login_id.to_string(),
|
||||
auth_url,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -1490,7 +1494,9 @@ impl CodexMessageProcessor {
|
||||
verification_url,
|
||||
user_code,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::LoginAccount(response))
|
||||
.await;
|
||||
|
||||
let outgoing_clone = self.outgoing.clone();
|
||||
let active_login = self.active_login.clone();
|
||||
@@ -1588,7 +1594,12 @@ impl CodexMessageProcessor {
|
||||
Err(CancelLoginError::NotFound) => CancelLoginAccountStatus::NotFound,
|
||||
};
|
||||
let response = CancelLoginAccountResponse { status };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::CancelLoginAccount(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(_) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -1668,7 +1679,10 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, LoginAccountResponse::ChatgptAuthTokens {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::LoginAccount(LoginAccountResponse::ChatgptAuthTokens {}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload_login_completed = AccountLoginCompletedNotification {
|
||||
@@ -1721,7 +1735,10 @@ impl CodexMessageProcessor {
|
||||
match self.logout_common().await {
|
||||
Ok(current_auth_method) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, LogoutAccountResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::LogoutAccount(LogoutAccountResponse {}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload_v2 = AccountUpdatedNotification {
|
||||
@@ -1811,7 +1828,9 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::GetAuthStatus(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) {
|
||||
@@ -1827,7 +1846,9 @@ impl CodexMessageProcessor {
|
||||
account: None,
|
||||
requires_openai_auth,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::GetAccount(response))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1863,7 +1884,9 @@ impl CodexMessageProcessor {
|
||||
account,
|
||||
requires_openai_auth,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::GetAccount(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_account_rate_limits(&self, request_id: ConnectionRequestId) {
|
||||
@@ -1878,7 +1901,12 @@ impl CodexMessageProcessor {
|
||||
.collect(),
|
||||
),
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::GetAccountRateLimits(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -1894,7 +1922,12 @@ impl CodexMessageProcessor {
|
||||
match self.send_add_credits_nudge_email_inner(params).await {
|
||||
Ok(status) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, SendAddCreditsNudgeEmailResponse { status })
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::SendAddCreditsNudgeEmail(
|
||||
SendAddCreditsNudgeEmailResponse { status },
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
@@ -2283,7 +2316,14 @@ impl CodexMessageProcessor {
|
||||
.write(request_id.clone(), params)
|
||||
.await
|
||||
{
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::CommandExecWrite(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -2298,7 +2338,14 @@ impl CodexMessageProcessor {
|
||||
.resize(request_id.clone(), params)
|
||||
.await
|
||||
{
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::CommandExecResize(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -2313,7 +2360,14 @@ impl CodexMessageProcessor {
|
||||
.terminate(request_id.clone(), params)
|
||||
.await
|
||||
{
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::CommandExecTerminate(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -2699,21 +2753,9 @@ impl CodexMessageProcessor {
|
||||
sandbox: config_snapshot.sandbox_policy.into(),
|
||||
reasoning_effort: config_snapshot.reasoning_effort,
|
||||
};
|
||||
if listener_task_context.general_analytics_enabled {
|
||||
listener_task_context
|
||||
.analytics_events_client
|
||||
.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadStart {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_response(request_id, response)
|
||||
.send_response(request_id, ClientResponsePayload::ThreadStart(response))
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.send_response",
|
||||
otel.name = "app_server.thread_start.send_response",
|
||||
@@ -2868,7 +2910,10 @@ impl CodexMessageProcessor {
|
||||
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
|
||||
else {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
@@ -2913,7 +2958,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
|
||||
)
|
||||
.await;
|
||||
for thread_id in archived_thread_ids {
|
||||
let notification = ThreadArchivedNotification { thread_id };
|
||||
@@ -2941,10 +2989,12 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadIncrementElicitationResponse {
|
||||
count,
|
||||
paused: count > 0,
|
||||
},
|
||||
ClientResponsePayload::ThreadIncrementElicitation(
|
||||
ThreadIncrementElicitationResponse {
|
||||
count,
|
||||
paused: count > 0,
|
||||
},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -2976,10 +3026,12 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadDecrementElicitationResponse {
|
||||
count,
|
||||
paused: count > 0,
|
||||
},
|
||||
ClientResponsePayload::ThreadDecrementElicitation(
|
||||
ThreadDecrementElicitationResponse {
|
||||
count,
|
||||
paused: count > 0,
|
||||
},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -3026,7 +3078,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadSetNameResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadSetName(ThreadSetNameResponse {}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
@@ -3050,7 +3105,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadSetNameResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadSetName(ThreadSetNameResponse {}),
|
||||
)
|
||||
.await;
|
||||
let notification = ThreadNameUpdatedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
@@ -3096,7 +3154,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadMemoryModeSetResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadMemoryModeSet(ThreadMemoryModeSetResponse {}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
@@ -3123,7 +3184,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadMemoryModeSetResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadMemoryModeSet(ThreadMemoryModeSetResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -3167,7 +3231,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, MemoryResetResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::MemoryReset(MemoryResetResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -3333,7 +3400,12 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadMetadataUpdateResponse { thread })
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadMetadataUpdate(ThreadMetadataUpdateResponse {
|
||||
thread,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -3514,7 +3586,9 @@ impl CodexMessageProcessor {
|
||||
self.attach_thread_name(thread_id, &mut thread).await;
|
||||
let thread_id = thread.id.clone();
|
||||
let response = ThreadUnarchiveResponse { thread };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadUnarchive(response))
|
||||
.await;
|
||||
let notification = ThreadUnarchivedNotification { thread_id };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadUnarchived(notification))
|
||||
@@ -3606,7 +3680,10 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadCompactStartResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadCompactStart(ThreadCompactStartResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -3637,7 +3714,12 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadBackgroundTerminalsCleanResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadBackgroundTerminalsClean(
|
||||
ThreadBackgroundTerminalsCleanResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -3689,7 +3771,10 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadShellCommandResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadShellCommand(ThreadShellCommandResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -3793,7 +3878,9 @@ impl CodexMessageProcessor {
|
||||
next_cursor,
|
||||
backwards_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadList(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_loaded_list(
|
||||
@@ -3815,7 +3902,12 @@ impl CodexMessageProcessor {
|
||||
data,
|
||||
next_cursor: None,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadLoadedList(response),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3852,7 +3944,12 @@ impl CodexMessageProcessor {
|
||||
data: page,
|
||||
next_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadLoadedList(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
|
||||
@@ -3882,7 +3979,9 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
let response = ThreadReadResponse { thread };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadRead(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
|
||||
@@ -4162,7 +4261,9 @@ impl CodexMessageProcessor {
|
||||
next_cursor: page.next_cursor,
|
||||
backwards_cursor: page.backwards_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadTurnsList(response))
|
||||
.await;
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
self.send_invalid_request_error(
|
||||
@@ -4432,23 +4533,15 @@ impl CodexMessageProcessor {
|
||||
sandbox: session_configured.sandbox_policy.into(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadResume {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = response.thread.clone();
|
||||
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
|
||||
&response_history.get_rollout_items(),
|
||||
&token_usage_thread,
|
||||
);
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadResume(response))
|
||||
.await;
|
||||
// The client needs restored usage before it starts another turn.
|
||||
// Sending after the response preserves JSON-RPC request ordering while
|
||||
// still filling the status line before the next turn lifecycle begins.
|
||||
@@ -5077,16 +5170,6 @@ impl CodexMessageProcessor {
|
||||
sandbox: session_configured.sandbox_policy.into(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadFork {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = response.thread.clone();
|
||||
let token_usage_turn_id = if let Some(turn_id) =
|
||||
@@ -5100,7 +5183,9 @@ impl CodexMessageProcessor {
|
||||
)
|
||||
.await
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadFork(response))
|
||||
.await;
|
||||
// Mirror the resume contract for forks: the new thread is usable as soon
|
||||
// as the response arrives, so restored usage must follow immediately.
|
||||
send_thread_token_usage_update_to_connection(
|
||||
@@ -5177,7 +5262,12 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
let response = GetConversationSummaryResponse { summary };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::GetConversationSummary(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -5311,7 +5401,9 @@ impl CodexMessageProcessor {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
};
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ModelList(response))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -5354,7 +5446,9 @@ impl CodexMessageProcessor {
|
||||
data: items,
|
||||
next_cursor,
|
||||
};
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ModelList(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn list_collaboration_modes(
|
||||
@@ -5370,7 +5464,12 @@ impl CodexMessageProcessor {
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
let response = CollaborationModeListResponse { data: items };
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::CollaborationModeList(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn experimental_feature_list(
|
||||
@@ -5431,10 +5530,12 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ExperimentalFeatureListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
},
|
||||
ClientResponsePayload::ExperimentalFeatureList(
|
||||
ExperimentalFeatureListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -5478,7 +5579,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ExperimentalFeatureListResponse { data, next_cursor },
|
||||
ClientResponsePayload::ExperimentalFeatureList(ExperimentalFeatureListResponse {
|
||||
data,
|
||||
next_cursor,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -5490,7 +5594,12 @@ impl CodexMessageProcessor {
|
||||
) {
|
||||
let MockExperimentalMethodParams { value } = params;
|
||||
let response = MockExperimentalMethodResponse { echoed: value };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::MockExperimentalMethod(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
|
||||
@@ -5508,7 +5617,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let response = McpServerRefreshResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::McpServerRefresh(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn queue_mcp_server_refresh_for_config(
|
||||
@@ -5654,7 +5768,12 @@ impl CodexMessageProcessor {
|
||||
});
|
||||
|
||||
let response = McpServerOauthLoginResponse { authorization_url };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::McpServerOauthLogin(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -5813,7 +5932,12 @@ impl CodexMessageProcessor {
|
||||
|
||||
let response = ListMcpServerStatusResponse { data, next_cursor };
|
||||
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::McpServerStatusList(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn read_mcp_resource(
|
||||
@@ -5890,7 +6014,9 @@ impl CodexMessageProcessor {
|
||||
match result {
|
||||
Ok(result) => match serde_json::from_value::<McpResourceReadResponse>(result) {
|
||||
Ok(response) => {
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::McpResourceRead(response))
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
outgoing
|
||||
@@ -5945,7 +6071,12 @@ impl CodexMessageProcessor {
|
||||
match result {
|
||||
Ok(result) => {
|
||||
outgoing
|
||||
.send_response(request_id, McpServerToolCallResponse::from(result))
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::McpServerToolCall(
|
||||
McpServerToolCallResponse::from(result),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(error) => {
|
||||
@@ -6125,9 +6256,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadUnsubscribeResponse {
|
||||
ClientResponsePayload::ThreadUnsubscribe(ThreadUnsubscribeResponse {
|
||||
status: ThreadUnsubscribeStatus::NotLoaded,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -6144,7 +6275,10 @@ impl CodexMessageProcessor {
|
||||
ThreadUnsubscribeStatus::NotSubscribed
|
||||
};
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadUnsubscribeResponse { status })
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadUnsubscribe(ThreadUnsubscribeResponse { status }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -6199,10 +6333,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
AppsListResponse {
|
||||
ClientResponsePayload::AppsList(AppsListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -6389,7 +6523,9 @@ impl CodexMessageProcessor {
|
||||
if accessible_loaded && all_loaded {
|
||||
match apps_list_helpers::paginate_apps(merged.as_slice(), start, limit) {
|
||||
Ok(response) => {
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::AppsList(response))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(error) => {
|
||||
@@ -6527,7 +6663,10 @@ impl CodexMessageProcessor {
|
||||
});
|
||||
}
|
||||
self.outgoing
|
||||
.send_response(request_id, SkillsListResponse { data })
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::SkillsList(SkillsListResponse { data }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
async fn marketplace_remove(
|
||||
@@ -6548,10 +6687,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
MarketplaceRemoveResponse {
|
||||
ClientResponsePayload::MarketplaceRemove(MarketplaceRemoveResponse {
|
||||
marketplace_name: outcome.marketplace_name,
|
||||
installed_root: outcome.removed_installed_root,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -6579,11 +6718,11 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
MarketplaceAddResponse {
|
||||
ClientResponsePayload::MarketplaceAdd(MarketplaceAddResponse {
|
||||
marketplace_name: outcome.marketplace_name,
|
||||
installed_root: outcome.installed_root,
|
||||
already_added: outcome.already_added,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -6637,9 +6776,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
SkillsConfigWriteResponse {
|
||||
ClientResponsePayload::SkillsConfigWrite(SkillsConfigWriteResponse {
|
||||
effective_enabled: enabled,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -6779,16 +6918,9 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
let response = TurnStartResponse { turn };
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::TurnStart {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::TurnStart(response))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -6835,7 +6967,10 @@ impl CodexMessageProcessor {
|
||||
match thread.inject_response_items(items).await {
|
||||
Ok(()) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadInjectItemsResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadInjectItems(ThreadInjectItemsResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(CodexErr::InvalidRequest(message)) => {
|
||||
@@ -6913,16 +7048,9 @@ impl CodexMessageProcessor {
|
||||
{
|
||||
Ok(turn_id) => {
|
||||
let response = TurnSteerResponse { turn_id };
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::TurnSteer {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::TurnSteer(response))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let (code, message, data, error_type) = match err {
|
||||
@@ -7075,7 +7203,12 @@ impl CodexMessageProcessor {
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadRealtimeStartResponse::default())
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadRealtimeStart(
|
||||
ThreadRealtimeStartResponse::default(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7113,7 +7246,12 @@ impl CodexMessageProcessor {
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadRealtimeAppendAudioResponse::default())
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadRealtimeAppendAudio(
|
||||
ThreadRealtimeAppendAudioResponse::default(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7149,7 +7287,12 @@ impl CodexMessageProcessor {
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadRealtimeAppendTextResponse::default())
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadRealtimeAppendText(
|
||||
ThreadRealtimeAppendTextResponse::default(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7181,7 +7324,12 @@ impl CodexMessageProcessor {
|
||||
match submit {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadRealtimeStopResponse::default())
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadRealtimeStop(
|
||||
ThreadRealtimeStopResponse::default(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7202,9 +7350,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ThreadRealtimeListVoicesResponse {
|
||||
ClientResponsePayload::ThreadRealtimeListVoices(ThreadRealtimeListVoicesResponse {
|
||||
voices: RealtimeVoicesList::builtin(),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -7245,7 +7393,10 @@ impl CodexMessageProcessor {
|
||||
review_thread_id,
|
||||
};
|
||||
self.outgoing
|
||||
.send_response(request_id.clone(), response)
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
ClientResponsePayload::ReviewStart(response),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -7489,7 +7640,10 @@ impl CodexMessageProcessor {
|
||||
match submit_result {
|
||||
Ok(_) if is_startup_interrupt => {
|
||||
self.outgoing
|
||||
.send_response(request_id, TurnInterruptResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::TurnInterrupt(TurnInterruptResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(_) => {}
|
||||
@@ -7835,7 +7989,9 @@ impl CodexMessageProcessor {
|
||||
sha: value.sha,
|
||||
diff: value.diff,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::GitDiffToRemote(response))
|
||||
.await;
|
||||
}
|
||||
None => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -7889,7 +8045,9 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let response = FuzzyFileSearchResponse { files: results };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FuzzyFileSearch(response))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn fuzzy_file_search_session_start(
|
||||
@@ -7917,7 +8075,12 @@ impl CodexMessageProcessor {
|
||||
.await
|
||||
.insert(session_id, session);
|
||||
self.outgoing
|
||||
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::FuzzyFileSearchSessionStart(
|
||||
FuzzyFileSearchSessionStartResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7957,7 +8120,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::FuzzyFileSearchSessionUpdate(
|
||||
FuzzyFileSearchSessionUpdateResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -7973,7 +8141,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::FuzzyFileSearchSessionStop(
|
||||
FuzzyFileSearchSessionStopResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -8150,7 +8323,9 @@ impl CodexMessageProcessor {
|
||||
match upload_result {
|
||||
Ok(()) => {
|
||||
let response = FeedbackUploadResponse { thread_id };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FeedbackUpload(response))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -8171,7 +8346,9 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
WindowsSandboxSetupStartResponse { started: true },
|
||||
ClientResponsePayload::WindowsSandboxSetupStart(WindowsSandboxSetupStartResponse {
|
||||
started: true,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -8475,7 +8652,9 @@ async fn handle_pending_thread_resume_request(
|
||||
&token_usage_thread,
|
||||
)
|
||||
.await;
|
||||
outgoing.send_response(request_id, response).await;
|
||||
outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ThreadResume(response))
|
||||
.await;
|
||||
// Rejoining a loaded thread has the same UI contract as a cold resume, but
|
||||
// uses the live conversation state instead of reconstructing a new session.
|
||||
send_thread_token_usage_update_to_connection(
|
||||
@@ -10619,7 +10798,11 @@ mod tests {
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
vec![connection_id],
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_plugin::validate_plugin_segment;
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
@@ -22,11 +23,11 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
PluginListResponse {
|
||||
ClientResponsePayload::PluginList(PluginListResponse {
|
||||
marketplaces: Vec::new(),
|
||||
marketplace_load_errors: Vec::new(),
|
||||
featured_plugin_ids: Vec::new(),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -161,11 +162,11 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
PluginListResponse {
|
||||
ClientResponsePayload::PluginList(PluginListResponse {
|
||||
marketplaces: data,
|
||||
marketplace_load_errors,
|
||||
featured_plugin_ids,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -338,7 +339,10 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, PluginReadResponse { plugin })
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::PluginRead(PluginReadResponse { plugin }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -487,10 +491,10 @@ impl CodexMessageProcessor {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
PluginInstallResponse {
|
||||
ClientResponsePayload::PluginInstall(PluginInstallResponse {
|
||||
auth_policy: result.auth_policy.into(),
|
||||
apps_needing_auth,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -553,7 +557,10 @@ impl CodexMessageProcessor {
|
||||
Ok(()) => {
|
||||
self.clear_plugin_related_caches();
|
||||
self.outgoing
|
||||
.send_response(request_id, PluginUninstallResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::PluginUninstall(PluginUninstallResponse {}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::time::Duration;
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CommandExecOutputDeltaNotification;
|
||||
use codex_app_server_protocol::CommandExecOutputStream;
|
||||
use codex_app_server_protocol::CommandExecResizeParams;
|
||||
@@ -209,11 +210,11 @@ impl CommandExecManager {
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
CommandExecResponse {
|
||||
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
|
||||
exit_code: output.exit_code,
|
||||
stdout: output.stdout.text,
|
||||
stderr: output.stderr.text,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -554,11 +555,11 @@ async fn run_command(params: RunCommandParams) {
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
CommandExecResponse {
|
||||
ClientResponsePayload::OneOffCommandExec(CommandExecResponse {
|
||||
exit_code,
|
||||
stdout,
|
||||
stderr,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -757,7 +758,11 @@ mod tests {
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(1),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(42),
|
||||
@@ -793,7 +798,11 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-99".to_string()),
|
||||
exec_request: windows_sandbox_exec_request(),
|
||||
@@ -843,7 +852,11 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-100".to_string()),
|
||||
exec_request: ExecRequest::new(
|
||||
|
||||
@@ -234,7 +234,11 @@ mod tests {
|
||||
const OUTGOING_BUFFER: usize = 1;
|
||||
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
|
||||
FsWatchManager::new_with_file_watcher(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_features::Feature;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -365,7 +366,18 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let analytics_events_client = crate::analytics::analytics_events_client_from_config(
|
||||
auth_manager.clone(),
|
||||
args.config.as_ref(),
|
||||
);
|
||||
let general_analytics_enabled = args.config.features.enabled(Feature::GeneralAnalytics);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
general_analytics_enabled,
|
||||
));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -390,8 +402,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let config_manager = ConfigManager::new(
|
||||
args.config.codex_home.to_path_buf(),
|
||||
args.cli_overrides,
|
||||
@@ -404,6 +414,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
config_manager,
|
||||
|
||||
@@ -65,6 +65,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod analytics;
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
@@ -645,12 +646,20 @@ pub async fn run_main_with_transport(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let analytics_events_client =
|
||||
crate::analytics::analytics_events_client_from_config(auth_manager.clone(), &config);
|
||||
let general_analytics_enabled = config.features.enabled(Feature::GeneralAnalytics);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
general_analytics_enabled,
|
||||
));
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
config_manager,
|
||||
|
||||
@@ -33,6 +33,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
@@ -249,6 +250,7 @@ impl ConnectionSessionState {
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) arg0_paths: Arg0DispatchPaths,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) config_manager: ConfigManager,
|
||||
@@ -268,6 +270,7 @@ impl MessageProcessor {
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
config_manager,
|
||||
@@ -283,11 +286,6 @@ impl MessageProcessor {
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
@@ -713,7 +711,10 @@ impl MessageProcessor {
|
||||
};
|
||||
|
||||
self.outgoing
|
||||
.send_response(connection_request_id, response)
|
||||
.send_response(
|
||||
connection_request_id,
|
||||
ClientResponsePayload::Initialize(response),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(outbound_initialized) = outbound_initialized {
|
||||
@@ -765,14 +766,10 @@ impl MessageProcessor {
|
||||
self.outgoing.send_error(connection_request_id, error).await;
|
||||
return;
|
||||
}
|
||||
let connection_id = connection_request_id.connection_id;
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics)
|
||||
&& let ClientRequest::TurnStart { request_id, .. }
|
||||
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
|
||||
{
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
request_id.clone(),
|
||||
connection_request_id.connection_id.0,
|
||||
connection_request_id.request_id.clone(),
|
||||
codex_request.clone(),
|
||||
);
|
||||
}
|
||||
@@ -1019,7 +1016,11 @@ impl MessageProcessor {
|
||||
|
||||
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
|
||||
match self.config_api.read(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::ConfigRead(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1029,7 +1030,11 @@ impl MessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
params: ConfigValueWriteParams,
|
||||
) {
|
||||
let result = self.config_api.write_value(params).await;
|
||||
let result = self
|
||||
.config_api
|
||||
.write_value(params)
|
||||
.await
|
||||
.map(ClientResponsePayload::ConfigValueWrite);
|
||||
self.handle_config_mutation_result(request_id, result).await
|
||||
}
|
||||
|
||||
@@ -1038,7 +1043,11 @@ impl MessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
params: ConfigBatchWriteParams,
|
||||
) {
|
||||
let result = self.config_api.batch_write(params).await;
|
||||
let result = self
|
||||
.config_api
|
||||
.batch_write(params)
|
||||
.await
|
||||
.map(ClientResponsePayload::ConfigBatchWrite);
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
}
|
||||
|
||||
@@ -1051,7 +1060,8 @@ impl MessageProcessor {
|
||||
let result = self
|
||||
.config_api
|
||||
.set_experimental_feature_enablement(params)
|
||||
.await;
|
||||
.await
|
||||
.map(ClientResponsePayload::ExperimentalFeatureEnablementSet);
|
||||
let is_ok = result.is_ok();
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
if should_refresh_apps_list && is_ok {
|
||||
@@ -1129,10 +1139,10 @@ impl MessageProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_config_mutation_result<T: serde::Serialize>(
|
||||
async fn handle_config_mutation_result(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
result: std::result::Result<T, JSONRPCErrorError>,
|
||||
result: std::result::Result<ClientResponsePayload, JSONRPCErrorError>,
|
||||
) {
|
||||
match result {
|
||||
Ok(response) => {
|
||||
@@ -1168,7 +1178,14 @@ impl MessageProcessor {
|
||||
|
||||
async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) {
|
||||
match self.config_api.config_requirements_read().await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ConfigRequirementsRead(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1191,7 +1208,11 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
match self.device_key_api.create(params) {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::DeviceKeyCreate(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1214,7 +1235,11 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
match self.device_key_api.public(params) {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::DeviceKeyPublic(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1237,7 +1262,11 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
match self.device_key_api.sign(params) {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::DeviceKeySign(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1271,7 +1300,14 @@ impl MessageProcessor {
|
||||
params: ExternalAgentConfigDetectParams,
|
||||
) {
|
||||
match self.external_agent_config_api.detect(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ExternalAgentConfigDetect(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1293,7 +1329,12 @@ impl MessageProcessor {
|
||||
self.handle_config_mutation().await;
|
||||
}
|
||||
self.outgoing
|
||||
.send_response(request_id, ExternalAgentConfigImportResponse {})
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ExternalAgentConfigImport(
|
||||
ExternalAgentConfigImportResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
if !has_plugin_imports {
|
||||
@@ -1346,7 +1387,11 @@ impl MessageProcessor {
|
||||
|
||||
async fn handle_fs_read_file(&self, request_id: ConnectionRequestId, params: FsReadFileParams) {
|
||||
match self.fs_api.read_file(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsReadFile(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1357,7 +1402,11 @@ impl MessageProcessor {
|
||||
params: FsWriteFileParams,
|
||||
) {
|
||||
match self.fs_api.write_file(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsWriteFile(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1368,7 +1417,14 @@ impl MessageProcessor {
|
||||
params: FsCreateDirectoryParams,
|
||||
) {
|
||||
match self.fs_api.create_directory(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::FsCreateDirectory(response),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1379,7 +1435,11 @@ impl MessageProcessor {
|
||||
params: FsGetMetadataParams,
|
||||
) {
|
||||
match self.fs_api.get_metadata(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsGetMetadata(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1390,21 +1450,33 @@ impl MessageProcessor {
|
||||
params: FsReadDirectoryParams,
|
||||
) {
|
||||
match self.fs_api.read_directory(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsReadDirectory(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_fs_remove(&self, request_id: ConnectionRequestId, params: FsRemoveParams) {
|
||||
match self.fs_api.remove(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsRemove(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_fs_copy(&self, request_id: ConnectionRequestId, params: FsCopyParams) {
|
||||
match self.fs_api.copy(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsCopy(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1416,7 +1488,11 @@ impl MessageProcessor {
|
||||
params: FsWatchParams,
|
||||
) {
|
||||
match self.fs_watch_manager.watch(connection_id, params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsWatch(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -1428,7 +1504,11 @@ impl MessageProcessor {
|
||||
params: FsUnwatchParams,
|
||||
) {
|
||||
match self.fs_watch_manager.unwatch(connection_id, params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ClientResponsePayload::FsUnwatch(response))
|
||||
.await
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -263,7 +263,12 @@ fn build_test_processor(
|
||||
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let analytics_events_client = codex_analytics::AnalyticsEventsClient::disabled();
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
false,
|
||||
));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
|
||||
let config_manager = ConfigManager::new(
|
||||
@@ -276,6 +281,7 @@ fn build_test_processor(
|
||||
);
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
config_manager,
|
||||
|
||||
@@ -4,6 +4,8 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
@@ -117,6 +119,8 @@ pub(crate) struct OutgoingMessageSender {
|
||||
/// We keep them here because this is where responses, errors, and
|
||||
/// disconnect cleanup all get handled.
|
||||
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
general_analytics_enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -185,10 +189,10 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
pub(crate) async fn send_response(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
@@ -203,12 +207,18 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
}
|
||||
|
||||
impl OutgoingMessageSender {
|
||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
general_analytics_enabled: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
next_server_request_id: AtomicI64::new(0),
|
||||
sender,
|
||||
request_id_to_callback: Mutex::new(HashMap::new()),
|
||||
request_contexts: Mutex::new(HashMap::new()),
|
||||
analytics_events_client,
|
||||
general_analytics_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -469,21 +479,37 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
pub(crate) async fn send_response(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
let connection_id = request_id.connection_id;
|
||||
let request_id_for_response = request_id.request_id.clone();
|
||||
let serialized = if self.general_analytics_enabled {
|
||||
response
|
||||
.into_jsonrpc_parts_and_payload(request_id_for_response.clone())
|
||||
.map(|(id, result, response)| {
|
||||
if let Some(response) = response {
|
||||
self.analytics_events_client.track_response_payload(
|
||||
connection_id.0,
|
||||
request_id_for_response,
|
||||
response,
|
||||
);
|
||||
}
|
||||
(id, result)
|
||||
})
|
||||
} else {
|
||||
response.into_jsonrpc_parts(request_id_for_response)
|
||||
};
|
||||
let request_context = self.take_request_context(&request_id).await;
|
||||
match serde_json::to_value(response) {
|
||||
Ok(result) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
|
||||
id: request_id.request_id.clone(),
|
||||
result,
|
||||
});
|
||||
|
||||
match serialized {
|
||||
Ok((id, result)) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
|
||||
self.send_outgoing_message_to_connection(
|
||||
request_context,
|
||||
request_id.connection_id,
|
||||
connection_id,
|
||||
outgoing_message,
|
||||
"response",
|
||||
)
|
||||
@@ -843,14 +869,23 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
};
|
||||
|
||||
outgoing
|
||||
.send_response(request_id.clone(), json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let envelope = timeout(Duration::from_secs(1), rx.recv())
|
||||
@@ -869,7 +904,7 @@ mod tests {
|
||||
panic!("expected response message");
|
||||
};
|
||||
assert_eq!(response.id, request_id.request_id);
|
||||
assert_eq!(response.result, json!({ "ok": true }));
|
||||
assert_eq!(response.result, json!({}));
|
||||
}
|
||||
other => panic!("expected targeted response envelope, got: {other:?}"),
|
||||
}
|
||||
@@ -878,7 +913,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_clears_registered_request_context() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -894,7 +933,12 @@ mod tests {
|
||||
assert_eq!(outgoing.request_context_count().await, 1);
|
||||
|
||||
outgoing
|
||||
.send_response(request_id, json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(outgoing.request_context_count().await, 0);
|
||||
@@ -903,7 +947,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_error_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -941,7 +989,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
let send_task = tokio::spawn(async move {
|
||||
outgoing
|
||||
.send_server_notification_to_connection_and_wait(
|
||||
@@ -985,7 +1037,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn connection_closed_clears_registered_request_contexts() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
let closed_connection_request = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1019,7 +1075,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
);
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request(ServerRequestPayload::ApplyPatchApproval(
|
||||
@@ -1053,7 +1113,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
@@ -1111,7 +1175,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
|
||||
@@ -722,6 +722,8 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)));
|
||||
|
||||
manager
|
||||
@@ -764,6 +766,8 @@ mod tests {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
codex_analytics::AnalyticsEventsClient::disabled(),
|
||||
false,
|
||||
)));
|
||||
|
||||
manager
|
||||
|
||||
Reference in New Issue
Block a user