mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
core: preconnect Responses websocket for first turn (#10698)
## Problem

The first user turn can pay websocket handshake latency even when a session has already started. We want to reduce that initial delay while preserving turn semantics and avoiding any prompt send during startup. Reviewer feedback also called out duplicated connect/setup paths and unnecessary preconnect state complexity.

## Mental model

`ModelClient` owns session-scoped transport state. During session startup, it can opportunistically warm one websocket handshake slot. A turn-scoped `ModelClientSession` adopts that slot once if available, restores captured sticky turn-state, and otherwise opens a websocket through the same shared connect path. If startup preconnect is still in flight, first-turn setup awaits that task and treats it as the first connection attempt for the turn.

Preconnect is handshake-only. The first `response.create` is still sent only when a turn starts.

## Non-goals

This change does not make preconnect required for correctness and does not change prompt/turn payload semantics. It also does not expand fallback behavior beyond clearing preconnect state when fallback activates.

## Tradeoffs

The implementation prioritizes simpler ownership and shared connection code over header-match gating for reuse. The single-slot cache keeps the lifecycle straightforward but only benefits the immediate next turn. Awaiting in-flight preconnect has the same app-level connect-timeout semantics as existing websocket connect behavior (no new timeout class introduced by this PR).

## Architecture

`core/src/client.rs`:
- Added session-level preconnect lifecycle state (`Idle` / `InFlight` / `Ready`) carrying one warmed websocket plus optional captured turn-state.
- Added `pre_establish_connection()` startup warmup and `preconnect()` handshake-only setup.
- Deduped auth/provider resolution into `current_client_setup()` and websocket handshake wiring into `connect_websocket()` / `build_websocket_headers()`.
- Updated the turn websocket path to adopt preconnect first, await in-flight preconnect when present, then create a new websocket only when needed.
- Ensured fallback activation clears warmed preconnect state.
- Added documentation for lifecycle, ownership, sticky-routing invariants, and timeout semantics.

`core/src/codex.rs`:
- Session startup invokes `model_client.pre_establish_connection(...)`.
- Turn metadata resolution uses the shared timeout helper.

`core/src/turn_metadata.rs`:
- Centralized the shared timeout helper used by both turn-time metadata resolution and startup preconnect metadata building.

`core/tests/common/responses.rs` + websocket test suites:
- Added a deterministic handshake-waiting helper (`wait_for_handshakes`) with bounded polling.
- Added startup preconnect and in-flight preconnect reuse coverage.
- Fallback expectations now assert exactly two websocket attempts in covered scenarios (startup preconnect + turn attempt before fallback sticks).

## Observability

Preconnect remains best-effort and non-fatal. Existing websocket/fallback telemetry remains in place, and debug logs now make preconnect-await behavior and preconnect failures easier to reason about.

## Tests

Validated with:
1. `just fmt`
2. `cargo test -p codex-core websocket_preconnect -- --nocapture`
3. `cargo test -p codex-core websocket_fallback -- --nocapture`
4. `cargo test -p codex-core websocket_first_turn_waits_for_inflight_preconnect -- --nocapture`
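The single-slot lifecycle described above can be sketched in miniature. The state names (`Idle` / `InFlight` / `Ready`) mirror the PR's `PreconnectState`, but `PreconnectSlot`, `store_ready`, and `take_ready` are illustrative stand-ins: the real slot holds a live websocket connection and a tokio `JoinHandle` rather than these placeholder strings.

```rust
use std::mem;
use std::sync::Mutex;

// Simplified model of the session-level preconnect slot. The real enum in
// core/src/client.rs carries a warmed websocket and a JoinHandle; placeholder
// strings stand in here so the one-shot adoption semantics can be shown
// without a network stack.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum PreconnectState {
    Idle,
    InFlight(&'static str), // stands in for JoinHandle<()>
    Ready(&'static str),    // stands in for the warmed websocket
}

// Hypothetical wrapper around the single lock-guarded slot.
struct PreconnectSlot {
    state: Mutex<PreconnectState>,
}

impl PreconnectSlot {
    fn new() -> Self {
        Self {
            state: Mutex::new(PreconnectState::Idle),
        }
    }

    // Store a warmed "socket"; only one candidate is ever kept.
    fn store_ready(&self, conn: &'static str) {
        *self.state.lock().unwrap() = PreconnectState::Ready(conn);
    }

    // One-shot adoption: a Ready socket is taken exactly once and the slot
    // resets to Idle. Non-Ready states are restored unchanged so the caller
    // can await an in-flight task and retry.
    fn take_ready(&self) -> Option<&'static str> {
        let mut state = self.state.lock().unwrap();
        match mem::replace(&mut *state, PreconnectState::Idle) {
            PreconnectState::Ready(conn) => Some(conn),
            other => {
                *state = other; // put Idle/InFlight back as-is
                None
            }
        }
    }
}

fn main() {
    let slot = PreconnectSlot::new();
    assert_eq!(slot.take_ready(), None); // nothing warmed yet
    slot.store_ready("warmed-ws");
    assert_eq!(slot.take_ready(), Some("warmed-ws")); // adopted once
    assert_eq!(slot.take_ready(), None); // slot is one-shot
}
```

The `mem::replace`-then-restore pattern is what makes adoption atomic under the lock: the taker either walks away with the warmed socket or leaves the state exactly as it found it.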
@@ -9,10 +9,35 @@
//! call site.
//!
//! A [`ModelClientSession`] is created per turn and is used to stream one or more Responses API
//! requests during that turn. It caches a Responses WebSocket connection (opened lazily) and
//! stores per-turn state such as the `x-codex-turn-state` token used for sticky routing.
//! requests during that turn. It caches a Responses WebSocket connection (opened lazily, or reused
//! from a session-level preconnect) and stores per-turn state such as the `x-codex-turn-state`
//! token used for sticky routing.
//!
//! Preconnect is intentionally handshake-only: it may warm a socket and capture sticky-routing
//! state, but the first `response.create` payload is still sent only when a turn starts.
//!
//! Internally, startup preconnect and warmed-socket adoption share one session-level lifecycle:
//! `Idle` (no task/socket), `InFlight` (startup preconnect task running), and `Ready` (one-shot
//! warmed socket available). On first use in a turn, the session tries to adopt `Ready`; if not
//! ready, it awaits `InFlight` and retries adoption before opening a new websocket. This prevents
//! racing duplicate first-turn handshakes while keeping preconnect best-effort.
//!
//! ## Retry-Budget Tradeoff
//!
//! `stream_max_retries` applies to retryable turn stream failures, not to background startup
//! preconnect handshakes. In failure cases this can produce two websocket handshakes on the first
//! turn (startup preconnect, then turn-time connect) before HTTP fallback becomes sticky. We keep
//! this split intentionally so opportunistic preconnect cannot consume the user-visible stream
//! retry budget before any turn payload is sent.
//!
//! If this policy needs to change later, preconnect can be modeled as an explicit first connection
//! attempt in the same retry budget as turn streaming. That would require plumbing websocket
//! attempt accounting from connection acquisition into the turn retry loop and updating fallback
//! expectations/tests accordingly.

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -64,8 +89,10 @@ use reqwest::StatusCode;
use serde_json::Value;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;
use tracing::warn;

use crate::AuthManager;
@@ -81,6 +108,8 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::turn_metadata::build_turn_metadata_header;
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;

pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
@@ -93,7 +122,6 @@ pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
///
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
/// configuration is per turn and is passed explicitly to streaming/unary methods.
#[derive(Debug)]
struct ModelClientState {
auth_manager: Option<Arc<AuthManager>>,
conversation_id: ThreadId,
@@ -105,6 +133,70 @@ struct ModelClientState {
include_timing_metrics: bool,
beta_features_header: Option<String>,
disable_websockets: AtomicBool,
/// Session-scoped preconnect lifecycle state.
///
/// This keeps startup preconnect task tracking and warmed-socket adoption in one lock so
/// turn-time websocket setup observes a single, coherent state.
preconnect: Mutex<PreconnectState>,
}

impl std::fmt::Debug for ModelClientState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ModelClientState")
.field("auth_manager", &self.auth_manager)
.field("conversation_id", &self.conversation_id)
.field("provider", &self.provider)
.field("session_source", &self.session_source)
.field("model_verbosity", &self.model_verbosity)
.field(
"enable_responses_websockets",
&self.enable_responses_websockets,
)
.field(
"enable_request_compression",
&self.enable_request_compression,
)
.field("include_timing_metrics", &self.include_timing_metrics)
.field("beta_features_header", &self.beta_features_header)
.field(
"disable_websockets",
&self.disable_websockets.load(Ordering::Relaxed),
)
.field("preconnect", &"<opaque>")
.finish()
}
}

/// Resolved API client setup for a single request attempt.
///
/// Keeping this as a single bundle ensures preconnect and normal request paths
/// share the same auth/provider setup flow.
struct CurrentClientSetup {
auth: Option<CodexAuth>,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
}

/// One-shot preconnected websocket slot consumed by the next turn.
///
/// This bundles the socket with optional sticky-routing state captured during
/// handshake so they are taken and cleared atomically.
struct PreconnectedWebSocket {
connection: ApiWebSocketConnection,
turn_state: Option<String>,
}

/// Session-level lifecycle of startup websocket preconnect.
///
/// `InFlight` tracks the startup task so the first turn can await it and reuse the same socket.
/// `Ready` stores a one-shot warmed socket for turn adoption.
enum PreconnectState {
/// No startup preconnect task is active and no warmed socket is available.
Idle,
/// Startup preconnect is currently running; first turn may await this task.
InFlight(JoinHandle<()>),
/// Startup preconnect finished and produced a one-shot warmed socket.
Ready(PreconnectedWebSocket),
}

/// A session-scoped client for model-provider API calls.
@@ -116,8 +208,9 @@ struct ModelClientState {
/// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns
/// will also use HTTP for the remainder of the session.
///
/// Turn-scoped settings (model selection, reasoning controls, telemetry context, and turn metadata)
/// are passed explicitly to the relevant methods to keep turn lifetime visible at the call site.
/// Turn-scoped settings (model selection, reasoning controls, telemetry context, and turn
/// metadata) are passed explicitly to the relevant methods to keep turn lifetime visible at the
/// call site.
///
/// This type is cheap to clone.
#[derive(Debug, Clone)]
@@ -127,14 +220,17 @@ pub struct ModelClient {

/// A turn-scoped streaming session created from a [`ModelClient`].
///
/// The session lazily establishes a Responses WebSocket connection (and reuses it across multiple
/// requests) and caches per-turn state:
/// The session establishes a Responses WebSocket connection lazily (or adopts a preconnected one)
/// and reuses it across multiple requests within the turn. It also caches per-turn state:
///
/// - The last request's input items, so subsequent calls can use `response.append` when the input
/// is an incremental extension of the previous request.
/// - The `x-codex-turn-state` sticky-routing token, which must be replayed for all requests within
/// the same turn.
///
/// When startup preconnect is still running, first use of this session awaits that in-flight task
/// before opening a new websocket so preconnect acts as the first connection attempt for the turn.
///
/// Create a fresh `ModelClientSession` for each Codex turn. Reusing it across turns would replay
/// the previous turn's sticky-routing token into the next turn, which violates the client/server
/// contract and can cause routing bugs.
@@ -184,14 +280,16 @@ impl ModelClient {
include_timing_metrics,
beta_features_header,
disable_websockets: AtomicBool::new(false),
preconnect: Mutex::new(PreconnectState::Idle),
}),
}
}

/// Creates a fresh turn-scoped streaming session.
///
/// This does not open any network connections; the WebSocket connection is established lazily
/// when the first WebSocket stream request is issued.
/// This constructor does not perform network I/O itself. The returned session either adopts a
/// previously preconnected websocket or opens a websocket lazily when the first stream request
/// is issued.
pub fn new_session(&self) -> ModelClientSession {
ModelClientSession {
client: self.clone(),
@@ -201,6 +299,79 @@ impl ModelClient {
}
}

/// Spawns a best-effort task that warms a websocket for the first turn.
///
/// This call performs only connection setup; it never sends prompt payloads.
///
/// A timeout when computing turn metadata is treated the same as "no metadata" so startup
/// cannot block indefinitely on optional preconnect context.
pub fn pre_establish_connection(&self, otel_manager: OtelManager, cwd: PathBuf) {
if !self.responses_websocket_enabled() || self.disable_websockets() {
return;
}

let model_client = self.clone();
let handle = tokio::spawn(async move {
let turn_metadata_header = resolve_turn_metadata_header_with_timeout(
build_turn_metadata_header(cwd.as_path()),
None,
)
.await;
let _ = model_client
.preconnect(&otel_manager, turn_metadata_header.as_deref())
.await;
});
self.store_preconnect_task(handle);
}

/// Opportunistically pre-establishes a Responses WebSocket connection for this session.
///
/// This method is best-effort: it returns `false` on any setup/connect failure and the caller
/// should continue normally. A successful preconnect reduces first-turn latency but never sends
/// an initial prompt; the first `response.create` is still sent only when a turn starts.
///
/// The preconnected slot is single-consumer and single-use: the next `ModelClientSession` may
/// adopt it once, after which later turns either keep using that same turn-local connection or
/// create a new one.
pub async fn preconnect(
&self,
otel_manager: &OtelManager,
turn_metadata_header: Option<&str>,
) -> bool {
if !self.responses_websocket_enabled() || self.disable_websockets() {
return false;
}

let client_setup = match self.current_client_setup().await {
Ok(client_setup) => client_setup,
Err(err) => {
warn!("failed to build websocket preconnect client setup: {err}");
return false;
}
};
let turn_state = Arc::new(OnceLock::new());

match self
.connect_websocket(
otel_manager,
client_setup.api_provider,
client_setup.api_auth,
Some(Arc::clone(&turn_state)),
turn_metadata_header,
)
.await
{
Ok(connection) => {
self.store_preconnected_websocket(connection, turn_state.get().cloned());
true
}
Err(err) => {
debug!("websocket preconnect failed: {err}");
false
}
}
}

/// Compacts the current conversation history using the Compact endpoint.
///
/// This is a unary call (no streaming) that returns a new list of
@@ -217,20 +388,12 @@ impl ModelClient {
if prompt.input.is_empty() {
return Ok(Vec::new());
}
let auth_manager = self.state.auth_manager.clone();
let auth = match auth_manager.as_ref() {
Some(manager) => manager.auth().await,
None => None,
};
let api_provider = self
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(otel_manager);
let client = ApiCompactClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));

let instructions = prompt.base_instructions.text.clone();
let payload = ApiCompactionInput {
@@ -263,20 +426,12 @@ impl ModelClient {
return Ok(Vec::new());
}

let auth_manager = self.state.auth_manager.clone();
let auth = match auth_manager.as_ref() {
Some(manager) => manager.auth().await,
None => None,
};
let api_provider = self
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let api_auth = auth_provider_from_auth(auth, &self.state.provider)?;
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(otel_manager);
let client = ApiMemoriesClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let client =
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));

let payload = ApiMemoryTraceSummarizeInput {
model: model_info.slug.clone(),
@@ -315,13 +470,223 @@ impl ModelClient {
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
}

/// Returns whether this session is configured to use Responses-over-WebSocket.
///
/// This combines provider capability and feature gating; both must be true for websocket paths
/// to be eligible.
fn responses_websocket_enabled(&self) -> bool {
self.state.provider.supports_websockets && self.state.enable_responses_websockets
}

/// Returns whether websocket transport has been permanently disabled for this session.
///
/// Once set by fallback activation, subsequent turns must stay on HTTP transport.
fn disable_websockets(&self) -> bool {
self.state.disable_websockets.load(Ordering::Relaxed)
}

/// Returns auth + provider configuration resolved from the current session auth state.
///
/// This centralizes setup used by both preconnect and normal request paths so they stay in
/// lockstep when auth/provider resolution changes.
async fn current_client_setup(&self) -> Result<CurrentClientSetup> {
let auth = match self.state.auth_manager.as_ref() {
Some(manager) => manager.auth().await,
None => None,
};
let api_provider = self
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
Ok(CurrentClientSetup {
auth,
api_provider,
api_auth,
})
}

/// Opens a websocket connection using the same header and telemetry wiring as normal turns.
///
/// Both startup preconnect and in-turn `needs_new` reconnects call this path so handshake
/// behavior remains consistent across both flows.
async fn connect_websocket(
&self,
otel_manager: &OtelManager,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_state: Option<Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(otel_manager);
ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(headers, turn_state, Some(websocket_telemetry))
.await
}

/// Builds websocket handshake headers for both preconnect and turn-time reconnect.
///
/// Callers should pass the current turn-state lock when available so sticky-routing state is
/// replayed on reconnect within the same turn.
fn build_websocket_headers(
&self,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
) -> ApiHeaderMap {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let mut headers = build_responses_headers(
self.state.beta_features_header.as_deref(),
turn_state,
turn_metadata_header.as_ref(),
);
headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
)));
headers.insert(
OPENAI_BETA_HEADER,
HeaderValue::from_static(OPENAI_BETA_RESPONSES_WEBSOCKETS),
);
if self.state.include_timing_metrics {
headers.insert(
X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
HeaderValue::from_static("true"),
);
}
headers
}

/// Consumes the warmed websocket slot.
fn take_preconnected_websocket(&self) -> Option<PreconnectedWebSocket> {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
match previous {
PreconnectState::Ready(preconnected) => Some(preconnected),
other => {
*state = other;
None
}
}
}

/// Stores a freshly preconnected websocket and optional captured turn-state token.
///
/// This overwrites any previously warmed socket because only one preconnect candidate is kept.
fn store_preconnected_websocket(
&self,
connection: ApiWebSocketConnection,
turn_state: Option<String>,
) {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if self.disable_websockets() {
debug!("discarding startup websocket preconnect because websocket fallback is active");
*state = PreconnectState::Idle;
return;
}
*state = PreconnectState::Ready(PreconnectedWebSocket {
connection,
turn_state,
});
}

/// Stores the latest startup preconnect task handle.
///
/// If a previous task is still running, it is aborted so only one in-flight startup attempt
/// is tracked.
fn store_preconnect_task(&self, task: JoinHandle<()>) {
let mut task = Some(task);
let previous_in_flight = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
match &*state {
// A very fast startup preconnect can complete before this method stores the
// task handle; keep the warmed socket and drop the now-useless handle.
PreconnectState::Ready(_) => None,
_ => match task.take() {
Some(next_task) => {
match std::mem::replace(&mut *state, PreconnectState::InFlight(next_task)) {
PreconnectState::InFlight(previous) => Some(previous),
_ => None,
}
}
None => None,
},
}
};
if let Some(previous) = previous_in_flight {
previous.abort();
}
if let Some(task) = task {
task.abort();
}
}

/// Awaits the startup preconnect task once, if one is currently tracked.
///
/// This lets the first turn treat startup preconnect as the first websocket connection
/// attempt, avoiding a redundant second connect while the preconnect attempt is in flight.
///
/// This await intentionally has no separate timeout wrapper. WebSocket connect handshakes
/// already run without an app-level timeout, so waiting on the in-flight preconnect task does
/// not add a new unbounded wait class; it reuses the same first connection attempt.
async fn await_preconnect_task(&self) {
let task = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
match previous {
PreconnectState::InFlight(task) => Some(task),
other => {
*state = other;
None
}
}
};
if let Some(task) = task {
let in_flight = !task.is_finished();
if in_flight {
debug!("awaiting startup websocket preconnect before opening a new websocket");
}
if let Err(err) = task.await {
debug!("startup websocket preconnect task failed: {err}");
}
}
}

/// Clears all startup preconnect state.
///
/// This aborts any in-flight startup preconnect task and drops any warmed socket.
fn clear_preconnect(&self) {
let previous = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
std::mem::replace(&mut *state, PreconnectState::Idle)
};
if let PreconnectState::InFlight(task) = previous {
task.abort();
}
}
}

impl ModelClientSession {
fn disable_websockets(&self) -> bool {
self.client.state.disable_websockets.load(Ordering::Relaxed)
}

fn activate_http_fallback(&self, websocket_enabled: bool) -> bool {
websocket_enabled
&& !self
@@ -331,11 +696,6 @@ impl ModelClientSession {
.swap(true, Ordering::Relaxed)
}

fn responses_websocket_enabled(&self) -> bool {
self.client.state.provider.supports_websockets
&& self.client.state.enable_responses_websockets
}

fn build_responses_request(prompt: &Prompt) -> Result<ApiPrompt> {
let instructions = prompt.base_instructions.text.clone();
let tools_json: Vec<Value> = create_tools_json_for_responses_api(&prompt.tools)?;
@@ -343,6 +703,10 @@ impl ModelClientSession {
}

#[allow(clippy::too_many_arguments)]
/// Builds shared Responses API request options for both HTTP and WebSocket streaming.
///
/// Keeping option construction in one place ensures request-scoped headers are consistent
/// regardless of transport choice.
fn build_responses_options(
&self,
prompt: &Prompt,
@@ -352,8 +716,7 @@ impl ModelClientSession {
turn_metadata_header: Option<&str>,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header =
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok());
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);

let default_reasoning_effort = model_info.default_reasoning_level;
let reasoning = if model_info.supports_reasoning_summaries {
@@ -466,30 +829,54 @@ impl ModelClientSession {
ResponsesWsRequest::ResponseCreate(payload)
}

/// Returns a websocket connection for this turn, reusing preconnect when possible.
///
/// This method first tries to adopt the session-level preconnect slot, then falls back to a
/// fresh websocket handshake only when the turn has no live connection. If startup preconnect
/// is still running, it is awaited first so that task acts as the first connection attempt for
/// this turn instead of racing a second handshake. If that attempt fails, the normal connect
/// and stream retry flow continues unchanged.
async fn websocket_connection(
&mut self,
otel_manager: &OtelManager,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&str>,
options: &ApiResponsesOptions,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
// Prefer the session-level preconnect slot before creating a new websocket.
if self.connection.is_none() {
if let Some(preconnected) = self.try_use_preconnected_websocket() {
self.adopt_preconnected_websocket(preconnected);
} else {
self.client.await_preconnect_task().await;
if let Some(preconnected) = self.try_use_preconnected_websocket() {
self.adopt_preconnected_websocket(preconnected);
}
}
}

let needs_new = match self.connection.as_ref() {
Some(conn) => conn.is_closed().await,
None => true,
};

if needs_new {
let headers =
build_websocket_connect_headers(options, self.client.state.include_timing_metrics);
let websocket_telemetry = Self::build_websocket_telemetry(otel_manager);
let new_conn: ApiWebSocketConnection =
ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
headers,
options.turn_state.clone(),
Some(websocket_telemetry),
)
.await?;
self.client.clear_preconnect();
let turn_state = options
.turn_state
.clone()
.unwrap_or_else(|| Arc::clone(&self.turn_state));
let new_conn = self
.client
.connect_websocket(
otel_manager,
api_provider,
api_auth,
Some(turn_state),
turn_metadata_header,
)
.await?;
self.connection = Some(new_conn);
}

@@ -498,6 +885,33 @@ impl ModelClientSession {
))
}

/// Adopts the session-level preconnect slot for this turn.
///
/// If a turn-local connection already exists, this intentionally does nothing to avoid
/// replacing an active connection mid-turn.
fn try_use_preconnected_websocket(&mut self) -> Option<PreconnectedWebSocket> {
if self.connection.is_some() {
return None;
}

self.client.take_preconnected_websocket()
}

/// Moves a preconnected socket into the turn-local connection slot.
///
/// If the preconnect handshake captured sticky-routing turn state, this also seeds the
/// turn-local state lock so all later requests in the turn replay the same token.
fn adopt_preconnected_websocket(&mut self, preconnected: PreconnectedWebSocket) {
let PreconnectedWebSocket {
connection,
turn_state,
} = preconnected;
if let Some(turn_state) = turn_state {
let _ = self.turn_state.set(turn_state);
}
self.connection = Some(connection);
}

fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression {
if self.client.state.enable_request_compression
&& auth.is_some_and(CodexAuth::is_chatgpt_auth)
@@ -540,22 +954,17 @@ impl ModelClientSession {
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
loop {
let auth = match auth_manager.as_ref() {
Some(manager) => manager.auth().await,
None => None,
};
let api_provider = self
.client
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?;
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager);
let compression = self.responses_request_compression(auth.as_ref());
let compression = self.responses_request_compression(client_setup.auth.as_ref());

let client = ApiResponsesClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
let client = ApiResponsesClient::new(
transport,
client_setup.api_provider,
client_setup.api_auth,
)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));

let options = self.build_responses_options(
|
||||
prompt,
|
||||
@@ -603,17 +1012,8 @@ impl ModelClientSession {
            .as_ref()
            .map(super::auth::AuthManager::unauthorized_recovery);
        loop {
            let auth = match auth_manager.as_ref() {
                Some(manager) => manager.auth().await,
                None => None,
            };
            let api_provider = self
                .client
                .state
                .provider
                .to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
            let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?;
            let compression = self.responses_request_compression(auth.as_ref());
            let client_setup = self.client.current_client_setup().await?;
            let compression = self.responses_request_compression(client_setup.auth.as_ref());

            let options = self.build_responses_options(
                prompt,
@@ -628,8 +1028,9 @@ impl ModelClientSession {
        let connection = match self
            .websocket_connection(
                otel_manager,
                api_provider.clone(),
                api_auth.clone(),
                client_setup.api_provider,
                client_setup.api_auth,
                turn_metadata_header,
                &options,
            )
            .await
@@ -691,7 +1092,7 @@ impl ModelClientSession {
        match wire_api {
            WireApi::Responses => {
                let websocket_enabled =
                    self.responses_websocket_enabled() && !self.disable_websockets();
                    self.client.responses_websocket_enabled() && !self.client.disable_websockets();

                if websocket_enabled {
                    self.stream_responses_websocket(
@@ -721,10 +1122,16 @@ impl ModelClientSession {
    /// Permanently disables WebSockets for this Codex session and resets WebSocket state.
    ///
    /// This is used after exhausting the provider retry budget, to force subsequent requests onto
    /// the HTTP transport. Returns `true` if this call activated fallback, or `false` if fallback
    /// was already active.
    /// the HTTP transport. It also clears any warmed websocket preconnect state so future turns
    /// cannot accidentally adopt a stale socket after fallback has been activated.
    ///
    /// Startup preconnect handshakes are intentionally not counted against `stream_max_retries`.
    /// See [`crate::client`] module docs ("Retry-Budget Tradeoff") for rationale and future
    /// alternatives.
    ///
    /// Returns `true` if this call activated fallback, or `false` if fallback was already active.
    pub(crate) fn try_switch_fallback_transport(&mut self, otel_manager: &OtelManager) -> bool {
        let websocket_enabled = self.responses_websocket_enabled();
        let websocket_enabled = self.client.responses_websocket_enabled();
        let activated = self.activate_http_fallback(websocket_enabled);
        if activated {
            warn!("falling back to HTTP");
@@ -736,6 +1143,7 @@ impl ModelClientSession {

            self.connection = None;
            self.websocket_last_items.clear();
            self.client.clear_preconnect();
        }
        activated
    }
@@ -752,6 +1160,14 @@ fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec<Value
    }
}

/// Parses per-turn metadata into an HTTP header value.
///
/// Invalid values are treated as absent so callers can compare and propagate
/// metadata with the same sanitization path used when constructing headers.
fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<HeaderValue> {
    turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok())
}
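The "invalid values are treated as absent" rule can be sketched std-only; `is_valid_header_value` below is a hypothetical stand-in for the validity check that `http::HeaderValue::from_str` performs in the real code:

```rust
// Stand-in for HeaderValue::from_str's validity rule (visible ASCII plus
// horizontal tab); the real check lives in the `http` crate.
fn is_valid_header_value(value: &str) -> bool {
    value.bytes().all(|b| b == b'\t' || (0x20..=0x7e).contains(&b))
}

// Mirrors the shape of parse_turn_metadata_header: invalid input folds to None.
fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<String> {
    turn_metadata_header
        .filter(|value| is_valid_header_value(value))
        .map(str::to_owned)
}

fn main() {
    assert!(parse_turn_metadata_header(Some("repo=codex")).is_some());
    // An embedded newline is invalid in a header value, so it is treated as absent.
    assert!(parse_turn_metadata_header(Some("bad\nvalue")).is_none());
    assert!(parse_turn_metadata_header(None).is_none());
}
```

Folding invalid input to `None` keeps the header comparison paths simple: "absent" and "unsendable" collapse into one case.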

/// Builds the extra headers attached to Responses API requests.
///
/// These headers implement Codex-specific conventions:
@@ -783,25 +1199,6 @@ fn build_responses_headers(
    headers
}

fn build_websocket_connect_headers(
    options: &ApiResponsesOptions,
    include_timing_metrics: bool,
) -> ApiHeaderMap {
    let mut headers = options.extra_headers.clone();
    headers.extend(build_conversation_headers(options.conversation_id.clone()));
    headers.insert(
        OPENAI_BETA_HEADER,
        HeaderValue::from_static(OPENAI_BETA_RESPONSES_WEBSOCKETS),
    );
    if include_timing_metrics {
        headers.insert(
            X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
            HeaderValue::from_static("true"),
        );
    }
    headers
}

fn map_response_stream<S>(api_stream: S, otel_manager: OtelManager) -> ResponseStream
where
    S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>

@@ -38,6 +38,7 @@ use crate::stream_events_utils::last_assistant_message_from_item;
use crate::terminal;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::build_turn_metadata_header;
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
use crate::util::error_or_panic;
use async_channel::Receiver;
use async_channel::Sender;
@@ -563,22 +564,22 @@ impl TurnContext {
            .clone()
    }

    /// Resolves the per-turn metadata header under a shared timeout policy.
    ///
    /// This uses the same timeout helper as websocket startup preconnect so both turn execution
    /// and background preconnect observe identical "timeout means best-effort fallback" behavior.
    pub async fn resolve_turn_metadata_header(&self) -> Option<String> {
        const TURN_METADATA_HEADER_TIMEOUT_MS: u64 = 250;
        match tokio::time::timeout(
            std::time::Duration::from_millis(TURN_METADATA_HEADER_TIMEOUT_MS),
        resolve_turn_metadata_header_with_timeout(
            self.build_turn_metadata_header(),
            self.turn_metadata_header.get().cloned().flatten(),
        )
        .await
        {
            Ok(header) => header,
            Err(_) => {
                warn!("timed out after 250ms while building turn metadata header");
                self.turn_metadata_header.get().cloned().flatten()
            }
        }
    }

    /// Starts best-effort background computation of turn metadata.
    ///
    /// This warms the cached value used by [`TurnContext::resolve_turn_metadata_header`] so turns
    /// and websocket preconnect are less likely to pay metadata construction latency on demand.
    pub fn spawn_turn_metadata_header_task(self: &Arc<Self>) {
        let context = Arc::clone(self);
        tokio::spawn(async move {
@@ -1078,6 +1079,14 @@ impl Session {
            next_internal_sub_id: AtomicU64::new(0),
        });

        // Warm a websocket in the background so the first turn can reuse it.
        // This performs only connection setup; user input is still sent later via response.create
        // when submit_turn() runs.
        sess.services.model_client.pre_establish_connection(
            sess.services.otel_manager.clone(),
            session_configuration.cwd.clone(),
        );

        // Dispatch the SessionConfiguredEvent first and then report any errors.
        // If resuming, include converted initial messages in the payload so UIs can render them immediately.
        let initial_messages = initial_history.get_event_msgs();

@@ -1,12 +1,49 @@
//! Helpers for computing and resolving optional per-turn metadata headers.
//!
//! This module owns both metadata construction and the shared timeout policy used by
//! turn execution and startup websocket preconnect. Keeping timeout behavior centralized
//! ensures both call sites treat timeout as the same best-effort fallback condition.

use std::collections::BTreeMap;
use std::future::Future;
use std::path::Path;
use std::time::Duration;

use serde::Serialize;
use tracing::warn;

use crate::git_info::get_git_remote_urls_assume_git_repo;
use crate::git_info::get_git_repo_root;
use crate::git_info::get_head_commit_hash;

/// Timeout used when resolving the optional turn-metadata header.
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);

/// Resolves turn metadata with a shared timeout policy.
///
/// On timeout, this logs a warning and returns the provided fallback header.
///
/// Keeping this helper centralized avoids drift between turn-time metadata resolution and startup
/// websocket preconnect, both of which need identical timeout semantics.
pub(crate) async fn resolve_turn_metadata_header_with_timeout<F>(
    build_header: F,
    fallback_on_timeout: Option<String>,
) -> Option<String>
where
    F: Future<Output = Option<String>>,
{
    match tokio::time::timeout(TURN_METADATA_HEADER_TIMEOUT, build_header).await {
        Ok(header) => header,
        Err(_) => {
            warn!(
                "timed out after {}ms while building turn metadata header",
                TURN_METADATA_HEADER_TIMEOUT.as_millis()
            );
            fallback_on_timeout
        }
    }
}
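The same "timeout means best-effort fallback" contract can be sketched without tokio by racing the builder against `recv_timeout`; the thread-based shape and names here are illustrative, since the real helper is async:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);

/// Runs `build_header` off-thread and returns its result, or the cached
/// fallback if it does not finish within the shared timeout.
fn resolve_with_timeout(
    build_header: impl FnOnce() -> Option<String> + Send + 'static,
    fallback_on_timeout: Option<String>,
) -> Option<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(build_header());
    });
    match rx.recv_timeout(TURN_METADATA_HEADER_TIMEOUT) {
        Ok(header) => header,
        Err(_) => fallback_on_timeout, // timeout: best-effort fallback
    }
}

fn main() {
    // A fast builder wins the race and its fresh value is used.
    assert_eq!(
        resolve_with_timeout(|| Some("fresh".into()), Some("cached".into())),
        Some("fresh".to_string())
    );
    // A slow builder loses; the cached fallback is returned instead.
    let slow = || {
        thread::sleep(Duration::from_millis(500));
        Some("late".into())
    };
    assert_eq!(
        resolve_with_timeout(slow, Some("cached".into())),
        Some("cached".to_string())
    );
}
```

The point of centralizing this policy is visible in the sketch: both callers (turn execution and startup preconnect) see the identical 250ms budget and the identical fallback rule.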

#[derive(Serialize)]
struct TurnMetadataWorkspace {
    #[serde(skip_serializing_if = "Option::is_none")]

@@ -268,6 +268,11 @@ impl WebSocketHandshake {
pub struct WebSocketConnectionConfig {
    pub requests: Vec<Vec<Value>>,
    pub response_headers: Vec<(String, String)>,
    /// Optional delay inserted before accepting the websocket handshake.
    ///
    /// Tests use this to force startup preconnect into an in-flight state so first-turn adoption
    /// paths can be exercised deterministically.
    pub accept_delay: Option<Duration>,
}

pub struct WebSocketTestServer {
@@ -299,6 +304,29 @@ impl WebSocketTestServer {
        self.handshakes.lock().unwrap().clone()
    }

    /// Waits until at least `expected` websocket handshakes have been observed or timeout elapses.
    ///
    /// Uses a short bounded polling interval so tests can deterministically wait for background
    /// preconnect activity without busy-spinning.
    pub async fn wait_for_handshakes(&self, expected: usize, timeout: Duration) -> bool {
        if self.handshakes.lock().unwrap().len() >= expected {
            return true;
        }

        let deadline = tokio::time::Instant::now() + timeout;
        let poll_interval = Duration::from_millis(10);
        loop {
            if self.handshakes.lock().unwrap().len() >= expected {
                return true;
            }
            let now = tokio::time::Instant::now();
            if now >= deadline {
                return false;
            }
            let sleep_for = std::cmp::min(poll_interval, deadline.saturating_duration_since(now));
            tokio::time::sleep(sleep_for).await;
        }
    }
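The deadline-bounded polling loop generalizes; a synchronous std-only sketch of the same pattern, where `wait_until` is a hypothetical stand-in for the async method above:

```rust
use std::time::{Duration, Instant};

/// Polls `check` on a short interval until it succeeds or `timeout` elapses,
/// clamping the final sleep so the deadline is never overshot.
fn wait_until(mut check: impl FnMut() -> bool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    let poll_interval = Duration::from_millis(10);
    loop {
        if check() {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false;
        }
        // Sleep the shorter of the poll interval and the remaining budget.
        let sleep_for = poll_interval.min(deadline.saturating_duration_since(now));
        std::thread::sleep(sleep_for);
    }
}

fn main() {
    // Condition becomes true on the third poll: returns true well before the deadline.
    let mut calls = 0;
    assert!(wait_until(
        || {
            calls += 1;
            calls >= 3
        },
        Duration::from_secs(1)
    ));
    // Condition never holds: returns false once the deadline passes.
    assert!(!wait_until(|| false, Duration::from_millis(50)));
}
```

Clamping the last sleep to the remaining budget is what makes the helper deterministic in tests: it converts "sleep then check" into "check at or before the deadline".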

    pub fn single_handshake(&self) -> WebSocketHandshake {
        let handshakes = self.handshakes.lock().unwrap();
        if handshakes.len() != 1 {
@@ -861,6 +889,7 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
        .map(|requests| WebSocketConnectionConfig {
            requests,
            response_headers: Vec::new(),
            accept_delay: None,
        })
        .collect();
    start_websocket_server_with_headers(connections).await
@@ -900,6 +929,10 @@ pub async fn start_websocket_server_with_headers(
            continue;
        };

        if let Some(delay) = connection.accept_delay {
            tokio::time::sleep(delay).await;
        }

        let response_headers = connection.response_headers.clone();
        let handshake_log = Arc::clone(&handshakes);
        let callback = move |req: &Request, mut response: Response| {

@@ -1,14 +1,17 @@
use anyhow::Result;
use core_test_support::responses::WebSocketConnectionConfig;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_done;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::ev_shell_command_call;
use core_test_support::responses::start_websocket_server;
use core_test_support::responses::start_websocket_server_with_headers;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_test_codex_shell_chain() -> Result<()> {
@@ -67,3 +70,53 @@ async fn websocket_test_codex_shell_chain() -> Result<()> {
    server.shutdown().await;
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_preconnect_happens_on_session_start() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_websocket_server(vec![vec![vec![
        ev_response_created("resp-1"),
        ev_completed("resp-1"),
    ]]])
    .await;

    let mut builder = test_codex();
    let test = builder.build_with_websocket_server(&server).await?;

    assert!(
        server.wait_for_handshakes(1, Duration::from_secs(2)).await,
        "expected websocket preconnect handshake during session startup"
    );

    test.submit_turn("hello").await?;

    assert_eq!(server.handshakes().len(), 1);
    assert_eq!(server.single_connection().len(), 1);

    server.shutdown().await;
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_first_turn_waits_for_inflight_preconnect() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
        requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
        response_headers: Vec::new(),
        // Delay handshake so submit_turn() observes startup preconnect as in-flight.
        accept_delay: Some(Duration::from_millis(150)),
    }])
    .await;

    let mut builder = test_codex();
    let test = builder.build_with_websocket_server(&server).await?;
    test.submit_turn("hello").await?;

    assert_eq!(server.handshakes().len(), 1);
    assert_eq!(server.single_connection().len(), 1);

    server.shutdown().await;
    Ok(())
}

@@ -85,6 +85,68 @@ async fn responses_websocket_streams_request() {
    server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_reuses_connection() {
    skip_if_no_network!();

    let server = start_websocket_server(vec![vec![vec![
        ev_response_created("resp-1"),
        ev_completed("resp-1"),
    ]]])
    .await;

    let harness = websocket_harness(&server).await;
    assert!(harness.client.preconnect(&harness.otel_manager, None).await);

    let mut client_session = harness.client.new_session();
    let prompt = prompt_with_input(vec![message_item("hello")]);
    stream_until_complete(&mut client_session, &harness, &prompt).await;

    assert_eq!(server.handshakes().len(), 1);
    assert_eq!(server.single_connection().len(), 1);

    server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
    skip_if_no_network!();

    let server = start_websocket_server(vec![vec![vec![
        ev_response_created("resp-1"),
        ev_completed("resp-1"),
    ]]])
    .await;

    let harness = websocket_harness(&server).await;
    assert!(harness.client.preconnect(&harness.otel_manager, None).await);

    let mut client_session = harness.client.new_session();
    let prompt = prompt_with_input(vec![message_item("hello")]);
    let mut stream = client_session
        .stream(
            &prompt,
            &harness.model_info,
            &harness.otel_manager,
            harness.effort,
            harness.summary,
            None,
        )
        .await
        .expect("websocket stream failed");

    while let Some(event) = stream.next().await {
        if matches!(event, Ok(ResponseEvent::Completed { .. })) {
            break;
        }
    }

    assert_eq!(server.handshakes().len(), 1);
    assert_eq!(server.single_connection().len(), 1);

    server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
async fn responses_websocket_emits_websocket_telemetry_events() {
@@ -198,6 +260,7 @@ async fn responses_websocket_emits_reasoning_included_event() {
    let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
        requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
        response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())],
        accept_delay: None,
    }])
    .await;

@@ -268,6 +331,7 @@ async fn responses_websocket_emits_rate_limit_events() {
            ("X-Models-Etag".to_string(), "etag-123".to_string()),
            ("X-Reasoning-Included".to_string(), "true".to_string()),
        ],
        accept_delay: None,
    }])
    .await;

@@ -83,6 +83,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
                ev_done(),
            ]],
            response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
            accept_delay: None,
        },
        WebSocketConnectionConfig {
            requests: vec![vec![
@@ -91,6 +92,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
                ev_completed("resp-2"),
            ]],
            response_headers: Vec::new(),
            accept_delay: None,
        },
        WebSocketConnectionConfig {
            requests: vec![vec![
@@ -99,6 +101,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
                ev_completed("resp-3"),
            ]],
            response_headers: Vec::new(),
            accept_delay: None,
        },
    ])
    .await;

@@ -46,7 +46,10 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result
        .filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
        .count();

    assert_eq!(websocket_attempts, 1);
    // One websocket attempt comes from startup preconnect and one from the first turn's stream
    // attempt before fallback activates; after fallback, transport is HTTP. This matches the
    // retry-budget tradeoff documented in [`codex_core::client`] module docs.
    assert_eq!(websocket_attempts, 2);
    assert_eq!(http_attempts, 1);
    assert_eq!(response_mock.requests().len(), 1);

@@ -92,7 +95,10 @@ async fn websocket_fallback_is_sticky_across_turns() -> Result<()>
        .filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
        .count();

    assert_eq!(websocket_attempts, 1);
    // The first turn issues exactly two websocket attempts (startup preconnect + first stream
    // attempt). After fallback becomes sticky, subsequent turns stay on HTTP. This mirrors the
    // retry-budget tradeoff documented in [`codex_core::client`] module docs.
    assert_eq!(websocket_attempts, 2);
    assert_eq!(http_attempts, 2);
    assert_eq!(response_mock.requests().len(), 2);