mirror of
https://github.com/openai/codex.git
synced 2026-06-01 19:02:59 +00:00
[app-server] type client response payloads (#20050)
## Why `pr17088` adds typed server-originated request/response plumbing, but successful client responses are still erased into bare JSON-RPC `result` values before app-server can make any typed decision about them. This precursor PR keeps successful client responses typed until the outgoing response seam. It is intentionally limited to protocol/app-server plumbing so the analytics behavior change can review separately on top. ## What changed - Add `ClientResponsePayload` as the pre-serialization client response body type. - Route app-server successful response paths through the typed payload seam while preserving existing handler-local analytics behavior. - Keep `InterruptConversation` JSON-RPC-only because it has no `ClientResponse` variant. - Move the new payload conversion tests into a dedicated protocol test module. ## Verification - `cargo check -p codex-app-server` - `cargo test -p codex-app-server-protocol`
This commit is contained in:
@@ -43,6 +43,7 @@ use codex_app_server_protocol::CancelLoginAccountResponse;
|
||||
use codex_app_server_protocol::CancelLoginAccountStatus;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::CollaborationModeListParams;
|
||||
use codex_app_server_protocol::CollaborationModeListResponse;
|
||||
@@ -2118,7 +2119,7 @@ impl CodexMessageProcessor {
|
||||
let result = self
|
||||
.exec_one_off_command_inner(request_id.clone(), params)
|
||||
.await
|
||||
.map(|()| None::<serde_json::Value>);
|
||||
.map(|()| None::<ClientResponsePayload>);
|
||||
self.send_optional_result(request_id, result).await;
|
||||
}
|
||||
|
||||
@@ -2864,7 +2865,6 @@ impl CodexMessageProcessor {
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_response(request_id, response)
|
||||
@@ -3544,7 +3544,7 @@ impl CodexMessageProcessor {
|
||||
let result = self
|
||||
.thread_rollback_start(&request_id, params)
|
||||
.await
|
||||
.map(|()| None::<serde_json::Value>);
|
||||
.map(|()| None::<ClientResponsePayload>);
|
||||
self.send_optional_result(request_id, result).await;
|
||||
}
|
||||
|
||||
@@ -4401,6 +4401,7 @@ impl CodexMessageProcessor {
|
||||
permission_profile,
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::ThreadResume {
|
||||
@@ -4408,7 +4409,6 @@ impl CodexMessageProcessor {
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = include_turns.then(|| response.thread.clone());
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
@@ -5027,7 +5027,6 @@ impl CodexMessageProcessor {
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = include_turns.then(|| response.thread.clone());
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
@@ -5811,7 +5810,7 @@ impl CodexMessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
result: Result<Option<T>, JSONRPCErrorError>,
|
||||
) where
|
||||
T: serde::Serialize,
|
||||
T: Into<ClientResponsePayload>,
|
||||
{
|
||||
match result {
|
||||
Ok(Some(response)) => self.outgoing.send_response(request_id, response).await,
|
||||
|
||||
@@ -37,6 +37,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
@@ -732,15 +733,11 @@ impl MessageProcessor {
|
||||
return Err(invalid_request(experimental_required_message(reason)));
|
||||
}
|
||||
let connection_id = connection_request_id.connection_id;
|
||||
if let ClientRequest::TurnStart { request_id, .. }
|
||||
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
|
||||
{
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
request_id.clone(),
|
||||
codex_request.clone(),
|
||||
);
|
||||
}
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
connection_request_id.request_id.clone(),
|
||||
&codex_request,
|
||||
);
|
||||
|
||||
let serialization_scope = codex_request.serialization_scope();
|
||||
let app_server_client_name = session.app_server_client_name().map(str::to_string);
|
||||
@@ -992,7 +989,12 @@ impl MessageProcessor {
|
||||
params: ConfigValueWriteParams,
|
||||
) {
|
||||
let result = self.config_api.write_value(params).await;
|
||||
self.handle_config_mutation_result(request_id, result).await
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ConfigValueWrite,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_config_batch_write(
|
||||
@@ -1001,7 +1003,12 @@ impl MessageProcessor {
|
||||
params: ConfigBatchWriteParams,
|
||||
) {
|
||||
let result = self.config_api.batch_write(params).await;
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ConfigBatchWrite,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_experimental_feature_enablement_set(
|
||||
@@ -1015,7 +1022,12 @@ impl MessageProcessor {
|
||||
.set_experimental_feature_enablement(params)
|
||||
.await;
|
||||
let is_ok = result.is_ok();
|
||||
self.handle_config_mutation_result(request_id, result).await;
|
||||
self.handle_config_mutation_result(
|
||||
request_id,
|
||||
result,
|
||||
ClientResponsePayload::ExperimentalFeatureEnablementSet,
|
||||
)
|
||||
.await;
|
||||
if should_refresh_apps_list && is_ok {
|
||||
self.refresh_apps_list_after_experimental_feature_enablement_set()
|
||||
.await;
|
||||
@@ -1091,15 +1103,18 @@ impl MessageProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_config_mutation_result<T: serde::Serialize>(
|
||||
async fn handle_config_mutation_result<T>(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
result: std::result::Result<T, JSONRPCErrorError>,
|
||||
wrap_success: impl FnOnce(T) -> ClientResponsePayload,
|
||||
) {
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.handle_config_mutation().await;
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response_as(request_id, wrap_success(response))
|
||||
.await;
|
||||
}
|
||||
Err(error) => self.outgoing.send_error(request_id, error).await,
|
||||
}
|
||||
@@ -1177,7 +1192,7 @@ impl MessageProcessor {
|
||||
device_key_requests_allowed: bool,
|
||||
run_request: F,
|
||||
) where
|
||||
R: serde::Serialize + Send + 'static,
|
||||
R: Into<ClientResponsePayload> + Send + 'static,
|
||||
F: FnOnce(DeviceKeyApi) -> Fut + Send + 'static,
|
||||
Fut: Future<Output = Result<R, JSONRPCErrorError>> + Send + 'static,
|
||||
{
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
@@ -188,11 +189,10 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
) {
|
||||
pub(crate) async fn send_response<T>(&self, request_id: ConnectionRequestId, response: T)
|
||||
where
|
||||
T: Into<ClientResponsePayload>,
|
||||
{
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
@@ -482,21 +482,28 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response<T: Serialize>(
|
||||
pub(crate) async fn send_response<T>(&self, request_id: ConnectionRequestId, response: T)
|
||||
where
|
||||
T: Into<ClientResponsePayload>,
|
||||
{
|
||||
self.send_response_as(request_id, response.into()).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_response_as(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
response: T,
|
||||
response: ClientResponsePayload,
|
||||
) {
|
||||
let connection_id = request_id.connection_id;
|
||||
let serialized_response = response.into_jsonrpc_parts(request_id.request_id.clone());
|
||||
let request_context = self.take_request_context(&request_id).await;
|
||||
match serde_json::to_value(response) {
|
||||
Ok(result) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
|
||||
id: request_id.request_id.clone(),
|
||||
result,
|
||||
});
|
||||
|
||||
match serialized_response {
|
||||
Ok((id, result)) => {
|
||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
|
||||
self.send_outgoing_message_to_connection(
|
||||
request_context,
|
||||
request_id.connection_id,
|
||||
connection_id,
|
||||
outgoing_message,
|
||||
"response",
|
||||
)
|
||||
@@ -592,11 +599,13 @@ impl OutgoingMessageSender {
|
||||
request_id: ConnectionRequestId,
|
||||
result: std::result::Result<T, E>,
|
||||
) where
|
||||
T: Serialize,
|
||||
T: Into<ClientResponsePayload>,
|
||||
E: Into<JSONRPCErrorError>,
|
||||
{
|
||||
match result {
|
||||
Ok(response) => self.send_response(request_id, response).await,
|
||||
Ok(response) => {
|
||||
self.send_response(request_id, response).await;
|
||||
}
|
||||
Err(error) => self.send_error(request_id, error).await,
|
||||
}
|
||||
}
|
||||
@@ -966,7 +975,12 @@ mod tests {
|
||||
};
|
||||
|
||||
outgoing
|
||||
.send_response(request_id.clone(), json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id.clone(),
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let envelope = timeout(Duration::from_secs(1), rx.recv())
|
||||
@@ -985,7 +999,7 @@ mod tests {
|
||||
panic!("expected response message");
|
||||
};
|
||||
assert_eq!(response.id, request_id.request_id);
|
||||
assert_eq!(response.result, json!({ "ok": true }));
|
||||
assert_eq!(response.result, json!({}));
|
||||
}
|
||||
other => panic!("expected targeted response envelope, got: {other:?}"),
|
||||
}
|
||||
@@ -1011,7 +1025,12 @@ mod tests {
|
||||
assert_eq!(outgoing.request_context_count().await, 1);
|
||||
|
||||
outgoing
|
||||
.send_response(request_id, json!({ "ok": true }))
|
||||
.send_response(
|
||||
request_id,
|
||||
ClientResponsePayload::ThreadArchive(
|
||||
codex_app_server_protocol::ThreadArchiveResponse {},
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(outgoing.request_context_count().await, 0);
|
||||
|
||||
Reference in New Issue
Block a user