mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Remove WebSocket wire format (#10179)
I'd like WireApi to go away (when chat is removed) and WebSockets is still responses API just over a different transport.
This commit is contained in:
@@ -443,6 +443,11 @@
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
},
|
||||
"supports_websockets": {
|
||||
"default": false,
|
||||
"description": "Whether this provider supports the Responses API WebSocket transport.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"wire_api": {
|
||||
"allOf": [
|
||||
{
|
||||
@@ -1081,13 +1086,6 @@
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"description": "Experimental: Responses API over WebSocket transport.",
|
||||
"enum": [
|
||||
"responses_websocket"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"description": "Regular Chat Completions compatible with `/v1/chat/completions`.",
|
||||
"enum": [
|
||||
|
||||
@@ -260,12 +260,18 @@ impl ModelClientSession {
|
||||
/// For Chat providers, the underlying stream is optionally aggregated
|
||||
/// based on the `show_raw_agent_reasoning` flag in the config.
|
||||
pub async fn stream(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
|
||||
let wire_api = self
|
||||
.transport_manager
|
||||
.effective_wire_api(self.state.provider.wire_api);
|
||||
let wire_api = self.state.provider.wire_api;
|
||||
match wire_api {
|
||||
WireApi::Responses => self.stream_responses_api(prompt).await,
|
||||
WireApi::ResponsesWebsocket => self.stream_responses_websocket(prompt).await,
|
||||
WireApi::Responses => {
|
||||
let websocket_enabled = self.responses_websocket_enabled()
|
||||
&& !self.transport_manager.disable_websockets();
|
||||
|
||||
if websocket_enabled {
|
||||
self.stream_responses_websocket(prompt).await
|
||||
} else {
|
||||
self.stream_responses_api(prompt).await
|
||||
}
|
||||
}
|
||||
WireApi::Chat => {
|
||||
let api_stream = self.stream_chat_completions(prompt).await?;
|
||||
|
||||
@@ -285,9 +291,10 @@ impl ModelClientSession {
|
||||
}
|
||||
|
||||
pub(crate) fn try_switch_fallback_transport(&mut self) -> bool {
|
||||
let websocket_enabled = self.responses_websocket_enabled();
|
||||
let activated = self
|
||||
.transport_manager
|
||||
.activate_http_fallback(self.state.provider.wire_api);
|
||||
.activate_http_fallback(websocket_enabled);
|
||||
if activated {
|
||||
warn!("falling back to HTTP");
|
||||
self.state.otel_manager.counter(
|
||||
@@ -302,6 +309,15 @@ impl ModelClientSession {
|
||||
activated
|
||||
}
|
||||
|
||||
fn responses_websocket_enabled(&self) -> bool {
|
||||
self.state.provider.supports_websockets
|
||||
&& self
|
||||
.state
|
||||
.config
|
||||
.features
|
||||
.enabled(Feature::ResponsesWebsockets)
|
||||
}
|
||||
|
||||
fn build_responses_request(&self, prompt: &Prompt) -> Result<ApiPrompt> {
|
||||
let instructions = prompt.base_instructions.text.clone();
|
||||
let tools_json: Vec<Value> = create_tools_json_for_responses_api(&prompt.tools)?;
|
||||
|
||||
@@ -1372,12 +1372,6 @@ impl Config {
|
||||
|| cfg.sandbox_mode.is_some();
|
||||
|
||||
let mut model_providers = built_in_model_providers();
|
||||
if features.enabled(Feature::ResponsesWebsockets)
|
||||
&& let Some(provider) = model_providers.get_mut("openai")
|
||||
&& provider.is_openai()
|
||||
{
|
||||
provider.wire_api = crate::model_provider_info::WireApi::ResponsesWebsocket;
|
||||
}
|
||||
// Merge user-defined providers into the built-in list.
|
||||
for (key, provider) in cfg.model_providers.into_iter() {
|
||||
model_providers.entry(key).or_insert(provider);
|
||||
@@ -2555,7 +2549,7 @@ profile = "project"
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn responses_websockets_feature_updates_openai_provider() -> std::io::Result<()> {
|
||||
fn responses_websockets_feature_does_not_change_wire_api() -> std::io::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut entries = BTreeMap::new();
|
||||
entries.insert("responses_websockets".to_string(), true);
|
||||
@@ -2572,7 +2566,7 @@ profile = "project"
|
||||
|
||||
assert_eq!(
|
||||
config.model_provider.wire_api,
|
||||
crate::model_provider_info::WireApi::ResponsesWebsocket
|
||||
crate::model_provider_info::WireApi::Responses
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -3692,6 +3686,7 @@ model_verbosity = "high"
|
||||
stream_max_retries: Some(10),
|
||||
stream_idle_timeout_ms: Some(300_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
let model_provider_map = {
|
||||
let mut model_provider_map = built_in_model_providers();
|
||||
|
||||
@@ -43,10 +43,6 @@ pub enum WireApi {
|
||||
/// The Responses API exposed by OpenAI at `/v1/responses`.
|
||||
Responses,
|
||||
|
||||
/// Experimental: Responses API over WebSocket transport.
|
||||
#[serde(rename = "responses_websocket")]
|
||||
ResponsesWebsocket,
|
||||
|
||||
/// Regular Chat Completions compatible with `/v1/chat/completions`.
|
||||
#[default]
|
||||
Chat,
|
||||
@@ -105,6 +101,10 @@ pub struct ModelProviderInfo {
|
||||
/// and API key (if needed) comes from the "env_key" environment variable.
|
||||
#[serde(default)]
|
||||
pub requires_openai_auth: bool,
|
||||
|
||||
/// Whether this provider supports the Responses API WebSocket transport.
|
||||
#[serde(default)]
|
||||
pub supports_websockets: bool,
|
||||
}
|
||||
|
||||
impl ModelProviderInfo {
|
||||
@@ -162,7 +162,6 @@ impl ModelProviderInfo {
|
||||
query_params: self.query_params.clone(),
|
||||
wire: match self.wire_api {
|
||||
WireApi::Responses => ApiWireApi::Responses,
|
||||
WireApi::ResponsesWebsocket => ApiWireApi::Responses,
|
||||
WireApi::Chat => ApiWireApi::Chat,
|
||||
},
|
||||
headers,
|
||||
@@ -254,6 +253,7 @@ impl ModelProviderInfo {
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: true,
|
||||
supports_websockets: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -332,6 +332,7 @@ pub fn create_oss_provider_with_base_url(base_url: &str, wire_api: WireApi) -> M
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -360,6 +361,7 @@ base_url = "http://localhost:11434/v1"
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
@@ -390,6 +392,7 @@ query_params = { api-version = "2025-04-01-preview" }
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
@@ -423,6 +426,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
@@ -454,6 +458,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
let api = provider.to_api_provider(None).expect("api provider");
|
||||
assert!(
|
||||
@@ -476,6 +481,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
let named_api = named_provider.to_api_provider(None).expect("api provider");
|
||||
assert!(named_api.is_azure_responses_endpoint());
|
||||
@@ -500,6 +506,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
let api = provider.to_api_provider(None).expect("api provider");
|
||||
assert!(
|
||||
|
||||
@@ -432,6 +432,7 @@ mod tests {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,11 +2,9 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::model_provider_info::WireApi;
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct TransportManager {
|
||||
fallback_to_http: Arc<AtomicBool>,
|
||||
disable_websockets: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl TransportManager {
|
||||
@@ -14,18 +12,11 @@ impl TransportManager {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn effective_wire_api(&self, provider_wire_api: WireApi) -> WireApi {
|
||||
if self.fallback_to_http.load(Ordering::Relaxed)
|
||||
&& provider_wire_api == WireApi::ResponsesWebsocket
|
||||
{
|
||||
WireApi::Responses
|
||||
} else {
|
||||
provider_wire_api
|
||||
}
|
||||
pub fn disable_websockets(&self) -> bool {
|
||||
self.disable_websockets.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn activate_http_fallback(&self, provider_wire_api: WireApi) -> bool {
|
||||
provider_wire_api == WireApi::ResponsesWebsocket
|
||||
&& !self.fallback_to_http.swap(true, Ordering::Relaxed)
|
||||
pub fn activate_http_fallback(&self, websocket_enabled: bool) -> bool {
|
||||
websocket_enabled && !self.disable_websockets.swap(true, Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = match TempDir::new() {
|
||||
|
||||
@@ -59,6 +59,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = match TempDir::new() {
|
||||
|
||||
@@ -8,7 +8,6 @@ use codex_core::CodexAuth;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ModelProviderInfo;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::built_in_model_providers;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::features::Feature;
|
||||
@@ -127,7 +126,7 @@ impl TestCodexBuilder {
|
||||
let base_url_clone = base_url.clone();
|
||||
self.config_mutators.push(Box::new(move |config| {
|
||||
config.model_provider.base_url = Some(base_url_clone);
|
||||
config.model_provider.wire_api = WireApi::ResponsesWebsocket;
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
}));
|
||||
self.build_with_home_and_base_url(base_url, home, None)
|
||||
.await
|
||||
|
||||
@@ -57,6 +57,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
@@ -154,6 +155,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
@@ -307,6 +309,7 @@ async fn responses_respects_model_info_overrides_from_config() {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
|
||||
@@ -1151,6 +1151,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
@@ -1671,6 +1672,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
// Init session
|
||||
@@ -1751,6 +1753,7 @@ async fn env_var_overrides_loaded_auth() {
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
// Init session
|
||||
|
||||
@@ -10,6 +10,7 @@ use codex_core::ResponseEvent;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::TransportManager;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_otel::OtelManager;
|
||||
@@ -188,7 +189,7 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: WireApi::ResponsesWebsocket,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
@@ -196,6 +197,7 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +206,7 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model = Some(MODEL.to_string());
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
let config = Arc::new(config);
|
||||
let model_info = ModelsManager::construct_model_info_offline(MODEL, &config);
|
||||
let conversation_id = ThreadId::new();
|
||||
|
||||
@@ -73,6 +73,7 @@ async fn continue_after_stream_error() {
|
||||
stream_max_retries: Some(1),
|
||||
stream_idle_timeout_ms: Some(2_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let TestCodex { codex, .. } = test_codex()
|
||||
|
||||
@@ -81,6 +81,7 @@ async fn retries_on_early_close() {
|
||||
stream_max_retries: Some(1),
|
||||
stream_idle_timeout_ms: Some(2000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let TestCodex { codex, .. } = test_codex()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::features::Feature;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
@@ -26,7 +26,8 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result
|
||||
let base_url = format!("{}/v1", server.uri());
|
||||
move |config| {
|
||||
config.model_provider.base_url = Some(base_url);
|
||||
config.model_provider.wire_api = WireApi::ResponsesWebsocket;
|
||||
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
}
|
||||
@@ -70,7 +71,8 @@ async fn websocket_fallback_is_sticky_across_turns() -> Result<()> {
|
||||
let base_url = format!("{}/v1", server.uri());
|
||||
move |config| {
|
||||
config.model_provider.base_url = Some(base_url);
|
||||
config.model_provider.wire_api = WireApi::ResponsesWebsocket;
|
||||
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user