Compare commits

...

2 Commits

Author          SHA1        Message                                                                          Date
pakrym-oai      8fe5066bcc  Simplify pre-connect (#11040)                                                    2026-02-07 15:52:03 -08:00
Michael Bolin   2e89cb9117  feat: include state of [experimental_network] in /debug-config output (#11039)  2026-02-07 21:38:12 +00:00

    Commit 2e89cb9117: #10958 introduced experimental support for a network config in
    `/etc/codex/requirements.toml`, so this extends `/debug-config` to
    surface this information, if set, which should make it easier to debug.
5 changed files with 171 additions and 244 deletions

View File

@@ -16,26 +16,17 @@
//! Preconnect is intentionally handshake-only: it may warm a socket and capture sticky-routing
//! state, but the first `response.create` payload is still sent only when a turn starts.
//!
//! Internally, startup preconnect and warmed-socket adoption share one session-level lifecycle:
//! `Idle` (no task/socket), `InFlight` (startup preconnect task running), and `Ready` (one-shot
//! warmed socket available). On first use in a turn, the session tries to adopt `Ready`; if not
//! ready, it awaits `InFlight` and retries adoption before opening a new websocket. This prevents
//! racing duplicate first-turn handshakes while keeping preconnect best-effort.
//! Internally, startup preconnect stores a single task handle. On first use in a turn, the session
//! awaits that task and adopts the warmed socket if it succeeds; if it fails, the stream attempt
//! fails and the normal retry/fallback loop decides what to do next.
//!
//! ## Retry-Budget Tradeoff
//!
//! `stream_max_retries` applies to retryable turn stream failures, not to background startup
//! preconnect handshakes. In failure cases this can produce two websocket handshakes on the first
//! turn (startup preconnect, then turn-time connect) before HTTP fallback becomes sticky. We keep
//! this split intentionally so opportunistic preconnect cannot consume the user-visible stream
//! retry budget before any turn payload is sent.
//!
//! If this policy needs to change later, preconnect can be modeled as an explicit first connection
//! attempt in the same retry budget as turn streaming. That would require plumbing websocket
//! attempt accounting from connection acquisition into the turn retry loop and updating fallback
//! expectations/tests accordingly.
//! Startup preconnect is treated as the first websocket connection attempt for the first turn. If
//! it fails, the stream attempt fails and the retry/fallback loop decides whether to retry or fall
//! back. This avoids duplicate handshakes but means a failed preconnect can consume one retry
//! budget slot before any turn payload is sent.
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
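As the rewritten module docs above describe, the whole preconnect lifecycle now reduces to one optional background task per session, awaited at most once by the first turn. A minimal, self-contained sketch of that shape, using illustrative names (`Preconnector`, `WarmedSocket`) rather than the real `client.rs` types:

```rust
use std::sync::Mutex;

use tokio::task::JoinHandle;

/// Stand-in for ApiWebSocketConnection plus any captured sticky-routing state.
struct WarmedSocket;

#[derive(Default)]
struct Preconnector {
    // One slot: either a startup handshake task or nothing.
    slot: Mutex<Option<JoinHandle<Option<WarmedSocket>>>>,
}

impl Preconnector {
    /// Spawns the startup handshake; any previously tracked task is aborted so
    /// only one attempt exists at a time.
    fn start(&self) {
        let handle = tokio::spawn(async {
            // The real code performs the websocket handshake here and returns
            // None on any failure, keeping preconnect best-effort.
            Some(WarmedSocket)
        });
        if let Some(previous) = self.slot.lock().unwrap().replace(handle) {
            previous.abort();
        }
    }

    /// First turn: take the task (if any) and adopt its socket on success.
    /// A failed or panicked task yields None and the caller opens a fresh
    /// connection through the normal retry/fallback loop.
    async fn adopt(&self) -> Option<WarmedSocket> {
        let task = self.slot.lock().unwrap().take()?;
        task.await.ok().flatten()
    }
}

#[tokio::main]
async fn main() {
    let pre = Preconnector::default();
    pre.start();
    println!("adopted warmed socket: {}", pre.adopt().await.is_some());
}
```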
@@ -82,6 +73,7 @@ use codex_protocol::protocol::SessionSource;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError;
use futures::StreamExt;
use futures::future::BoxFuture;
use http::HeaderMap as ApiHeaderMap;
use http::HeaderValue;
use http::StatusCode as HttpStatusCode;
@@ -94,7 +86,6 @@ use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;
use tracing::warn;
use crate::AuthManager;
@@ -110,8 +101,6 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::turn_metadata::build_turn_metadata_header;
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
@@ -121,6 +110,12 @@ pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
struct PreconnectedWebSocket {
connection: ApiWebSocketConnection,
turn_state: Option<String>,
}
type PreconnectTask = JoinHandle<Option<PreconnectedWebSocket>>;
/// Session-scoped state shared by all [`ModelClient`] clones.
///
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
@@ -137,11 +132,8 @@ struct ModelClientState {
include_timing_metrics: bool,
beta_features_header: Option<String>,
disable_websockets: AtomicBool,
/// Session-scoped preconnect lifecycle state.
///
/// This keeps startup preconnect task tracking and warmed-socket adoption in one lock so
/// turn-time websocket setup observes a single, coherent state.
preconnect: Mutex<PreconnectState>,
preconnect: Mutex<Option<PreconnectTask>>,
}
impl std::fmt::Debug for ModelClientState {
@@ -181,28 +173,6 @@ struct CurrentClientSetup {
api_auth: CoreAuthProvider,
}
/// One-shot preconnected websocket slot consumed by the next turn.
///
/// This bundles the socket with optional sticky-routing state captured during
/// handshake so they are taken and cleared atomically.
struct PreconnectedWebSocket {
connection: ApiWebSocketConnection,
turn_state: Option<String>,
}
/// Session-level lifecycle of startup websocket preconnect.
///
/// `InFlight` tracks the startup task so the first turn can await it and reuse the same socket.
/// `Ready` stores a one-shot warmed socket for turn adoption.
enum PreconnectState {
/// No startup preconnect task is active and no warmed socket is available.
Idle,
/// Startup preconnect is currently running; first turn may await this task.
InFlight(JoinHandle<()>),
/// Startup preconnect finished and produced a one-shot warmed socket.
Ready(PreconnectedWebSocket),
}
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
@@ -215,8 +185,6 @@ enum PreconnectState {
/// Turn-scoped settings (model selection, reasoning controls, telemetry context, and turn
/// metadata) are passed explicitly to the relevant methods to keep turn lifetime visible at the
/// call site.
///
/// This type is cheap to clone.
#[derive(Debug, Clone)]
pub struct ModelClient {
state: Arc<ModelClientState>,
@@ -288,7 +256,7 @@ impl ModelClient {
include_timing_metrics,
beta_features_header,
disable_websockets: AtomicBool::new(false),
preconnect: Mutex::new(PreconnectState::Idle),
preconnect: Mutex::new(None),
}),
}
}
@@ -315,53 +283,57 @@ impl ModelClient {
///
/// A timeout when computing turn metadata is treated the same as "no metadata" so startup
/// cannot block indefinitely on optional preconnect context.
pub fn pre_establish_connection(&self, otel_manager: OtelManager, cwd: PathBuf) {
pub fn pre_establish_connection(
&self,
otel_manager: OtelManager,
turn_metadata_header: BoxFuture<'static, Option<String>>,
) {
if !self.responses_websocket_enabled() || self.disable_websockets() {
return;
}
let model_client = self.clone();
let handle = tokio::spawn(async move {
let turn_metadata_header = resolve_turn_metadata_header_with_timeout(
build_turn_metadata_header(cwd.as_path()),
None,
)
.await;
let _ = model_client
let turn_metadata_header = turn_metadata_header.await;
model_client
.preconnect(&otel_manager, turn_metadata_header.as_deref())
.await;
.await
});
self.store_preconnect_task(handle);
self.set_preconnected_task(Some(handle));
}
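The signature change above moves turn-metadata resolution out of the client: the caller now hands over an already-composed `BoxFuture<'static, Option<String>>` instead of a `cwd`. The sketch below shows the caller-side pattern with a hypothetical standalone `pre_establish` function; the `async { None }.boxed()` form mirrors the test changes later in this diff, and the timeout-wrapped variant is only an illustration of what a production caller might box.

```rust
use futures::FutureExt;
use futures::future::BoxFuture;

/// Illustrative stand-in for ModelClient::pre_establish_connection: it receives
/// a boxed, 'static future resolving to the optional turn-metadata header, so
/// it needs no cwd or metadata plumbing of its own.
fn pre_establish(turn_metadata_header: BoxFuture<'static, Option<String>>) {
    tokio::spawn(async move {
        let header = turn_metadata_header.await;
        println!("preconnect with metadata header: {header:?}");
    });
}

#[tokio::main]
async fn main() {
    // Tests can pass a trivial future...
    pre_establish(async { None }.boxed());

    // ...while a real caller can box whatever composition it wants, for
    // example a metadata lookup wrapped in its own timeout policy.
    pre_establish(
        async {
            tokio::time::timeout(std::time::Duration::from_millis(250), async {
                Some("hypothetical-header".to_string())
            })
            .await
            .ok()
            .flatten()
        }
        .boxed(),
    );

    // Give the spawned tasks a moment to run before the runtime shuts down.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```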
/// Opportunistically pre-establishes a Responses WebSocket connection for this session.
///
/// This method is best-effort: it returns `false` on any setup/connect failure and the caller
/// should continue normally. A successful preconnect reduces first-turn latency but never sends
/// an initial prompt; the first `response.create` is still sent only when a turn starts.
/// This method is best-effort: it returns an error on setup/connect failure and the caller
/// can decide whether to ignore it. A successful preconnect reduces first-turn latency but
/// never sends an initial prompt; the first `response.create` is still sent only when a turn
/// starts.
///
/// The preconnected slot is single-consumer and single-use: the next `ModelClientSession` may
/// adopt it once, after which later turns either keep using that same turn-local connection or
/// create a new one.
pub async fn preconnect(
async fn preconnect(
&self,
otel_manager: &OtelManager,
turn_metadata_header: Option<&str>,
) -> bool {
) -> Option<PreconnectedWebSocket> {
if !self.responses_websocket_enabled() || self.disable_websockets() {
return false;
return None;
}
let client_setup = match self.current_client_setup().await {
Ok(client_setup) => client_setup,
Err(err) => {
warn!("failed to build websocket preconnect client setup: {err}");
return false;
}
};
let turn_state = Arc::new(OnceLock::new());
let client_setup = self
.current_client_setup()
.await
.map_err(|err| {
ApiError::Stream(format!(
"failed to build websocket preconnect client setup: {err}"
))
})
.ok()?;
match self
let turn_state = Arc::new(OnceLock::new());
let connection = self
.connect_websocket(
otel_manager,
client_setup.api_provider,
@@ -370,16 +342,12 @@ impl ModelClient {
turn_metadata_header,
)
.await
{
Ok(connection) => {
self.store_preconnected_websocket(connection, turn_state.get().cloned());
true
}
Err(err) => {
debug!("websocket preconnect failed: {err}");
false
}
}
.ok()?;
Some(PreconnectedWebSocket {
connection,
turn_state: turn_state.get().cloned(),
})
}
/// Compacts the current conversation history using the Compact endpoint.
@@ -576,132 +544,26 @@ impl ModelClient {
headers
}
/// Consumes the warmed websocket slot.
fn take_preconnected_websocket(&self) -> Option<PreconnectedWebSocket> {
/// Consumes the warmed websocket task slot.
fn take_preconnected_task(&self) -> Option<PreconnectTask> {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
match previous {
PreconnectState::Ready(preconnected) => Some(preconnected),
other => {
*state = other;
None
}
}
state.take()
}
/// Stores a freshly preconnected websocket and optional captured turn-state token.
///
/// This overwrites any previously warmed socket because only one preconnect candidate is kept.
fn store_preconnected_websocket(
&self,
connection: ApiWebSocketConnection,
turn_state: Option<String>,
) {
fn set_preconnected_task(&self, task: Option<PreconnectTask>) {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if self.disable_websockets() {
debug!("discarding startup websocket preconnect because websocket fallback is active");
*state = PreconnectState::Idle;
return;
}
*state = PreconnectState::Ready(PreconnectedWebSocket {
connection,
turn_state,
});
}
/// Stores the latest startup preconnect task handle.
///
/// If a previous task is still running, it is aborted so only one in-flight startup attempt
/// is tracked.
fn store_preconnect_task(&self, task: JoinHandle<()>) {
let mut task = Some(task);
let previous_in_flight = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
match &*state {
// A very fast startup preconnect can complete before this method stores the
// task handle; keep the warmed socket and drop the now-useless handle.
PreconnectState::Ready(_) => None,
_ => match task.take() {
Some(next_task) => {
match std::mem::replace(&mut *state, PreconnectState::InFlight(next_task)) {
PreconnectState::InFlight(previous) => Some(previous),
_ => None,
}
}
None => None,
},
}
};
if let Some(previous) = previous_in_flight {
previous.abort();
}
if let Some(task) = task {
task.abort();
}
}
/// Awaits the startup preconnect task once, if one is currently tracked.
///
/// This lets the first turn treat startup preconnect as the first websocket connection
/// attempt, avoiding a redundant second connect while the preconnect attempt is in flight.
///
/// This await intentionally has no separate timeout wrapper. WebSocket connect handshakes
/// already run without an app-level timeout, so waiting on the in-flight preconnect task does
/// not add a new unbounded wait class; it reuses the same first connection attempt.
async fn await_preconnect_task(&self) {
let task = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
match previous {
PreconnectState::InFlight(task) => Some(task),
other => {
*state = other;
None
}
}
};
if let Some(task) = task {
let in_flight = !task.is_finished();
if in_flight {
debug!("awaiting startup websocket preconnect before opening a new websocket");
}
if let Err(err) = task.await {
debug!("startup websocket preconnect task failed: {err}");
}
}
}
/// Clears all startup preconnect state.
///
/// This aborts any in-flight startup preconnect task and drops any warmed socket.
fn clear_preconnect(&self) {
let previous = {
let mut state = self
.state
.preconnect
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
std::mem::replace(&mut *state, PreconnectState::Idle)
};
if let PreconnectState::InFlight(task) = previous {
task.abort();
if let Some(running_task) = state.take() {
running_task.abort();
}
*state = task;
}
}
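Both slot helpers above lock with `unwrap_or_else(std::sync::PoisonError::into_inner)`, recovering the guard from a poisoned mutex instead of panicking, presumably so a panic elsewhere cannot permanently wedge the preconnect slot. A tiny standalone illustration of that idiom (not the real code):

```rust
use std::sync::Mutex;
use std::sync::PoisonError;

fn main() {
    let slot: Mutex<Option<u32>> = Mutex::new(Some(7));

    // Poison the mutex by panicking while the lock is held. The default panic
    // hook will print the simulated panic; that output is expected.
    let _ = std::panic::catch_unwind(|| {
        let _guard = slot.lock().unwrap();
        panic!("simulated panic while holding the lock");
    });

    // `lock()` now returns Err(PoisonError), but into_inner hands back the
    // guard anyway, so the slot stays usable.
    let mut guard = slot.lock().unwrap_or_else(PoisonError::into_inner);
    assert_eq!(guard.take(), Some(7));
    assert_eq!(guard.take(), None);
    println!("slot survived the poisoned lock");
}
```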
@@ -921,15 +783,24 @@ impl ModelClientSession {
options: &ApiResponsesOptions,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
// Prefer the session-level preconnect slot before creating a new websocket.
if self.connection.is_none() {
if let Some(preconnected) = self.try_use_preconnected_websocket() {
self.adopt_preconnected_websocket(preconnected);
} else {
self.client.await_preconnect_task().await;
if let Some(preconnected) = self.try_use_preconnected_websocket() {
self.adopt_preconnected_websocket(preconnected);
if self.connection.is_none()
&& let Some(task) = self.client.take_preconnected_task()
{
match task.await {
Ok(Some(preconnected)) => {
let PreconnectedWebSocket {
connection,
turn_state,
} = preconnected;
if let Some(turn_state) = turn_state {
let _ = self.turn_state.set(turn_state);
}
self.connection = Some(connection);
}
}
_ => {
warn!("startup websocket preconnect task failed");
}
};
}
let needs_new = match self.connection.as_ref() {
@@ -938,7 +809,6 @@ impl ModelClientSession {
};
if needs_new {
self.client.clear_preconnect();
self.websocket_last_items.clear();
self.websocket_last_response_id = None;
self.websocket_last_response_id_rx = None;
@@ -964,33 +834,6 @@ impl ModelClientSession {
))
}
/// Adopts the session-level preconnect slot for this turn.
///
/// If a turn-local connection already exists, this intentionally does nothing to avoid
/// replacing an active connection mid-turn.
fn try_use_preconnected_websocket(&mut self) -> Option<PreconnectedWebSocket> {
if self.connection.is_some() {
return None;
}
self.client.take_preconnected_websocket()
}
/// Moves a preconnected socket into the turn-local connection slot.
///
/// If the preconnect handshake captured sticky-routing turn state, this also seeds the
/// turn-local state lock so all later requests in the turn replay the same token.
fn adopt_preconnected_websocket(&mut self, preconnected: PreconnectedWebSocket) {
let PreconnectedWebSocket {
connection,
turn_state,
} = preconnected;
if let Some(turn_state) = turn_state {
let _ = self.turn_state.set(turn_state);
}
self.connection = Some(connection);
}
fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression {
if self.client.state.enable_request_compression
&& auth.is_some_and(CodexAuth::is_chatgpt_auth)
@@ -1223,10 +1066,6 @@ impl ModelClientSession {
/// the HTTP transport. It also clears any warmed websocket preconnect state so future turns
/// cannot accidentally adopt a stale socket after fallback has been activated.
///
/// Startup preconnect handshakes are intentionally not counted against `stream_max_retries`.
/// See [`crate::client`] module docs ("Retry-Budget Tradeoff") for rationale and future
/// alternatives.
///
/// Returns `true` if this call activated fallback, or `false` if fallback was already active.
pub(crate) fn try_switch_fallback_transport(&mut self, otel_manager: &OtelManager) -> bool {
let websocket_enabled = self.client.responses_websocket_enabled();
@@ -1239,9 +1078,9 @@ impl ModelClientSession {
&[("from_wire_api", "responses_websocket")],
);
self.client.set_preconnected_task(None);
self.connection = None;
self.websocket_last_items.clear();
self.client.clear_preconnect();
}
activated
}

View File

@@ -558,8 +558,9 @@ impl TurnContext {
}
async fn build_turn_metadata_header(&self) -> Option<String> {
let cwd = self.cwd.clone();
self.turn_metadata_header
.get_or_init(|| async { build_turn_metadata_header(self.cwd.as_path()).await })
.get_or_init(|| async { build_turn_metadata_header(cwd).await })
.await
.clone()
}
@@ -1099,7 +1100,11 @@ impl Session {
// when submit_turn() runs.
sess.services.model_client.pre_establish_connection(
sess.services.otel_manager.clone(),
session_configuration.cwd.clone(),
resolve_turn_metadata_header_with_timeout(
build_turn_metadata_header(session_configuration.cwd.clone()),
None,
)
.boxed(),
);
// Dispatch the SessionConfiguredEvent first and then report any errors.

View File

@@ -6,7 +6,7 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use serde::Serialize;
@@ -16,7 +16,6 @@ use crate::git_info::get_git_remote_urls_assume_git_repo;
use crate::git_info::get_git_repo_root;
use crate::git_info::get_head_commit_hash;
/// Timeout used when resolving the optional turn-metadata header.
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);
/// Resolves turn metadata with a shared timeout policy.
@@ -57,7 +56,8 @@ struct TurnMetadata {
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
}
pub async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
pub async fn build_turn_metadata_header(cwd: PathBuf) -> Option<String> {
let cwd = cwd.as_path();
let repo_root = get_git_repo_root(cwd)?;
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
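The module docs earlier in this diff state that a timeout while computing turn metadata is treated the same as "no metadata", with `TURN_METADATA_HEADER_TIMEOUT` set to 250ms. The exact signature of `resolve_turn_metadata_header_with_timeout` is not shown here; the sketch below only illustrates that stated policy with `tokio::time::timeout` and a hypothetical `resolve_with_timeout` helper:

```rust
use std::time::Duration;

use tokio::time::timeout;

/// Illustrative only: resolve an optional metadata future under a deadline,
/// mapping a timeout to "no metadata" rather than an error.
async fn resolve_with_timeout<F>(fut: F, deadline: Duration) -> Option<String>
where
    F: std::future::Future<Output = Option<String>>,
{
    timeout(deadline, fut).await.ok().flatten()
}

#[tokio::main]
async fn main() {
    let slow = async {
        tokio::time::sleep(Duration::from_secs(5)).await;
        Some("never used".to_string())
    };
    // 250ms matches TURN_METADATA_HEADER_TIMEOUT above.
    assert_eq!(
        resolve_with_timeout(slow, Duration::from_millis(250)).await,
        None
    );
    println!("timed-out metadata resolves to None");
}
```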

View File

@@ -30,6 +30,7 @@ use core_test_support::responses::ev_response_created;
use core_test_support::responses::start_websocket_server;
use core_test_support::responses::start_websocket_server_with_headers;
use core_test_support::skip_if_no_network;
use futures::FutureExt;
use futures::StreamExt;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use pretty_assertions::assert_eq;
@@ -97,7 +98,9 @@ async fn responses_websocket_preconnect_reuses_connection() {
.await;
let harness = websocket_harness(&server).await;
assert!(harness.client.preconnect(&harness.otel_manager, None).await);
harness
.client
.pre_establish_connection(harness.otel_manager.clone(), async { None }.boxed());
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
@@ -120,7 +123,9 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
.await;
let harness = websocket_harness(&server).await;
assert!(harness.client.preconnect(&harness.otel_manager, None).await);
harness
.client
.pre_establish_connection(harness.otel_manager.clone(), async { None }.boxed());
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);

View File

@@ -3,6 +3,7 @@ use codex_app_server_protocol::ConfigLayerSource;
use codex_core::config::Config;
use codex_core::config_loader::ConfigLayerStack;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::config_loader::NetworkConstraints;
use codex_core::config_loader::RequirementSource;
use codex_core::config_loader::ResidencyRequirement;
use codex_core::config_loader::SandboxModeRequirement;
@@ -115,6 +116,14 @@ fn render_debug_config_lines(stack: &ConfigLayerStack) -> Vec<Line<'static>> {
));
}
if let Some(network) = requirements.network.as_ref() {
requirement_lines.push(requirement_line(
"experimental_network",
format_network_constraints(&network.value),
Some(&network.source),
));
}
if requirement_lines.is_empty() {
lines.push(" <none>".dim().into());
} else {
@@ -199,6 +208,63 @@ fn format_residency_requirement(requirement: ResidencyRequirement) -> String {
}
}
fn format_network_constraints(network: &NetworkConstraints) -> String {
let mut parts = Vec::new();
let NetworkConstraints {
enabled,
http_port,
socks_port,
allow_upstream_proxy,
dangerously_allow_non_loopback_proxy,
dangerously_allow_non_loopback_admin,
allowed_domains,
denied_domains,
allow_unix_sockets,
allow_local_binding,
} = network;
if let Some(enabled) = enabled {
parts.push(format!("enabled={enabled}"));
}
if let Some(http_port) = http_port {
parts.push(format!("http_port={http_port}"));
}
if let Some(socks_port) = socks_port {
parts.push(format!("socks_port={socks_port}"));
}
if let Some(allow_upstream_proxy) = allow_upstream_proxy {
parts.push(format!("allow_upstream_proxy={allow_upstream_proxy}"));
}
if let Some(dangerously_allow_non_loopback_proxy) = dangerously_allow_non_loopback_proxy {
parts.push(format!(
"dangerously_allow_non_loopback_proxy={dangerously_allow_non_loopback_proxy}"
));
}
if let Some(dangerously_allow_non_loopback_admin) = dangerously_allow_non_loopback_admin {
parts.push(format!(
"dangerously_allow_non_loopback_admin={dangerously_allow_non_loopback_admin}"
));
}
if let Some(allowed_domains) = allowed_domains {
parts.push(format!("allowed_domains=[{}]", allowed_domains.join(", ")));
}
if let Some(denied_domains) = denied_domains {
parts.push(format!("denied_domains=[{}]", denied_domains.join(", ")));
}
if let Some(allow_unix_sockets) = allow_unix_sockets {
parts.push(format!(
"allow_unix_sockets=[{}]",
allow_unix_sockets.join(", ")
));
}
if let Some(allow_local_binding) = allow_local_binding {
parts.push(format!("allow_local_binding={allow_local_binding}"));
}
join_or_empty(parts)
}
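A quick standalone check of what this formatter produces for the configuration exercised in the test below. `join_or_empty` is replaced here by a plain `join(", ")`, which is an assumption about its behavior in the non-empty case; the expected string matches the test's rendered-output assertion.

```rust
fn main() {
    let enabled = Some(true);
    let allowed_domains: Option<Vec<String>> = Some(vec!["example.com".to_string()]);

    // Same shape as format_network_constraints: only set fields contribute parts.
    let mut parts = Vec::new();
    if let Some(enabled) = enabled {
        parts.push(format!("enabled={enabled}"));
    }
    if let Some(domains) = allowed_domains {
        parts.push(format!("allowed_domains=[{}]", domains.join(", ")));
    }

    assert_eq!(parts.join(", "), "enabled=true, allowed_domains=[example.com]");
    println!("{}", parts.join(", "));
}
```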
#[cfg(test)]
mod tests {
use super::render_debug_config_lines;
@@ -211,6 +277,7 @@ mod tests {
use codex_core::config_loader::ConstrainedWithSource;
use codex_core::config_loader::McpServerIdentity;
use codex_core::config_loader::McpServerRequirement;
use codex_core::config_loader::NetworkConstraints;
use codex_core::config_loader::RequirementSource;
use codex_core::config_loader::ResidencyRequirement;
use codex_core::config_loader::SandboxModeRequirement;
@@ -323,6 +390,14 @@ mod tests {
Constrained::allow_any(WebSearchMode::Cached),
Some(RequirementSource::CloudRequirements),
);
requirements.network = Some(Sourced::new(
NetworkConstraints {
enabled: Some(true),
allowed_domains: Some(vec!["example.com".to_string()]),
..Default::default()
},
RequirementSource::CloudRequirements,
));
let requirements_toml = ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
@@ -376,6 +451,9 @@ mod tests {
);
assert!(rendered.contains("mcp_servers: docs (source: MDM managed_config.toml (legacy))"));
assert!(rendered.contains("enforce_residency: us (source: cloud requirements)"));
assert!(rendered.contains(
"experimental_network: enabled=true, allowed_domains=[example.com] (source: cloud requirements)"
));
assert!(!rendered.contains(" - rules:"));
}