[codex] Add response.processed websocket request (#21284)

## Summary

- Add a `response.processed` websocket request payload and sender for
Responses API websockets.
- Send `response.processed` from `try_run_sampling_request` after a
response completes, local turn processing succeeds, and the
session-owned feature flag is enabled.
- Add websocket coverage for both enabled and disabled feature-flag
behavior.

## Validation

- `just fmt`
- `cargo test -p codex-core response_processed`
- `cargo test -p codex-api responses_websocket`
- `cargo test -p codex-features
responses_websocket_response_processed_is_under_development`
- `git diff --check`
- `just fix -p codex-api -p codex-core -p codex-features`
- `git diff --check origin/main...HEAD`
This commit is contained in:
pakrym-oai
2026-05-06 09:58:46 -07:00
committed by GitHub
parent 2004173cd7
commit 2070d5bfd3
9 changed files with 232 additions and 29 deletions

View File

@@ -239,6 +239,11 @@ pub struct ResponseCreateWsRequest {
pub client_metadata: Option<HashMap<String, String>>,
}
#[derive(Debug, Serialize)]
pub struct ResponseProcessedWsRequest {
pub response_id: String,
}
pub fn response_create_client_metadata(
client_metadata: Option<HashMap<String, String>>,
trace: Option<&W3cTraceContext>,
@@ -267,6 +272,8 @@ pub fn response_create_client_metadata(
pub enum ResponsesWsRequest {
#[serde(rename = "response.create")]
ResponseCreate(ResponseCreateWsRequest),
#[serde(rename = "response.processed")]
ResponseProcessed(ResponseProcessedWsRequest),
}
pub fn create_text_param_for_request(

View File

@@ -1,5 +1,6 @@
use crate::auth::SharedAuthProvider;
use crate::common::ResponseEvent;
use crate::common::ResponseProcessedWsRequest;
use crate::common::ResponseStream;
use crate::common::ResponsesWsRequest;
use crate::error::ApiError;
@@ -204,6 +205,40 @@ impl ResponsesWebsocketConnection {
self.stream.lock().await.is_none()
}
#[instrument(
name = "responses_websocket.send_response_processed",
level = "info",
skip_all,
fields(transport = "responses_websocket", api.path = "responses")
)]
#[expect(
clippy::await_holding_invalid_type,
reason = "the guard serializes exclusive use of the websocket while sending a request frame"
)]
pub async fn send_response_processed(&self, response_id: String) -> Result<(), ApiError> {
let request =
ResponsesWsRequest::ResponseProcessed(ResponseProcessedWsRequest { response_id });
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;
let mut guard = self.stream.lock().await;
let Some(ws_stream) = guard.as_mut() else {
return Err(ApiError::Stream(
"websocket connection is closed".to_string(),
));
};
send_websocket_request(
ws_stream,
request_body,
self.idle_timeout,
self.telemetry.as_ref(),
/*connection_reused*/ true,
)
.await
}
#[instrument(
name = "responses_websocket.stream_request",
level = "info",
@@ -545,36 +580,14 @@ async fn run_websocket_response_stream(
connection_reused: bool,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let request_text = match serde_json::to_string(&request_body) {
Ok(text) => text,
Err(err) => {
return Err(ApiError::Stream(format!(
"failed to encode websocket request: {err}"
)));
}
};
trace!("websocket request: {request_text}");
let request_start = Instant::now();
let result = tokio::time::timeout(
send_websocket_request(
ws_stream,
request_body,
idle_timeout,
ws_stream.send(Message::Text(request_text.into())),
telemetry.as_ref(),
connection_reused,
)
.await
.map_err(|_| ApiError::Stream("idle timeout sending websocket request".into()))
.and_then(|result| {
result.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")))
});
if let Some(t) = telemetry.as_ref() {
t.on_ws_request(
request_start.elapsed(),
result.as_ref().err(),
connection_reused,
);
}
result?;
.await?;
loop {
let poll_start = Instant::now();
@@ -671,6 +684,47 @@ async fn run_websocket_response_stream(
Ok(())
}
async fn send_websocket_request(
ws_stream: &WsStream,
request_body: Value,
idle_timeout: Duration,
telemetry: Option<&Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
) -> Result<(), ApiError> {
let request_text = match serde_json::to_string(&request_body) {
Ok(text) => text,
Err(err) => {
return Err(ApiError::Stream(format!(
"failed to encode websocket request: {err}"
)));
}
};
trace!("websocket request: {request_text}");
let request_start = Instant::now();
let result = tokio::time::timeout(
idle_timeout,
ws_stream.send(Message::Text(request_text.into())),
)
.await
.map_err(|_| ApiError::Stream("idle timeout sending websocket request".into()))
.and_then(|result| {
result.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")))
});
if let Some(t) = telemetry.as_ref() {
t.on_ws_request(
request_start.elapsed(),
result.as_ref().err(),
connection_reused,
);
}
result?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -30,6 +30,7 @@ pub use crate::common::RawMemoryMetadata;
pub use crate::common::Reasoning;
pub use crate::common::ResponseCreateWsRequest;
pub use crate::common::ResponseEvent;
pub use crate::common::ResponseProcessedWsRequest;
pub use crate::common::ResponseStream;
pub use crate::common::ResponsesApiRequest;
pub use crate::common::ResponsesWsRequest;

View File

@@ -523,6 +523,9 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},
@@ -4078,6 +4081,9 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},

View File

@@ -100,6 +100,7 @@ use tokio::sync::oneshot::error::TryRecvError;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::instrument;
use tracing::trace;
use tracing::warn;
@@ -902,6 +903,18 @@ impl ModelClientSession {
.set_connection_reused(/*connection_reused*/ false);
}
pub(crate) async fn send_response_processed(&self, response_id: &str) {
let Some(connection) = self.websocket_session.connection.as_ref() else {
return;
};
if let Err(err) = connection
.send_response_processed(response_id.to_string())
.await
{
debug!("failed to send response.processed websocket request: {err}");
}
}
#[allow(clippy::too_many_arguments)]
/// Builds shared Responses API transport options and request-body options.
///

View File

@@ -1884,6 +1884,7 @@ async fn try_run_sampling_request(
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let mut completed_response_id: Option<String> = None;
let outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
parent: &receiving_span,
@@ -2111,7 +2112,7 @@ async fn try_run_sampling_request(
sess.services.models_manager.refresh_if_new_etag(etag).await;
}
ResponseEvent::Completed {
response_id: _,
response_id,
token_usage,
end_turn,
} => {
@@ -2128,6 +2129,7 @@ async fn try_run_sampling_request(
if let Some(false) = end_turn {
needs_follow_up = true;
}
completed_response_id = Some(response_id);
break Ok(SamplingRequestResult {
needs_follow_up,
last_agent_message,
@@ -2239,6 +2241,15 @@ async fn try_run_sampling_request(
)
.await;
if sess
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
&& outcome.is_ok()
&& let Some(response_id) = completed_response_id.as_deref()
{
client_session.send_response_processed(response_id).await;
}
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if cancellation_token.is_cancelled() {

View File

@@ -171,6 +171,93 @@ async fn responses_websocket_streams_without_feature_flag_when_provider_supports
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_sends_response_processed_when_feature_enabled() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hi"),
ev_completed("resp-1"),
],
vec![],
]])
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::ResponsesWebsocketResponseProcessed)
.expect("test config should allow feature update");
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("submission should send response.processed after processing");
let processed = server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 2)
.await;
assert_eq!(
processed.body_json(),
json!({
"type": "response.processed",
"response_id": "resp-1",
})
);
let connection = server.single_connection();
assert_eq!(connection.len(), 3);
assert_eq!(
connection[1].body_json()["type"].as_str(),
Some("response.create")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_omits_response_processed_without_feature() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hi"),
ev_completed("resp-1"),
],
vec![],
]])
.await;
let mut builder = test_codex();
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("submission should complete without response.processed");
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
skip_if_no_network!();

View File

@@ -231,6 +231,8 @@ pub enum Feature {
ResponsesWebsockets,
/// Legacy rollout flag for Responses API WebSocket transport v2 experiments.
ResponsesWebsocketsV2,
/// Send `response.processed` over Responses API websockets after a turn response is recorded.
ResponsesWebsocketResponseProcessed,
/// Enable remote compaction v2 over the normal Responses API.
RemoteCompactionV2,
/// Enable workspace dependency support.
@@ -1139,6 +1141,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Removed,
default_enabled: false,
},
FeatureSpec {
id: Feature::ResponsesWebsocketResponseProcessed,
key: "responses_websocket_response_processed",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::RemoteCompactionV2,
key: "remote_compaction_v2",

View File

@@ -129,6 +129,22 @@ fn remote_compaction_v2_is_under_development() {
);
}
#[test]
fn responses_websocket_response_processed_is_under_development() {
assert_eq!(
Feature::ResponsesWebsocketResponseProcessed.stage(),
Stage::UnderDevelopment
);
assert_eq!(
Feature::ResponsesWebsocketResponseProcessed.default_enabled(),
false
);
assert_eq!(
feature_for_key("responses_websocket_response_processed"),
Some(Feature::ResponsesWebsocketResponseProcessed)
);
}
#[test]
fn builtin_mcp_is_under_development() {
assert_eq!(Feature::BuiltInMcp.stage(), Stage::UnderDevelopment);