use std::sync::Arc; use std::sync::OnceLock; use crate::api_bridge::CoreAuthProvider; use crate::api_bridge::auth_provider_from_auth; use crate::api_bridge::map_api_error; use crate::auth::UnauthorizedRecovery; use codex_api::AggregateStreamExt; use codex_api::ChatClient as ApiChatClient; use codex_api::CompactClient as ApiCompactClient; use codex_api::CompactionInput as ApiCompactionInput; use codex_api::Prompt as ApiPrompt; use codex_api::RequestTelemetry; use codex_api::ReqwestTransport; use codex_api::ResponseAppendWsRequest; use codex_api::ResponseCreateWsRequest; use codex_api::ResponseStream as ApiResponseStream; use codex_api::ResponsesClient as ApiResponsesClient; use codex_api::ResponsesOptions as ApiResponsesOptions; use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient; use codex_api::ResponsesWebsocketConnection as ApiWebSocketConnection; use codex_api::SseTelemetry; use codex_api::TransportError; use codex_api::build_conversation_headers; use codex_api::common::Reasoning; use codex_api::common::ResponsesWsRequest; use codex_api::create_text_param_for_request; use codex_api::error::ApiError; use codex_api::requests::responses::Compression; use codex_app_server_protocol::AuthMode; use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::config_types::WebSearchMode; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SessionSource; use eventsource_stream::Event; use eventsource_stream::EventStreamError; use futures::StreamExt; use http::HeaderMap as ApiHeaderMap; use http::HeaderValue; use http::StatusCode as HttpStatusCode; use reqwest::StatusCode; use serde_json::Value; use std::time::Duration; use tokio::sync::mpsc; use tracing::warn; use crate::AuthManager; use crate::auth::RefreshTokenError; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; use crate::client_common::ResponseStream; use crate::config::Config; use crate::default_client::build_reqwest_client; use crate::error::CodexErr; use crate::error::Result; use crate::features::FEATURES; use crate::features::Feature; use crate::flags::CODEX_RS_SSE_FIXTURE; use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; use crate::tools::spec::create_tools_json_for_chat_completions_api; use crate::tools::spec::create_tools_json_for_responses_api; pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible"; pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; #[derive(Debug)] struct ModelClientState { config: Arc, auth_manager: Option>, model_info: ModelInfo, otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ThreadId, effort: Option, summary: ReasoningSummaryConfig, session_source: SessionSource, } #[derive(Debug, Clone)] pub struct ModelClient { state: Arc, } pub struct ModelClientSession { state: Arc, connection: Option, websocket_last_items: Vec, /// Turn state for sticky routing. /// /// This is an `OnceLock` that stores the turn state value received from the server /// on turn start via the `x-codex-turn-state` response header. Once set, this value /// should be sent back to the server in the `x-codex-turn-state` request header for /// all subsequent requests within the same turn to maintain sticky routing. /// /// This is a contract between the client and server: we receive it at turn start, /// keep sending it unchanged between turn requests (e.g., for retries, incremental /// appends, or continuation requests), and must not send it between different turns. turn_state: Arc>, } #[allow(clippy::too_many_arguments)] impl ModelClient { pub fn new( config: Arc, auth_manager: Option>, model_info: ModelInfo, otel_manager: OtelManager, provider: ModelProviderInfo, effort: Option, summary: ReasoningSummaryConfig, conversation_id: ThreadId, session_source: SessionSource, ) -> Self { Self { state: Arc::new(ModelClientState { config, auth_manager, model_info, otel_manager, provider, conversation_id, effort, summary, session_source, }), } } pub fn new_session(&self) -> ModelClientSession { ModelClientSession { state: Arc::clone(&self.state), connection: None, websocket_last_items: Vec::new(), turn_state: Arc::new(OnceLock::new()), } } } impl ModelClient { pub fn get_model_context_window(&self) -> Option { let model_info = &self.state.model_info; let effective_context_window_percent = model_info.effective_context_window_percent; model_info.context_window.map(|context_window| { context_window.saturating_mul(effective_context_window_percent) / 100 }) } pub fn config(&self) -> Arc { Arc::clone(&self.state.config) } pub fn provider(&self) -> &ModelProviderInfo { &self.state.provider } pub fn get_provider(&self) -> ModelProviderInfo { self.state.provider.clone() } pub fn get_otel_manager(&self) -> OtelManager { self.state.otel_manager.clone() } pub fn get_session_source(&self) -> SessionSource { self.state.session_source.clone() } /// Returns the currently configured model slug. pub fn get_model(&self) -> String { self.state.model_info.slug.clone() } pub fn get_model_info(&self) -> ModelInfo { self.state.model_info.clone() } /// Returns the current reasoning effort setting. pub fn get_reasoning_effort(&self) -> Option { self.state.effort } /// Returns the current reasoning summary setting. pub fn get_reasoning_summary(&self) -> ReasoningSummaryConfig { self.state.summary } pub fn get_auth_manager(&self) -> Option> { self.state.auth_manager.clone() } /// Compacts the current conversation history using the Compact endpoint. /// /// This is a unary call (no streaming) that returns a new list of /// `ResponseItem`s representing the compacted transcript. pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result> { if prompt.input.is_empty() { return Ok(Vec::new()); } let auth_manager = self.state.auth_manager.clone(); let auth = match auth_manager.as_ref() { Some(manager) => manager.auth().await, None => None, }; let api_provider = self .state .provider .to_api_provider(auth.as_ref().map(|a| a.mode))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); let request_telemetry = self.build_request_telemetry(); let client = ApiCompactClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry)); let instructions = prompt.base_instructions.text.clone(); let payload = ApiCompactionInput { model: &self.state.model_info.slug, input: &prompt.input, instructions: &instructions, }; let mut extra_headers = ApiHeaderMap::new(); if let SessionSource::SubAgent(sub) = &self.state.session_source { let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub { label.clone() } else { serde_json::to_value(sub) .ok() .and_then(|v| v.as_str().map(std::string::ToString::to_string)) .unwrap_or_else(|| "other".to_string()) }; if let Ok(val) = HeaderValue::from_str(&subagent) { extra_headers.insert("x-openai-subagent", val); } } client .compact_input(&payload, extra_headers) .await .map_err(map_api_error) } } impl ModelClientSession { /// Streams a single model turn using either the Responses or Chat /// Completions wire API, depending on the configured provider. /// /// For Chat providers, the underlying stream is optionally aggregated /// based on the `show_raw_agent_reasoning` flag in the config. pub async fn stream(&mut self, prompt: &Prompt) -> Result { match self.state.provider.wire_api { WireApi::Responses => self.stream_responses_api(prompt).await, WireApi::ResponsesWebsocket => self.stream_responses_websocket(prompt).await, WireApi::Chat => { let api_stream = self.stream_chat_completions(prompt).await?; if self.state.config.show_raw_agent_reasoning { Ok(map_response_stream( api_stream.streaming_mode(), self.state.otel_manager.clone(), )) } else { Ok(map_response_stream( api_stream.aggregate(), self.state.otel_manager.clone(), )) } } } } fn build_responses_request(&self, prompt: &Prompt) -> Result { let instructions = prompt.base_instructions.text.clone(); let tools_json: Vec = create_tools_json_for_responses_api(&prompt.tools)?; Ok(build_api_prompt(prompt, instructions, tools_json)) } fn build_responses_options( &self, prompt: &Prompt, compression: Compression, ) -> ApiResponsesOptions { let model_info = &self.state.model_info; let default_reasoning_effort = model_info.default_reasoning_level; let reasoning = if model_info.supports_reasoning_summaries { Some(Reasoning { effort: self.state.effort.or(default_reasoning_effort), summary: if self.state.summary == ReasoningSummaryConfig::None { None } else { Some(self.state.summary) }, }) } else { None }; let include = if reasoning.is_some() { vec!["reasoning.encrypted_content".to_string()] } else { Vec::new() }; let verbosity = if model_info.support_verbosity { self.state .config .model_verbosity .or(model_info.default_verbosity) } else { if self.state.config.model_verbosity.is_some() { warn!( "model_verbosity is set but ignored as the model does not support verbosity: {}", model_info.slug ); } None }; let text = create_text_param_for_request(verbosity, &prompt.output_schema); let conversation_id = self.state.conversation_id.to_string(); ApiResponsesOptions { reasoning, include, prompt_cache_key: Some(conversation_id.clone()), text, store_override: None, conversation_id: Some(conversation_id), session_source: Some(self.state.session_source.clone()), extra_headers: build_responses_headers(&self.state.config, Some(&self.turn_state)), compression, turn_state: Some(Arc::clone(&self.turn_state)), } } fn get_incremental_items(&self, input_items: &[ResponseItem]) -> Option> { // Checks whether the current request input is an incremental append to the previous request. // If items in the new request contain all the items from the previous request we build // a response.append request otherwise we start with a fresh response.create request. let previous_len = self.websocket_last_items.len(); let can_append = previous_len > 0 && input_items.starts_with(&self.websocket_last_items) && previous_len < input_items.len(); if can_append { Some(input_items[previous_len..].to_vec()) } else { None } } fn prepare_websocket_request( &self, api_prompt: &ApiPrompt, options: &ApiResponsesOptions, ) -> ResponsesWsRequest { if let Some(append_items) = self.get_incremental_items(&api_prompt.input) { return ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest { input: append_items, }); } let ApiResponsesOptions { reasoning, include, prompt_cache_key, text, store_override, .. } = options; let store = store_override.unwrap_or(false); let payload = ResponseCreateWsRequest { model: self.state.model_info.slug.clone(), instructions: api_prompt.instructions.clone(), input: api_prompt.input.clone(), tools: api_prompt.tools.clone(), tool_choice: "auto".to_string(), parallel_tool_calls: api_prompt.parallel_tool_calls, reasoning: reasoning.clone(), store, stream: true, include: include.clone(), prompt_cache_key: prompt_cache_key.clone(), text: text.clone(), }; ResponsesWsRequest::ResponseCreate(payload) } async fn websocket_connection( &mut self, api_provider: codex_api::Provider, api_auth: CoreAuthProvider, options: &ApiResponsesOptions, ) -> std::result::Result<&ApiWebSocketConnection, ApiError> { let needs_new = match self.connection.as_ref() { Some(conn) => conn.is_closed().await, None => true, }; if needs_new { let mut headers = options.extra_headers.clone(); headers.extend(build_conversation_headers(options.conversation_id.clone())); let new_conn: ApiWebSocketConnection = ApiWebSocketResponsesClient::new(api_provider, api_auth) .connect(headers, options.turn_state.clone()) .await?; self.connection = Some(new_conn); } self.connection.as_ref().ok_or(ApiError::Stream( "websocket connection is unavailable".to_string(), )) } fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression { if self .state .config .features .enabled(Feature::EnableRequestCompression) && auth.is_some_and(|auth| auth.mode == AuthMode::ChatGPT) && self.state.provider.is_openai() { Compression::Zstd } else { Compression::None } } /// Streams a turn via the OpenAI Chat Completions API. /// /// This path is only used when the provider is configured with /// `WireApi::Chat`; it does not support `output_schema` today. async fn stream_chat_completions(&self, prompt: &Prompt) -> Result { if prompt.output_schema.is_some() { return Err(CodexErr::UnsupportedOperation( "output_schema is not supported for Chat Completions API".to_string(), )); } let auth_manager = self.state.auth_manager.clone(); let instructions = prompt.base_instructions.text.clone(); let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?; let api_prompt = build_api_prompt(prompt, instructions, tools_json); let conversation_id = self.state.conversation_id.to_string(); let session_source = self.state.session_source.clone(); let mut auth_recovery = auth_manager .as_ref() .map(super::auth::AuthManager::unauthorized_recovery); loop { let auth = match auth_manager.as_ref() { Some(manager) => manager.auth().await, None => None, }; let api_provider = self .state .provider .to_api_provider(auth.as_ref().map(|a| a.mode))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry(); let client = ApiChatClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry), Some(sse_telemetry)); let stream_result = client .stream_prompt( &self.state.model_info.slug, &api_prompt, Some(conversation_id.clone()), Some(session_source.clone()), ) .await; match stream_result { Ok(stream) => return Ok(stream), Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => { handle_unauthorized(status, &mut auth_recovery).await?; continue; } Err(err) => return Err(map_api_error(err)), } } } /// Streams a turn via the OpenAI Responses API. /// /// Handles SSE fixtures, reasoning summaries, verbosity, and the /// `text` controls used for output schemas. async fn stream_responses_api(&self, prompt: &Prompt) -> Result { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { warn!(path, "Streaming from fixture"); let stream = codex_api::stream_from_fixture(path, self.state.provider.stream_idle_timeout()) .map_err(map_api_error)?; return Ok(map_response_stream(stream, self.state.otel_manager.clone())); } let auth_manager = self.state.auth_manager.clone(); let api_prompt = self.build_responses_request(prompt)?; let mut auth_recovery = auth_manager .as_ref() .map(super::auth::AuthManager::unauthorized_recovery); loop { let auth = match auth_manager.as_ref() { Some(manager) => manager.auth().await, None => None, }; let api_provider = self .state .provider .to_api_provider(auth.as_ref().map(|a| a.mode))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry(); let compression = self.responses_request_compression(auth.as_ref()); let client = ApiResponsesClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry), Some(sse_telemetry)); let options = self.build_responses_options(prompt, compression); let stream_result = client .stream_prompt(&self.state.model_info.slug, &api_prompt, options) .await; match stream_result { Ok(stream) => { return Ok(map_response_stream(stream, self.state.otel_manager.clone())); } Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => { handle_unauthorized(status, &mut auth_recovery).await?; continue; } Err(err) => return Err(map_api_error(err)), } } } /// Streams a turn via the Responses API over WebSocket transport. async fn stream_responses_websocket(&mut self, prompt: &Prompt) -> Result { let auth_manager = self.state.auth_manager.clone(); let api_prompt = self.build_responses_request(prompt)?; let mut auth_recovery = auth_manager .as_ref() .map(super::auth::AuthManager::unauthorized_recovery); loop { let auth = match auth_manager.as_ref() { Some(manager) => manager.auth().await, None => None, }; let api_provider = self .state .provider .to_api_provider(auth.as_ref().map(|a| a.mode))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; let compression = self.responses_request_compression(auth.as_ref()); let options = self.build_responses_options(prompt, compression); let request = self.prepare_websocket_request(&api_prompt, &options); let connection = match self .websocket_connection(api_provider.clone(), api_auth.clone(), &options) .await { Ok(connection) => connection, Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => { handle_unauthorized(status, &mut auth_recovery).await?; continue; } Err(err) => return Err(map_api_error(err)), }; let stream_result = connection .stream_request(request) .await .map_err(map_api_error)?; self.websocket_last_items = api_prompt.input.clone(); return Ok(map_response_stream( stream_result, self.state.otel_manager.clone(), )); } } /// Builds request and SSE telemetry for streaming API calls (Chat/Responses). fn build_streaming_telemetry(&self) -> (Arc, Arc) { let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); let request_telemetry: Arc = telemetry.clone(); let sse_telemetry: Arc = telemetry; (request_telemetry, sse_telemetry) } } impl ModelClient { /// Builds request telemetry for unary API calls (e.g., Compact endpoint). fn build_request_telemetry(&self) -> Arc { let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); let request_telemetry: Arc = telemetry; request_telemetry } } /// Adapts the core `Prompt` type into the `codex-api` payload shape. fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec) -> ApiPrompt { ApiPrompt { instructions, input: prompt.get_formatted_input(), tools: tools_json, parallel_tool_calls: prompt.parallel_tool_calls, output_schema: prompt.output_schema.clone(), } } fn beta_feature_headers(config: &Config) -> ApiHeaderMap { let enabled = FEATURES .iter() .filter_map(|spec| { if spec.stage.beta_menu_description().is_some() && config.features.enabled(spec.id) { Some(spec.key) } else { None } }) .collect::>(); let value = enabled.join(","); let mut headers = ApiHeaderMap::new(); if !value.is_empty() && let Ok(header_value) = HeaderValue::from_str(value.as_str()) { headers.insert("x-codex-beta-features", header_value); } headers } fn build_responses_headers( config: &Config, turn_state: Option<&Arc>>, ) -> ApiHeaderMap { let mut headers = beta_feature_headers(config); headers.insert( WEB_SEARCH_ELIGIBLE_HEADER, HeaderValue::from_static( if matches!(config.web_search_mode, Some(WebSearchMode::Disabled)) { "false" } else { "true" }, ), ); if let Some(turn_state) = turn_state && let Some(state) = turn_state.get() && let Ok(header_value) = HeaderValue::from_str(state) { headers.insert(X_CODEX_TURN_STATE_HEADER, header_value); } headers } fn map_response_stream(api_stream: S, otel_manager: OtelManager) -> ResponseStream where S: futures::Stream> + Unpin + Send + 'static, { let (tx_event, rx_event) = mpsc::channel::>(1600); tokio::spawn(async move { let mut logged_error = false; let mut api_stream = api_stream; while let Some(event) = api_stream.next().await { match event { Ok(ResponseEvent::Completed { response_id, token_usage, }) => { if let Some(usage) = &token_usage { otel_manager.sse_event_completed( usage.input_tokens, usage.output_tokens, Some(usage.cached_input_tokens), Some(usage.reasoning_output_tokens), usage.total_tokens, ); } if tx_event .send(Ok(ResponseEvent::Completed { response_id, token_usage, })) .await .is_err() { return; } } Ok(event) => { if tx_event.send(Ok(event)).await.is_err() { return; } } Err(err) => { let mapped = map_api_error(err); if !logged_error { otel_manager.see_event_completed_failed(&mapped); logged_error = true; } if tx_event.send(Err(mapped)).await.is_err() { return; } } } } }); ResponseStream { rx_event } } /// Handles a 401 response by optionally refreshing ChatGPT tokens once. /// /// When refresh succeeds, the caller should retry the API call; otherwise /// the mapped `CodexErr` is returned to the caller. async fn handle_unauthorized( status: StatusCode, auth_recovery: &mut Option, ) -> Result<()> { if let Some(recovery) = auth_recovery && recovery.has_next() { return match recovery.next().await { Ok(_) => Ok(()), Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)), Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)), }; } Err(map_unauthorized_status(status)) } fn map_unauthorized_status(status: StatusCode) -> CodexErr { map_api_error(ApiError::Transport(TransportError::Http { status, url: None, headers: None, body: None, })) } struct ApiTelemetry { otel_manager: OtelManager, } impl ApiTelemetry { fn new(otel_manager: OtelManager) -> Self { Self { otel_manager } } } impl RequestTelemetry for ApiTelemetry { fn on_request( &self, attempt: u64, status: Option, error: Option<&TransportError>, duration: Duration, ) { let error_message = error.map(std::string::ToString::to_string); self.otel_manager.record_api_request( attempt, status.map(|s| s.as_u16()), error_message.as_deref(), duration, ); } } impl SseTelemetry for ApiTelemetry { fn on_sse_poll( &self, result: &std::result::Result< Option>>, tokio::time::error::Elapsed, >, duration: Duration, ) { self.otel_manager.log_sse_event(result, duration); } }