mirror of
https://github.com/openai/codex.git
synced 2026-02-08 01:43:46 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8fe5066bcc | ||
|
|
2e89cb9117 |
@@ -16,26 +16,17 @@
|
||||
//! Preconnect is intentionally handshake-only: it may warm a socket and capture sticky-routing
|
||||
//! state, but the first `response.create` payload is still sent only when a turn starts.
|
||||
//!
|
||||
//! Internally, startup preconnect and warmed-socket adoption share one session-level lifecycle:
|
||||
//! `Idle` (no task/socket), `InFlight` (startup preconnect task running), and `Ready` (one-shot
|
||||
//! warmed socket available). On first use in a turn, the session tries to adopt `Ready`; if not
|
||||
//! ready, it awaits `InFlight` and retries adoption before opening a new websocket. This prevents
|
||||
//! racing duplicate first-turn handshakes while keeping preconnect best-effort.
|
||||
//! Internally, startup preconnect stores a single task handle. On first use in a turn, the session
|
||||
//! awaits that task and adopts the warmed socket if it succeeds; if it fails, the stream attempt
|
||||
//! fails and the normal retry/fallback loop decides what to do next.
|
||||
//!
|
||||
//! ## Retry-Budget Tradeoff
|
||||
//!
|
||||
//! `stream_max_retries` applies to retryable turn stream failures, not to background startup
|
||||
//! preconnect handshakes. In failure cases this can produce two websocket handshakes on the first
|
||||
//! turn (startup preconnect, then turn-time connect) before HTTP fallback becomes sticky. We keep
|
||||
//! this split intentionally so opportunistic preconnect cannot consume the user-visible stream
|
||||
//! retry budget before any turn payload is sent.
|
||||
//!
|
||||
//! If this policy needs to change later, preconnect can be modeled as an explicit first connection
|
||||
//! attempt in the same retry budget as turn streaming. That would require plumbing websocket
|
||||
//! attempt accounting from connection acquisition into the turn retry loop and updating fallback
|
||||
//! expectations/tests accordingly.
|
||||
//! Startup preconnect is treated as the first websocket connection attempt for the first turn. If
|
||||
//! it fails, the stream attempt fails and the retry/fallback loop decides whether to retry or fall
|
||||
//! back. This avoids duplicate handshakes but means a failed preconnect can consume one retry
|
||||
//! budget slot before any turn payload is sent.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
@@ -82,6 +73,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use eventsource_stream::Event;
|
||||
use eventsource_stream::EventStreamError;
|
||||
use futures::StreamExt;
|
||||
use futures::future::BoxFuture;
|
||||
use http::HeaderMap as ApiHeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode as HttpStatusCode;
|
||||
@@ -94,7 +86,6 @@ use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
@@ -110,8 +101,6 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::WireApi;
|
||||
use crate::tools::spec::create_tools_json_for_responses_api;
|
||||
use crate::turn_metadata::build_turn_metadata_header;
|
||||
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
|
||||
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
|
||||
@@ -121,6 +110,12 @@ pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
|
||||
struct PreconnectedWebSocket {
|
||||
connection: ApiWebSocketConnection,
|
||||
turn_state: Option<String>,
|
||||
}
|
||||
|
||||
type PreconnectTask = JoinHandle<Option<PreconnectedWebSocket>>;
|
||||
/// Session-scoped state shared by all [`ModelClient`] clones.
|
||||
///
|
||||
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
|
||||
@@ -137,11 +132,8 @@ struct ModelClientState {
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
disable_websockets: AtomicBool,
|
||||
/// Session-scoped preconnect lifecycle state.
|
||||
///
|
||||
/// This keeps startup preconnect task tracking and warmed-socket adoption in one lock so
|
||||
/// turn-time websocket setup observes a single, coherent state.
|
||||
preconnect: Mutex<PreconnectState>,
|
||||
|
||||
preconnect: Mutex<Option<PreconnectTask>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ModelClientState {
|
||||
@@ -181,28 +173,6 @@ struct CurrentClientSetup {
|
||||
api_auth: CoreAuthProvider,
|
||||
}
|
||||
|
||||
/// One-shot preconnected websocket slot consumed by the next turn.
|
||||
///
|
||||
/// This bundles the socket with optional sticky-routing state captured during
|
||||
/// handshake so they are taken and cleared atomically.
|
||||
struct PreconnectedWebSocket {
|
||||
connection: ApiWebSocketConnection,
|
||||
turn_state: Option<String>,
|
||||
}
|
||||
|
||||
/// Session-level lifecycle of startup websocket preconnect.
|
||||
///
|
||||
/// `InFlight` tracks the startup task so the first turn can await it and reuse the same socket.
|
||||
/// `Ready` stores a one-shot warmed socket for turn adoption.
|
||||
enum PreconnectState {
|
||||
/// No startup preconnect task is active and no warmed socket is available.
|
||||
Idle,
|
||||
/// Startup preconnect is currently running; first turn may await this task.
|
||||
InFlight(JoinHandle<()>),
|
||||
/// Startup preconnect finished and produced a one-shot warmed socket.
|
||||
Ready(PreconnectedWebSocket),
|
||||
}
|
||||
|
||||
/// A session-scoped client for model-provider API calls.
|
||||
///
|
||||
/// This holds configuration and state that should be shared across turns within a Codex session
|
||||
@@ -215,8 +185,6 @@ enum PreconnectState {
|
||||
/// Turn-scoped settings (model selection, reasoning controls, telemetry context, and turn
|
||||
/// metadata) are passed explicitly to the relevant methods to keep turn lifetime visible at the
|
||||
/// call site.
|
||||
///
|
||||
/// This type is cheap to clone.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ModelClient {
|
||||
state: Arc<ModelClientState>,
|
||||
@@ -288,7 +256,7 @@ impl ModelClient {
|
||||
include_timing_metrics,
|
||||
beta_features_header,
|
||||
disable_websockets: AtomicBool::new(false),
|
||||
preconnect: Mutex::new(PreconnectState::Idle),
|
||||
preconnect: Mutex::new(None),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -315,53 +283,57 @@ impl ModelClient {
|
||||
///
|
||||
/// A timeout when computing turn metadata is treated the same as "no metadata" so startup
|
||||
/// cannot block indefinitely on optional preconnect context.
|
||||
pub fn pre_establish_connection(&self, otel_manager: OtelManager, cwd: PathBuf) {
|
||||
pub fn pre_establish_connection(
|
||||
&self,
|
||||
otel_manager: OtelManager,
|
||||
turn_metadata_header: BoxFuture<'static, Option<String>>,
|
||||
) {
|
||||
if !self.responses_websocket_enabled() || self.disable_websockets() {
|
||||
return;
|
||||
}
|
||||
|
||||
let model_client = self.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
let turn_metadata_header = resolve_turn_metadata_header_with_timeout(
|
||||
build_turn_metadata_header(cwd.as_path()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let _ = model_client
|
||||
let turn_metadata_header = turn_metadata_header.await;
|
||||
|
||||
model_client
|
||||
.preconnect(&otel_manager, turn_metadata_header.as_deref())
|
||||
.await;
|
||||
.await
|
||||
});
|
||||
self.store_preconnect_task(handle);
|
||||
self.set_preconnected_task(Some(handle));
|
||||
}
|
||||
|
||||
/// Opportunistically pre-establishes a Responses WebSocket connection for this session.
|
||||
///
|
||||
/// This method is best-effort: it returns `false` on any setup/connect failure and the caller
|
||||
/// should continue normally. A successful preconnect reduces first-turn latency but never sends
|
||||
/// an initial prompt; the first `response.create` is still sent only when a turn starts.
|
||||
/// This method is best-effort: it returns an error on setup/connect failure and the caller
|
||||
/// can decide whether to ignore it. A successful preconnect reduces first-turn latency but
|
||||
/// never sends an initial prompt; the first `response.create` is still sent only when a turn
|
||||
/// starts.
|
||||
///
|
||||
/// The preconnected slot is single-consumer and single-use: the next `ModelClientSession` may
|
||||
/// adopt it once, after which later turns either keep using that same turn-local connection or
|
||||
/// create a new one.
|
||||
pub async fn preconnect(
|
||||
async fn preconnect(
|
||||
&self,
|
||||
otel_manager: &OtelManager,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> bool {
|
||||
) -> Option<PreconnectedWebSocket> {
|
||||
if !self.responses_websocket_enabled() || self.disable_websockets() {
|
||||
return false;
|
||||
return None;
|
||||
}
|
||||
|
||||
let client_setup = match self.current_client_setup().await {
|
||||
Ok(client_setup) => client_setup,
|
||||
Err(err) => {
|
||||
warn!("failed to build websocket preconnect client setup: {err}");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
let turn_state = Arc::new(OnceLock::new());
|
||||
let client_setup = self
|
||||
.current_client_setup()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ApiError::Stream(format!(
|
||||
"failed to build websocket preconnect client setup: {err}"
|
||||
))
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
match self
|
||||
let turn_state = Arc::new(OnceLock::new());
|
||||
let connection = self
|
||||
.connect_websocket(
|
||||
otel_manager,
|
||||
client_setup.api_provider,
|
||||
@@ -370,16 +342,12 @@ impl ModelClient {
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(connection) => {
|
||||
self.store_preconnected_websocket(connection, turn_state.get().cloned());
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("websocket preconnect failed: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
.ok()?;
|
||||
|
||||
Some(PreconnectedWebSocket {
|
||||
connection,
|
||||
turn_state: turn_state.get().cloned(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Compacts the current conversation history using the Compact endpoint.
|
||||
@@ -576,132 +544,26 @@ impl ModelClient {
|
||||
headers
|
||||
}
|
||||
|
||||
/// Consumes the warmed websocket slot.
|
||||
fn take_preconnected_websocket(&self) -> Option<PreconnectedWebSocket> {
|
||||
/// Consumes the warmed websocket task slot.
|
||||
fn take_preconnected_task(&self) -> Option<PreconnectTask> {
|
||||
let mut state = self
|
||||
.state
|
||||
.preconnect
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
|
||||
match previous {
|
||||
PreconnectState::Ready(preconnected) => Some(preconnected),
|
||||
other => {
|
||||
*state = other;
|
||||
None
|
||||
}
|
||||
}
|
||||
state.take()
|
||||
}
|
||||
|
||||
/// Stores a freshly preconnected websocket and optional captured turn-state token.
|
||||
///
|
||||
/// This overwrites any previously warmed socket because only one preconnect candidate is kept.
|
||||
fn store_preconnected_websocket(
|
||||
&self,
|
||||
connection: ApiWebSocketConnection,
|
||||
turn_state: Option<String>,
|
||||
) {
|
||||
fn set_preconnected_task(&self, task: Option<PreconnectTask>) {
|
||||
let mut state = self
|
||||
.state
|
||||
.preconnect
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if self.disable_websockets() {
|
||||
debug!("discarding startup websocket preconnect because websocket fallback is active");
|
||||
*state = PreconnectState::Idle;
|
||||
return;
|
||||
}
|
||||
*state = PreconnectState::Ready(PreconnectedWebSocket {
|
||||
connection,
|
||||
turn_state,
|
||||
});
|
||||
}
|
||||
|
||||
/// Stores the latest startup preconnect task handle.
|
||||
///
|
||||
/// If a previous task is still running, it is aborted so only one in-flight startup attempt
|
||||
/// is tracked.
|
||||
fn store_preconnect_task(&self, task: JoinHandle<()>) {
|
||||
let mut task = Some(task);
|
||||
let previous_in_flight = {
|
||||
let mut state = self
|
||||
.state
|
||||
.preconnect
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
match &*state {
|
||||
// A very fast startup preconnect can complete before this method stores the
|
||||
// task handle; keep the warmed socket and drop the now-useless handle.
|
||||
PreconnectState::Ready(_) => None,
|
||||
_ => match task.take() {
|
||||
Some(next_task) => {
|
||||
match std::mem::replace(&mut *state, PreconnectState::InFlight(next_task)) {
|
||||
PreconnectState::InFlight(previous) => Some(previous),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
},
|
||||
}
|
||||
};
|
||||
if let Some(previous) = previous_in_flight {
|
||||
previous.abort();
|
||||
}
|
||||
if let Some(task) = task {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// Awaits the startup preconnect task once, if one is currently tracked.
|
||||
///
|
||||
/// This lets the first turn treat startup preconnect as the first websocket connection
|
||||
/// attempt, avoiding a redundant second connect while the preconnect attempt is in flight.
|
||||
///
|
||||
/// This await intentionally has no separate timeout wrapper. WebSocket connect handshakes
|
||||
/// already run without an app-level timeout, so waiting on the in-flight preconnect task does
|
||||
/// not add a new unbounded wait class; it reuses the same first connection attempt.
|
||||
async fn await_preconnect_task(&self) {
|
||||
let task = {
|
||||
let mut state = self
|
||||
.state
|
||||
.preconnect
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let previous = std::mem::replace(&mut *state, PreconnectState::Idle);
|
||||
match previous {
|
||||
PreconnectState::InFlight(task) => Some(task),
|
||||
other => {
|
||||
*state = other;
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(task) = task {
|
||||
let in_flight = !task.is_finished();
|
||||
if in_flight {
|
||||
debug!("awaiting startup websocket preconnect before opening a new websocket");
|
||||
}
|
||||
if let Err(err) = task.await {
|
||||
debug!("startup websocket preconnect task failed: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears all startup preconnect state.
|
||||
///
|
||||
/// This aborts any in-flight startup preconnect task and drops any warmed socket.
|
||||
fn clear_preconnect(&self) {
|
||||
let previous = {
|
||||
let mut state = self
|
||||
.state
|
||||
.preconnect
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
std::mem::replace(&mut *state, PreconnectState::Idle)
|
||||
};
|
||||
if let PreconnectState::InFlight(task) = previous {
|
||||
task.abort();
|
||||
if let Some(running_task) = state.take() {
|
||||
running_task.abort();
|
||||
}
|
||||
*state = task;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -921,15 +783,24 @@ impl ModelClientSession {
|
||||
options: &ApiResponsesOptions,
|
||||
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
||||
// Prefer the session-level preconnect slot before creating a new websocket.
|
||||
if self.connection.is_none() {
|
||||
if let Some(preconnected) = self.try_use_preconnected_websocket() {
|
||||
self.adopt_preconnected_websocket(preconnected);
|
||||
} else {
|
||||
self.client.await_preconnect_task().await;
|
||||
if let Some(preconnected) = self.try_use_preconnected_websocket() {
|
||||
self.adopt_preconnected_websocket(preconnected);
|
||||
if self.connection.is_none()
|
||||
&& let Some(task) = self.client.take_preconnected_task()
|
||||
{
|
||||
match task.await {
|
||||
Ok(Some(preconnected)) => {
|
||||
let PreconnectedWebSocket {
|
||||
connection,
|
||||
turn_state,
|
||||
} = preconnected;
|
||||
if let Some(turn_state) = turn_state {
|
||||
let _ = self.turn_state.set(turn_state);
|
||||
}
|
||||
self.connection = Some(connection);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
warn!("startup websocket preconnect task failed");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let needs_new = match self.connection.as_ref() {
|
||||
@@ -938,7 +809,6 @@ impl ModelClientSession {
|
||||
};
|
||||
|
||||
if needs_new {
|
||||
self.client.clear_preconnect();
|
||||
self.websocket_last_items.clear();
|
||||
self.websocket_last_response_id = None;
|
||||
self.websocket_last_response_id_rx = None;
|
||||
@@ -964,33 +834,6 @@ impl ModelClientSession {
|
||||
))
|
||||
}
|
||||
|
||||
/// Adopts the session-level preconnect slot for this turn.
|
||||
///
|
||||
/// If a turn-local connection already exists, this intentionally does nothing to avoid
|
||||
/// replacing an active connection mid-turn.
|
||||
fn try_use_preconnected_websocket(&mut self) -> Option<PreconnectedWebSocket> {
|
||||
if self.connection.is_some() {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.client.take_preconnected_websocket()
|
||||
}
|
||||
|
||||
/// Moves a preconnected socket into the turn-local connection slot.
|
||||
///
|
||||
/// If the preconnect handshake captured sticky-routing turn state, this also seeds the
|
||||
/// turn-local state lock so all later requests in the turn replay the same token.
|
||||
fn adopt_preconnected_websocket(&mut self, preconnected: PreconnectedWebSocket) {
|
||||
let PreconnectedWebSocket {
|
||||
connection,
|
||||
turn_state,
|
||||
} = preconnected;
|
||||
if let Some(turn_state) = turn_state {
|
||||
let _ = self.turn_state.set(turn_state);
|
||||
}
|
||||
self.connection = Some(connection);
|
||||
}
|
||||
|
||||
fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression {
|
||||
if self.client.state.enable_request_compression
|
||||
&& auth.is_some_and(CodexAuth::is_chatgpt_auth)
|
||||
@@ -1223,10 +1066,6 @@ impl ModelClientSession {
|
||||
/// the HTTP transport. It also clears any warmed websocket preconnect state so future turns
|
||||
/// cannot accidentally adopt a stale socket after fallback has been activated.
|
||||
///
|
||||
/// Startup preconnect handshakes are intentionally not counted against `stream_max_retries`.
|
||||
/// See [`crate::client`] module docs ("Retry-Budget Tradeoff") for rationale and future
|
||||
/// alternatives.
|
||||
///
|
||||
/// Returns `true` if this call activated fallback, or `false` if fallback was already active.
|
||||
pub(crate) fn try_switch_fallback_transport(&mut self, otel_manager: &OtelManager) -> bool {
|
||||
let websocket_enabled = self.client.responses_websocket_enabled();
|
||||
@@ -1239,9 +1078,9 @@ impl ModelClientSession {
|
||||
&[("from_wire_api", "responses_websocket")],
|
||||
);
|
||||
|
||||
self.client.set_preconnected_task(None);
|
||||
self.connection = None;
|
||||
self.websocket_last_items.clear();
|
||||
self.client.clear_preconnect();
|
||||
}
|
||||
activated
|
||||
}
|
||||
|
||||
@@ -558,8 +558,9 @@ impl TurnContext {
|
||||
}
|
||||
|
||||
async fn build_turn_metadata_header(&self) -> Option<String> {
|
||||
let cwd = self.cwd.clone();
|
||||
self.turn_metadata_header
|
||||
.get_or_init(|| async { build_turn_metadata_header(self.cwd.as_path()).await })
|
||||
.get_or_init(|| async { build_turn_metadata_header(cwd).await })
|
||||
.await
|
||||
.clone()
|
||||
}
|
||||
@@ -1099,7 +1100,11 @@ impl Session {
|
||||
// when submit_turn() runs.
|
||||
sess.services.model_client.pre_establish_connection(
|
||||
sess.services.otel_manager.clone(),
|
||||
session_configuration.cwd.clone(),
|
||||
resolve_turn_metadata_header_with_timeout(
|
||||
build_turn_metadata_header(session_configuration.cwd.clone()),
|
||||
None,
|
||||
)
|
||||
.boxed(),
|
||||
);
|
||||
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::Serialize;
|
||||
@@ -16,7 +16,6 @@ use crate::git_info::get_git_remote_urls_assume_git_repo;
|
||||
use crate::git_info::get_git_repo_root;
|
||||
use crate::git_info::get_head_commit_hash;
|
||||
|
||||
/// Timeout used when resolving the optional turn-metadata header.
|
||||
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Resolves turn metadata with a shared timeout policy.
|
||||
@@ -57,7 +56,8 @@ struct TurnMetadata {
|
||||
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
|
||||
}
|
||||
|
||||
pub async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
|
||||
pub async fn build_turn_metadata_header(cwd: PathBuf) -> Option<String> {
|
||||
let cwd = cwd.as_path();
|
||||
let repo_root = get_git_repo_root(cwd)?;
|
||||
|
||||
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
|
||||
|
||||
@@ -30,6 +30,7 @@ use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::responses::start_websocket_server_with_headers;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -97,7 +98,9 @@ async fn responses_websocket_preconnect_reuses_connection() {
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
assert!(harness.client.preconnect(&harness.otel_manager, None).await);
|
||||
harness
|
||||
.client
|
||||
.pre_establish_connection(harness.otel_manager.clone(), async { None }.boxed());
|
||||
|
||||
let mut client_session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
@@ -120,7 +123,9 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
assert!(harness.client.preconnect(&harness.otel_manager, None).await);
|
||||
harness
|
||||
.client
|
||||
.pre_establish_connection(harness.otel_manager.clone(), async { None }.boxed());
|
||||
|
||||
let mut client_session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
@@ -3,6 +3,7 @@ use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::ConfigLayerStack;
|
||||
use codex_core::config_loader::ConfigLayerStackOrdering;
|
||||
use codex_core::config_loader::NetworkConstraints;
|
||||
use codex_core::config_loader::RequirementSource;
|
||||
use codex_core::config_loader::ResidencyRequirement;
|
||||
use codex_core::config_loader::SandboxModeRequirement;
|
||||
@@ -115,6 +116,14 @@ fn render_debug_config_lines(stack: &ConfigLayerStack) -> Vec<Line<'static>> {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(network) = requirements.network.as_ref() {
|
||||
requirement_lines.push(requirement_line(
|
||||
"experimental_network",
|
||||
format_network_constraints(&network.value),
|
||||
Some(&network.source),
|
||||
));
|
||||
}
|
||||
|
||||
if requirement_lines.is_empty() {
|
||||
lines.push(" <none>".dim().into());
|
||||
} else {
|
||||
@@ -199,6 +208,63 @@ fn format_residency_requirement(requirement: ResidencyRequirement) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
fn format_network_constraints(network: &NetworkConstraints) -> String {
|
||||
let mut parts = Vec::new();
|
||||
|
||||
let NetworkConstraints {
|
||||
enabled,
|
||||
http_port,
|
||||
socks_port,
|
||||
allow_upstream_proxy,
|
||||
dangerously_allow_non_loopback_proxy,
|
||||
dangerously_allow_non_loopback_admin,
|
||||
allowed_domains,
|
||||
denied_domains,
|
||||
allow_unix_sockets,
|
||||
allow_local_binding,
|
||||
} = network;
|
||||
|
||||
if let Some(enabled) = enabled {
|
||||
parts.push(format!("enabled={enabled}"));
|
||||
}
|
||||
if let Some(http_port) = http_port {
|
||||
parts.push(format!("http_port={http_port}"));
|
||||
}
|
||||
if let Some(socks_port) = socks_port {
|
||||
parts.push(format!("socks_port={socks_port}"));
|
||||
}
|
||||
if let Some(allow_upstream_proxy) = allow_upstream_proxy {
|
||||
parts.push(format!("allow_upstream_proxy={allow_upstream_proxy}"));
|
||||
}
|
||||
if let Some(dangerously_allow_non_loopback_proxy) = dangerously_allow_non_loopback_proxy {
|
||||
parts.push(format!(
|
||||
"dangerously_allow_non_loopback_proxy={dangerously_allow_non_loopback_proxy}"
|
||||
));
|
||||
}
|
||||
if let Some(dangerously_allow_non_loopback_admin) = dangerously_allow_non_loopback_admin {
|
||||
parts.push(format!(
|
||||
"dangerously_allow_non_loopback_admin={dangerously_allow_non_loopback_admin}"
|
||||
));
|
||||
}
|
||||
if let Some(allowed_domains) = allowed_domains {
|
||||
parts.push(format!("allowed_domains=[{}]", allowed_domains.join(", ")));
|
||||
}
|
||||
if let Some(denied_domains) = denied_domains {
|
||||
parts.push(format!("denied_domains=[{}]", denied_domains.join(", ")));
|
||||
}
|
||||
if let Some(allow_unix_sockets) = allow_unix_sockets {
|
||||
parts.push(format!(
|
||||
"allow_unix_sockets=[{}]",
|
||||
allow_unix_sockets.join(", ")
|
||||
));
|
||||
}
|
||||
if let Some(allow_local_binding) = allow_local_binding {
|
||||
parts.push(format!("allow_local_binding={allow_local_binding}"));
|
||||
}
|
||||
|
||||
join_or_empty(parts)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::render_debug_config_lines;
|
||||
@@ -211,6 +277,7 @@ mod tests {
|
||||
use codex_core::config_loader::ConstrainedWithSource;
|
||||
use codex_core::config_loader::McpServerIdentity;
|
||||
use codex_core::config_loader::McpServerRequirement;
|
||||
use codex_core::config_loader::NetworkConstraints;
|
||||
use codex_core::config_loader::RequirementSource;
|
||||
use codex_core::config_loader::ResidencyRequirement;
|
||||
use codex_core::config_loader::SandboxModeRequirement;
|
||||
@@ -323,6 +390,14 @@ mod tests {
|
||||
Constrained::allow_any(WebSearchMode::Cached),
|
||||
Some(RequirementSource::CloudRequirements),
|
||||
);
|
||||
requirements.network = Some(Sourced::new(
|
||||
NetworkConstraints {
|
||||
enabled: Some(true),
|
||||
allowed_domains: Some(vec!["example.com".to_string()]),
|
||||
..Default::default()
|
||||
},
|
||||
RequirementSource::CloudRequirements,
|
||||
));
|
||||
|
||||
let requirements_toml = ConfigRequirementsToml {
|
||||
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
|
||||
@@ -376,6 +451,9 @@ mod tests {
|
||||
);
|
||||
assert!(rendered.contains("mcp_servers: docs (source: MDM managed_config.toml (legacy))"));
|
||||
assert!(rendered.contains("enforce_residency: us (source: cloud requirements)"));
|
||||
assert!(rendered.contains(
|
||||
"experimental_network: enabled=true, allowed_domains=[example.com] (source: cloud requirements)"
|
||||
));
|
||||
assert!(!rendered.contains(" - rules:"));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user