explicit event

This commit is contained in:
Rasmus Rygaard
2026-02-01 11:46:17 -08:00
parent 27967500b8
commit 7db6ae48d4
4 changed files with 118 additions and 73 deletions

View File

@@ -5,7 +5,7 @@ use crate::common::ResponseStream;
use crate::common::ResponsesWsRequest;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::rate_limits::parse_rate_limit;
use crate::rate_limits::parse_rate_limit_event;
use crate::sse::responses::ResponsesStreamEvent;
use crate::sse::responses::process_responses_event;
use crate::telemetry::WebsocketTelemetry;
@@ -13,7 +13,6 @@ use codex_client::TransportError;
use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use serde_json::Value;
use std::sync::Arc;
@@ -36,6 +35,7 @@ use url::Url;
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
const X_MODELS_ETAG_HEADER: &str = "x-models-etag";
const X_REASONING_INCLUDED_HEADER: &str = "x-reasoning-included";
pub struct ResponsesWebsocketConnection {
@@ -43,6 +43,7 @@ pub struct ResponsesWebsocketConnection {
// TODO (pakrym): is this the right place for timeout?
idle_timeout: Duration,
server_reasoning_included: bool,
models_etag: Option<String>,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
turn_state: Option<Arc<OnceLock<String>>>,
}
@@ -52,6 +53,7 @@ impl ResponsesWebsocketConnection {
stream: WsStream,
idle_timeout: Duration,
server_reasoning_included: bool,
models_etag: Option<String>,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Self {
@@ -59,6 +61,7 @@ impl ResponsesWebsocketConnection {
stream: Arc::new(Mutex::new(Some(stream))),
idle_timeout,
server_reasoning_included,
models_etag,
telemetry,
turn_state,
}
@@ -77,13 +80,16 @@ impl ResponsesWebsocketConnection {
let stream = Arc::clone(&self.stream);
let idle_timeout = self.idle_timeout;
let server_reasoning_included = self.server_reasoning_included;
let models_etag = self.models_etag.clone();
let telemetry = self.telemetry.clone();
let turn_state = self.turn_state.clone();
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;
tokio::spawn(async move {
if let Some(etag) = models_etag {
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
}
if server_reasoning_included {
let _ = tx_event
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
@@ -105,7 +111,6 @@ impl ResponsesWebsocketConnection {
request_body,
idle_timeout,
telemetry,
turn_state,
)
.await
{
@@ -144,12 +149,13 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
headers.extend(extra_headers);
add_auth_headers_to_header_map(&self.auth, &mut headers);
let (stream, server_reasoning_included) =
let (stream, server_reasoning_included, models_etag) =
connect_websocket(ws_url, headers, turn_state.clone()).await?;
Ok(ResponsesWebsocketConnection::new(
stream,
self.provider.stream_idle_timeout,
server_reasoning_included,
models_etag,
telemetry,
turn_state,
))
@@ -160,7 +166,7 @@ async fn connect_websocket(
url: Url,
headers: HeaderMap,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Result<(WsStream, bool), ApiError> {
) -> Result<(WsStream, bool, Option<String>), ApiError> {
info!("connecting to websocket: {url}");
let mut request = url
@@ -186,6 +192,11 @@ async fn connect_websocket(
};
let reasoning_included = response.headers().contains_key(X_REASONING_INCLUDED_HEADER);
let models_etag = response
.headers()
.get(X_MODELS_ETAG_HEADER)
.and_then(|value| value.to_str().ok())
.map(ToString::to_string);
if let Some(turn_state) = turn_state
&& let Some(header_value) = response
.headers()
@@ -194,7 +205,7 @@ async fn connect_websocket(
{
let _ = turn_state.set(header_value.to_string());
}
Ok((stream, reasoning_included))
Ok((stream, reasoning_included, models_etag))
}
fn map_ws_error(err: WsError, url: &Url) -> ApiError {
@@ -221,31 +232,12 @@ fn map_ws_error(err: WsError, url: &Url) -> ApiError {
}
}
fn headers_from_value(raw: &Value) -> Option<HeaderMap> {
let obj = raw.as_object()?;
let mut headers = HeaderMap::new();
for (name, value) in obj {
let Some(value_str) = value.as_str() else {
continue;
};
let Ok(header_name) = HeaderName::from_bytes(name.as_bytes()) else {
continue;
};
let Ok(header_value) = HeaderValue::from_str(value_str) else {
continue;
};
headers.insert(header_name, header_value);
}
Some(headers)
}
async fn run_websocket_response_stream(
ws_stream: &mut WsStream,
tx_event: mpsc::Sender<std::result::Result<ResponseEvent, ApiError>>,
request_body: Value,
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Result<(), ApiError> {
let request_text = match serde_json::to_string(&request_body) {
Ok(text) => text,
@@ -301,33 +293,9 @@ async fn run_websocket_response_stream(
continue;
}
};
if event.kind() == "codex.metadata" {
if let Some(raw_headers) = event.headers()
&& let Some(headers) = headers_from_value(raw_headers)
{
if let Some(turn_state) = turn_state.as_ref()
&& let Some(header_value) = headers
.get(X_CODEX_TURN_STATE_HEADER)
.and_then(|value| value.to_str().ok())
{
let _ = turn_state.set(header_value.to_string());
}
if let Some(snapshot) = parse_rate_limit(&headers) {
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
}
if let Some(etag) = headers
.get("X-Models-Etag")
.and_then(|value| value.to_str().ok())
{
let _ = tx_event
.send(Ok(ResponseEvent::ModelsEtag(etag.to_string())))
.await;
}
if headers.contains_key(X_REASONING_INCLUDED_HEADER) {
let _ = tx_event
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
.await;
}
if event.kind() == "codex.rate_limits" {
if let Some(snapshot) = parse_rate_limit_event(&text) {
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
}
continue;
}

View File

@@ -1,7 +1,9 @@
use codex_protocol::account::PlanType;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use http::HeaderMap;
use serde::Deserialize;
use std::fmt::Display;
#[derive(Debug)]
@@ -41,6 +43,67 @@ pub fn parse_rate_limit(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
})
}
#[derive(Debug, Deserialize)]
struct RateLimitEventWindow {
used_percent: f64,
window_minutes: Option<i64>,
reset_at: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct RateLimitEventDetails {
primary: Option<RateLimitEventWindow>,
secondary: Option<RateLimitEventWindow>,
}
#[derive(Debug, Deserialize)]
struct RateLimitEventCredits {
has_credits: bool,
unlimited: bool,
balance: Option<String>,
}
#[derive(Debug, Deserialize)]
struct RateLimitEvent {
#[serde(rename = "type")]
kind: String,
plan_type: Option<PlanType>,
rate_limits: Option<RateLimitEventDetails>,
credits: Option<RateLimitEventCredits>,
}
pub fn parse_rate_limit_event(payload: &str) -> Option<RateLimitSnapshot> {
let event: RateLimitEvent = serde_json::from_str(payload).ok()?;
if event.kind != "codex.rate_limits" {
return None;
}
let (primary, secondary) = if let Some(details) = event.rate_limits.as_ref() {
(map_event_window(details.primary.as_ref()), map_event_window(details.secondary.as_ref()))
} else {
(None, None)
};
let credits = event.credits.map(|credits| CreditsSnapshot {
has_credits: credits.has_credits,
unlimited: credits.unlimited,
balance: credits.balance,
});
Some(RateLimitSnapshot {
primary,
secondary,
credits,
plan_type: event.plan_type,
})
}
fn map_event_window(window: Option<&RateLimitEventWindow>) -> Option<RateLimitWindow> {
let window = window?;
Some(RateLimitWindow {
used_percent: window.used_percent,
window_minutes: window.window_minutes,
resets_at: window.reset_at,
})
}
/// Parses the bespoke Codex rate-limit headers into a `RateLimitSnapshot`.
pub fn parse_promo_message(headers: &HeaderMap) -> Option<String> {
parse_header_str(headers, "x-codex-promo-message")

View File

@@ -163,17 +163,12 @@ pub struct ResponsesStreamEvent {
delta: Option<String>,
summary_index: Option<i64>,
content_index: Option<i64>,
headers: Option<Value>,
}
impl ResponsesStreamEvent {
pub fn kind(&self) -> &str {
&self.kind
}
pub fn headers(&self) -> Option<&Value> {
self.headers.as_ref()
}
}
#[derive(Debug)]

View File

@@ -16,6 +16,7 @@ use codex_core::protocol::SessionSource;
use codex_otel::OtelManager;
use codex_otel::metrics::MetricsClient;
use codex_otel::metrics::MetricsConfig;
use codex_protocol::account::PlanType;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary;
use core_test_support::load_default_config_for_test;
@@ -138,25 +139,42 @@ async fn responses_websocket_emits_reasoning_included_event() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_emits_metadata_events() {
async fn responses_websocket_emits_rate_limit_events() {
skip_if_no_network!();
let metadata_headers = json!({
"X-Codex-Primary-Used-Percent": "42",
"X-Codex-Primary-Window-Minutes": "60",
"X-Codex-Primary-Reset-At": "1700000000",
"X-Codex-Credits-Has-Credits": "true",
"X-Codex-Credits-Unlimited": "false",
"X-Codex-Credits-Balance": "123",
"X-Models-Etag": "etag-123",
"X-Reasoning-Included": "true",
let rate_limit_event = json!({
"type": "codex.rate_limits",
"plan_type": "plus",
"rate_limits": {
"allowed": true,
"limit_reached": false,
"primary": {
"used_percent": 42,
"window_minutes": 60,
"reset_at": 1700000000
},
"secondary": null
},
"code_review_rate_limits": null,
"credits": {
"has_credits": true,
"unlimited": false,
"balance": "123"
},
"promo": null
});
let server = start_websocket_server(vec![vec![vec![
json!({"type": "codex.metadata", "headers": metadata_headers}),
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![
rate_limit_event,
ev_response_created("resp-1"),
ev_completed("resp-1"),
]],
response_headers: vec![
("X-Models-Etag".to_string(), "etag-123".to_string()),
("X-Reasoning-Included".to_string(), "true".to_string()),
],
}])
.await;
let harness = websocket_harness(&server).await;
@@ -193,6 +211,7 @@ async fn responses_websocket_emits_metadata_events() {
assert_eq!(primary.used_percent, 42.0);
assert_eq!(primary.window_minutes, Some(60));
assert_eq!(primary.resets_at, Some(1_700_000_000));
assert_eq!(rate_limits.plan_type, Some(PlanType::Plus));
let credits = rate_limits.credits.expect("missing credits");
assert!(credits.has_credits);
assert!(!credits.unlimited);