From fbb3a309538180ca4c550ad0d8e75f2bfeae3a07 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Thu, 29 Jan 2026 13:50:53 -0800 Subject: [PATCH] Remove WebSocket wire format (#10179) I'd like WireApi to go away (when chat is removed) and WebSockets is still responses API just over a different transport. --- codex-rs/core/config.schema.json | 12 ++++---- codex-rs/core/src/client.rs | 28 +++++++++++++++---- codex-rs/core/src/config/mod.rs | 11 ++------ codex-rs/core/src/model_provider_info.rs | 17 +++++++---- codex-rs/core/src/models_manager/manager.rs | 1 + codex-rs/core/src/transport_manager.rs | 19 ++++--------- .../core/tests/chat_completions_payload.rs | 1 + codex-rs/core/tests/chat_completions_sse.rs | 1 + codex-rs/core/tests/common/test_codex.rs | 3 +- codex-rs/core/tests/responses_headers.rs | 3 ++ codex-rs/core/tests/suite/client.rs | 3 ++ .../core/tests/suite/client_websockets.rs | 5 +++- .../suite/stream_error_allows_next_turn.rs | 1 + .../core/tests/suite/stream_no_completed.rs | 1 + .../core/tests/suite/websocket_fallback.rs | 8 ++++-- 15 files changed, 68 insertions(+), 46 deletions(-) diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 16c0c8945c..fe27f7e24e 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -443,6 +443,11 @@ "minimum": 0.0, "type": "integer" }, + "supports_websockets": { + "default": false, + "description": "Whether this provider supports the Responses API WebSocket transport.", + "type": "boolean" + }, "wire_api": { "allOf": [ { @@ -1081,13 +1086,6 @@ ], "type": "string" }, - { - "description": "Experimental: Responses API over WebSocket transport.", - "enum": [ - "responses_websocket" - ], - "type": "string" - }, { "description": "Regular Chat Completions compatible with `/v1/chat/completions`.", "enum": [ diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 98fce871fd..4cb37a46d4 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -260,12 +260,18 @@ impl ModelClientSession { /// For Chat providers, the underlying stream is optionally aggregated /// based on the `show_raw_agent_reasoning` flag in the config. pub async fn stream(&mut self, prompt: &Prompt) -> Result { - let wire_api = self - .transport_manager - .effective_wire_api(self.state.provider.wire_api); + let wire_api = self.state.provider.wire_api; match wire_api { - WireApi::Responses => self.stream_responses_api(prompt).await, - WireApi::ResponsesWebsocket => self.stream_responses_websocket(prompt).await, + WireApi::Responses => { + let websocket_enabled = self.responses_websocket_enabled() + && !self.transport_manager.disable_websockets(); + + if websocket_enabled { + self.stream_responses_websocket(prompt).await + } else { + self.stream_responses_api(prompt).await + } + } WireApi::Chat => { let api_stream = self.stream_chat_completions(prompt).await?; @@ -285,9 +291,10 @@ impl ModelClientSession { } pub(crate) fn try_switch_fallback_transport(&mut self) -> bool { + let websocket_enabled = self.responses_websocket_enabled(); let activated = self .transport_manager - .activate_http_fallback(self.state.provider.wire_api); + .activate_http_fallback(websocket_enabled); if activated { warn!("falling back to HTTP"); self.state.otel_manager.counter( @@ -302,6 +309,15 @@ impl ModelClientSession { activated } + fn responses_websocket_enabled(&self) -> bool { + self.state.provider.supports_websockets + && self + .state + .config + .features + .enabled(Feature::ResponsesWebsockets) + } + fn build_responses_request(&self, prompt: &Prompt) -> Result { let instructions = prompt.base_instructions.text.clone(); let tools_json: Vec = create_tools_json_for_responses_api(&prompt.tools)?; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 895e54c040..bc7dba6d71 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1372,12 +1372,6 @@ impl Config { || cfg.sandbox_mode.is_some(); let mut model_providers = built_in_model_providers(); - if features.enabled(Feature::ResponsesWebsockets) - && let Some(provider) = model_providers.get_mut("openai") - && provider.is_openai() - { - provider.wire_api = crate::model_provider_info::WireApi::ResponsesWebsocket; - } // Merge user-defined providers into the built-in list. for (key, provider) in cfg.model_providers.into_iter() { model_providers.entry(key).or_insert(provider); @@ -2555,7 +2549,7 @@ profile = "project" } #[test] - fn responses_websockets_feature_updates_openai_provider() -> std::io::Result<()> { + fn responses_websockets_feature_does_not_change_wire_api() -> std::io::Result<()> { let codex_home = TempDir::new()?; let mut entries = BTreeMap::new(); entries.insert("responses_websockets".to_string(), true); @@ -2572,7 +2566,7 @@ profile = "project" assert_eq!( config.model_provider.wire_api, - crate::model_provider_info::WireApi::ResponsesWebsocket + crate::model_provider_info::WireApi::Responses ); Ok(()) @@ -3692,6 +3686,7 @@ model_verbosity = "high" stream_max_retries: Some(10), stream_idle_timeout_ms: Some(300_000), requires_openai_auth: false, + supports_websockets: false, }; let model_provider_map = { let mut model_provider_map = built_in_model_providers(); diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index b1422458ca..4f69f02d91 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -43,10 +43,6 @@ pub enum WireApi { /// The Responses API exposed by OpenAI at `/v1/responses`. Responses, - /// Experimental: Responses API over WebSocket transport. - #[serde(rename = "responses_websocket")] - ResponsesWebsocket, - /// Regular Chat Completions compatible with `/v1/chat/completions`. #[default] Chat, @@ -105,6 +101,10 @@ pub struct ModelProviderInfo { /// and API key (if needed) comes from the "env_key" environment variable. #[serde(default)] pub requires_openai_auth: bool, + + /// Whether this provider supports the Responses API WebSocket transport. + #[serde(default)] + pub supports_websockets: bool, } impl ModelProviderInfo { @@ -162,7 +162,6 @@ impl ModelProviderInfo { query_params: self.query_params.clone(), wire: match self.wire_api { WireApi::Responses => ApiWireApi::Responses, - WireApi::ResponsesWebsocket => ApiWireApi::Responses, WireApi::Chat => ApiWireApi::Chat, }, headers, @@ -254,6 +253,7 @@ impl ModelProviderInfo { stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: true, + supports_websockets: true, } } @@ -332,6 +332,7 @@ pub fn create_oss_provider_with_base_url(base_url: &str, wire_api: WireApi) -> M stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, } } @@ -360,6 +361,7 @@ base_url = "http://localhost:11434/v1" stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); @@ -390,6 +392,7 @@ query_params = { api-version = "2025-04-01-preview" } stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); @@ -423,6 +426,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); @@ -454,6 +458,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let api = provider.to_api_provider(None).expect("api provider"); assert!( @@ -476,6 +481,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let named_api = named_provider.to_api_provider(None).expect("api provider"); assert!(named_api.is_azure_responses_endpoint()); @@ -500,6 +506,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; let api = provider.to_api_provider(None).expect("api provider"); assert!( diff --git a/codex-rs/core/src/models_manager/manager.rs b/codex-rs/core/src/models_manager/manager.rs index 58049b4747..77fb44cd5d 100644 --- a/codex-rs/core/src/models_manager/manager.rs +++ b/codex-rs/core/src/models_manager/manager.rs @@ -432,6 +432,7 @@ mod tests { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, } } diff --git a/codex-rs/core/src/transport_manager.rs b/codex-rs/core/src/transport_manager.rs index f1fe68faf4..2b5d095000 100644 --- a/codex-rs/core/src/transport_manager.rs +++ b/codex-rs/core/src/transport_manager.rs @@ -2,11 +2,9 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use crate::model_provider_info::WireApi; - #[derive(Clone, Debug, Default)] pub struct TransportManager { - fallback_to_http: Arc, + disable_websockets: Arc, } impl TransportManager { @@ -14,18 +12,11 @@ impl TransportManager { Self::default() } - pub fn effective_wire_api(&self, provider_wire_api: WireApi) -> WireApi { - if self.fallback_to_http.load(Ordering::Relaxed) - && provider_wire_api == WireApi::ResponsesWebsocket - { - WireApi::Responses - } else { - provider_wire_api - } + pub fn disable_websockets(&self) -> bool { + self.disable_websockets.load(Ordering::Relaxed) } - pub fn activate_http_fallback(&self, provider_wire_api: WireApi) -> bool { - provider_wire_api == WireApi::ResponsesWebsocket - && !self.fallback_to_http.swap(true, Ordering::Relaxed) + pub fn activate_http_fallback(&self, websocket_enabled: bool) -> bool { + websocket_enabled && !self.disable_websockets.swap(true, Ordering::Relaxed) } } diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 5e8c895a3f..ffa2caa59f 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -60,6 +60,7 @@ async fn run_request(input: Vec) -> Value { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = match TempDir::new() { diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 84d0ede085..160699733e 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -59,6 +59,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = match TempDir::new() { diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index a247453775..af85ebb955 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -8,7 +8,6 @@ use codex_core::CodexAuth; use codex_core::CodexThread; use codex_core::ModelProviderInfo; use codex_core::ThreadManager; -use codex_core::WireApi; use codex_core::built_in_model_providers; use codex_core::config::Config; use codex_core::features::Feature; @@ -127,7 +126,7 @@ impl TestCodexBuilder { let base_url_clone = base_url.clone(); self.config_mutators.push(Box::new(move |config| { config.model_provider.base_url = Some(base_url_clone); - config.model_provider.wire_api = WireApi::ResponsesWebsocket; + config.features.enable(Feature::ResponsesWebsockets); })); self.build_with_home_and_base_url(base_url, home, None) .await diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index b3c0165f46..b2c64818cc 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -57,6 +57,7 @@ async fn responses_stream_includes_subagent_header_on_review() { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = TempDir::new().expect("failed to create TempDir"); @@ -154,6 +155,7 @@ async fn responses_stream_includes_subagent_header_on_other() { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = TempDir::new().expect("failed to create TempDir"); @@ -307,6 +309,7 @@ async fn responses_respects_model_info_overrides_from_config() { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = TempDir::new().expect("failed to create TempDir"); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 8b8adf06ab..72608a44d8 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1151,6 +1151,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: false, }; let codex_home = TempDir::new().unwrap(); @@ -1671,6 +1672,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() { stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; // Init session @@ -1751,6 +1753,7 @@ async fn env_var_overrides_loaded_auth() { stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, + supports_websockets: false, }; // Init session diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 60229e01d6..5037479987 100644 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -10,6 +10,7 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::TransportManager; use codex_core::WireApi; +use codex_core::features::Feature; use codex_core::models_manager::manager::ModelsManager; use codex_core::protocol::SessionSource; use codex_otel::OtelManager; @@ -188,7 +189,7 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo { env_key: None, env_key_instructions: None, experimental_bearer_token: None, - wire_api: WireApi::ResponsesWebsocket, + wire_api: WireApi::Responses, query_params: None, http_headers: None, env_http_headers: None, @@ -196,6 +197,7 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo { stream_max_retries: Some(0), stream_idle_timeout_ms: Some(5_000), requires_openai_auth: false, + supports_websockets: true, } } @@ -204,6 +206,7 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home).await; config.model = Some(MODEL.to_string()); + config.features.enable(Feature::ResponsesWebsockets); let config = Arc::new(config); let model_info = ModelsManager::construct_model_info_offline(MODEL, &config); let conversation_id = ThreadId::new(); diff --git a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs index 6bfcef38b3..ec18816235 100644 --- a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs +++ b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs @@ -73,6 +73,7 @@ async fn continue_after_stream_error() { stream_max_retries: Some(1), stream_idle_timeout_ms: Some(2_000), requires_openai_auth: false, + supports_websockets: false, }; let TestCodex { codex, .. } = test_codex() diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index a4962d497e..68b7763eca 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -81,6 +81,7 @@ async fn retries_on_early_close() { stream_max_retries: Some(1), stream_idle_timeout_ms: Some(2000), requires_openai_auth: false, + supports_websockets: false, }; let TestCodex { codex, .. } = test_codex() diff --git a/codex-rs/core/tests/suite/websocket_fallback.rs b/codex-rs/core/tests/suite/websocket_fallback.rs index d52236a8a2..e61c4e5c2d 100644 --- a/codex-rs/core/tests/suite/websocket_fallback.rs +++ b/codex-rs/core/tests/suite/websocket_fallback.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use codex_core::WireApi; +use codex_core::features::Feature; use core_test_support::responses; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_response_created; @@ -26,7 +26,8 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result let base_url = format!("{}/v1", server.uri()); move |config| { config.model_provider.base_url = Some(base_url); - config.model_provider.wire_api = WireApi::ResponsesWebsocket; + config.model_provider.wire_api = codex_core::WireApi::Responses; + config.features.enable(Feature::ResponsesWebsockets); config.model_provider.stream_max_retries = Some(0); config.model_provider.request_max_retries = Some(0); } @@ -70,7 +71,8 @@ async fn websocket_fallback_is_sticky_across_turns() -> Result<()> { let base_url = format!("{}/v1", server.uri()); move |config| { config.model_provider.base_url = Some(base_url); - config.model_provider.wire_api = WireApi::ResponsesWebsocket; + config.model_provider.wire_api = codex_core::WireApi::Responses; + config.features.enable(Feature::ResponsesWebsockets); config.model_provider.stream_max_retries = Some(0); config.model_provider.request_max_retries = Some(0); }