mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
web_search can now be updated per-turn, for things like changes to sandbox policy. `SandboxPolicy::DangerFullAccess` now sets web_search to `live`, and the default is still `cached`. Added integration tests.
805 lines
29 KiB
Rust
805 lines
29 KiB
Rust
use std::sync::Arc;
|
|
use std::sync::OnceLock;
|
|
|
|
use crate::api_bridge::CoreAuthProvider;
|
|
use crate::api_bridge::auth_provider_from_auth;
|
|
use crate::api_bridge::map_api_error;
|
|
use crate::auth::UnauthorizedRecovery;
|
|
use codex_api::AggregateStreamExt;
|
|
use codex_api::ChatClient as ApiChatClient;
|
|
use codex_api::CompactClient as ApiCompactClient;
|
|
use codex_api::CompactionInput as ApiCompactionInput;
|
|
use codex_api::Prompt as ApiPrompt;
|
|
use codex_api::RequestTelemetry;
|
|
use codex_api::ReqwestTransport;
|
|
use codex_api::ResponseAppendWsRequest;
|
|
use codex_api::ResponseCreateWsRequest;
|
|
use codex_api::ResponseStream as ApiResponseStream;
|
|
use codex_api::ResponsesClient as ApiResponsesClient;
|
|
use codex_api::ResponsesOptions as ApiResponsesOptions;
|
|
use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient;
|
|
use codex_api::ResponsesWebsocketConnection as ApiWebSocketConnection;
|
|
use codex_api::SseTelemetry;
|
|
use codex_api::TransportError;
|
|
use codex_api::build_conversation_headers;
|
|
use codex_api::common::Reasoning;
|
|
use codex_api::common::ResponsesWsRequest;
|
|
use codex_api::create_text_param_for_request;
|
|
use codex_api::error::ApiError;
|
|
use codex_api::requests::responses::Compression;
|
|
use codex_app_server_protocol::AuthMode;
|
|
use codex_otel::OtelManager;
|
|
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
|
use codex_protocol::config_types::WebSearchMode;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::openai_models::ModelInfo;
|
|
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use eventsource_stream::Event;
|
|
use eventsource_stream::EventStreamError;
|
|
use futures::StreamExt;
|
|
use http::HeaderMap as ApiHeaderMap;
|
|
use http::HeaderValue;
|
|
use http::StatusCode as HttpStatusCode;
|
|
use reqwest::StatusCode;
|
|
use serde_json::Value;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tracing::warn;
|
|
|
|
use crate::AuthManager;
|
|
use crate::auth::RefreshTokenError;
|
|
use crate::client_common::Prompt;
|
|
use crate::client_common::ResponseEvent;
|
|
use crate::client_common::ResponseStream;
|
|
use crate::config::Config;
|
|
use crate::default_client::build_reqwest_client;
|
|
use crate::error::CodexErr;
|
|
use crate::error::Result;
|
|
use crate::features::FEATURES;
|
|
use crate::features::Feature;
|
|
use crate::flags::CODEX_RS_SSE_FIXTURE;
|
|
use crate::model_provider_info::ModelProviderInfo;
|
|
use crate::model_provider_info::WireApi;
|
|
use crate::tools::spec::create_tools_json_for_chat_completions_api;
|
|
use crate::tools::spec::create_tools_json_for_responses_api;
|
|
|
|
/// Name of the request header that tells the server whether web search is eligible.
///
/// `build_responses_headers` sends `"false"` when the configured web search mode is
/// `WebSearchMode::Disabled` and `"true"` in every other case.
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";

/// Name of the header that carries the per-turn sticky-routing state.
///
/// The value is stored in `ModelClientSession::turn_state` and echoed back on
/// later requests of the same turn.
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
|
|
|
#[derive(Debug)]
|
|
struct ModelClientState {
|
|
config: Arc<Config>,
|
|
auth_manager: Option<Arc<AuthManager>>,
|
|
model_info: ModelInfo,
|
|
otel_manager: OtelManager,
|
|
provider: ModelProviderInfo,
|
|
conversation_id: ThreadId,
|
|
effort: Option<ReasoningEffortConfig>,
|
|
summary: ReasoningSummaryConfig,
|
|
session_source: SessionSource,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct ModelClient {
|
|
state: Arc<ModelClientState>,
|
|
}
|
|
|
|
pub struct ModelClientSession {
|
|
state: Arc<ModelClientState>,
|
|
connection: Option<ApiWebSocketConnection>,
|
|
websocket_last_items: Vec<ResponseItem>,
|
|
/// Turn state for sticky routing.
|
|
///
|
|
/// This is an `OnceLock` that stores the turn state value received from the server
|
|
/// on turn start via the `x-codex-turn-state` response header. Once set, this value
|
|
/// should be sent back to the server in the `x-codex-turn-state` request header for
|
|
/// all subsequent requests within the same turn to maintain sticky routing.
|
|
///
|
|
/// This is a contract between the client and server: we receive it at turn start,
|
|
/// keep sending it unchanged between turn requests (e.g., for retries, incremental
|
|
/// appends, or continuation requests), and must not send it between different turns.
|
|
turn_state: Arc<OnceLock<String>>,
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
impl ModelClient {
|
|
pub fn new(
|
|
config: Arc<Config>,
|
|
auth_manager: Option<Arc<AuthManager>>,
|
|
model_info: ModelInfo,
|
|
otel_manager: OtelManager,
|
|
provider: ModelProviderInfo,
|
|
effort: Option<ReasoningEffortConfig>,
|
|
summary: ReasoningSummaryConfig,
|
|
conversation_id: ThreadId,
|
|
session_source: SessionSource,
|
|
) -> Self {
|
|
Self {
|
|
state: Arc::new(ModelClientState {
|
|
config,
|
|
auth_manager,
|
|
model_info,
|
|
otel_manager,
|
|
provider,
|
|
conversation_id,
|
|
effort,
|
|
summary,
|
|
session_source,
|
|
}),
|
|
}
|
|
}
|
|
|
|
pub fn new_session(&self) -> ModelClientSession {
|
|
ModelClientSession {
|
|
state: Arc::clone(&self.state),
|
|
connection: None,
|
|
websocket_last_items: Vec::new(),
|
|
turn_state: Arc::new(OnceLock::new()),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ModelClient {
|
|
pub fn get_model_context_window(&self) -> Option<i64> {
|
|
let model_info = &self.state.model_info;
|
|
let effective_context_window_percent = model_info.effective_context_window_percent;
|
|
model_info.context_window.map(|context_window| {
|
|
context_window.saturating_mul(effective_context_window_percent) / 100
|
|
})
|
|
}
|
|
|
|
pub fn config(&self) -> Arc<Config> {
|
|
Arc::clone(&self.state.config)
|
|
}
|
|
|
|
pub fn provider(&self) -> &ModelProviderInfo {
|
|
&self.state.provider
|
|
}
|
|
|
|
pub fn get_provider(&self) -> ModelProviderInfo {
|
|
self.state.provider.clone()
|
|
}
|
|
|
|
pub fn get_otel_manager(&self) -> OtelManager {
|
|
self.state.otel_manager.clone()
|
|
}
|
|
|
|
pub fn get_session_source(&self) -> SessionSource {
|
|
self.state.session_source.clone()
|
|
}
|
|
|
|
/// Returns the currently configured model slug.
|
|
pub fn get_model(&self) -> String {
|
|
self.state.model_info.slug.clone()
|
|
}
|
|
|
|
pub fn get_model_info(&self) -> ModelInfo {
|
|
self.state.model_info.clone()
|
|
}
|
|
|
|
/// Returns the current reasoning effort setting.
|
|
pub fn get_reasoning_effort(&self) -> Option<ReasoningEffortConfig> {
|
|
self.state.effort
|
|
}
|
|
|
|
/// Returns the current reasoning summary setting.
|
|
pub fn get_reasoning_summary(&self) -> ReasoningSummaryConfig {
|
|
self.state.summary
|
|
}
|
|
|
|
pub fn get_auth_manager(&self) -> Option<Arc<AuthManager>> {
|
|
self.state.auth_manager.clone()
|
|
}
|
|
|
|
/// Compacts the current conversation history using the Compact endpoint.
|
|
///
|
|
/// This is a unary call (no streaming) that returns a new list of
|
|
/// `ResponseItem`s representing the compacted transcript.
|
|
pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result<Vec<ResponseItem>> {
|
|
if prompt.input.is_empty() {
|
|
return Ok(Vec::new());
|
|
}
|
|
let auth_manager = self.state.auth_manager.clone();
|
|
let auth = match auth_manager.as_ref() {
|
|
Some(manager) => manager.auth().await,
|
|
None => None,
|
|
};
|
|
let api_provider = self
|
|
.state
|
|
.provider
|
|
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
|
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
|
|
let transport = ReqwestTransport::new(build_reqwest_client());
|
|
let request_telemetry = self.build_request_telemetry();
|
|
let client = ApiCompactClient::new(transport, api_provider, api_auth)
|
|
.with_telemetry(Some(request_telemetry));
|
|
|
|
let instructions = prompt.base_instructions.text.clone();
|
|
let payload = ApiCompactionInput {
|
|
model: &self.state.model_info.slug,
|
|
input: &prompt.input,
|
|
instructions: &instructions,
|
|
};
|
|
|
|
let mut extra_headers = ApiHeaderMap::new();
|
|
if let SessionSource::SubAgent(sub) = &self.state.session_source {
|
|
let subagent = match sub {
|
|
crate::protocol::SubAgentSource::Review => "review".to_string(),
|
|
crate::protocol::SubAgentSource::Compact => "compact".to_string(),
|
|
crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(),
|
|
crate::protocol::SubAgentSource::Other(label) => label.clone(),
|
|
};
|
|
if let Ok(val) = HeaderValue::from_str(&subagent) {
|
|
extra_headers.insert("x-openai-subagent", val);
|
|
}
|
|
}
|
|
client
|
|
.compact_input(&payload, extra_headers)
|
|
.await
|
|
.map_err(map_api_error)
|
|
}
|
|
}
|
|
|
|
impl ModelClientSession {
|
|
/// Streams a single model turn using either the Responses or Chat
|
|
/// Completions wire API, depending on the configured provider.
|
|
///
|
|
/// For Chat providers, the underlying stream is optionally aggregated
|
|
/// based on the `show_raw_agent_reasoning` flag in the config.
|
|
pub async fn stream(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
|
|
match self.state.provider.wire_api {
|
|
WireApi::Responses => self.stream_responses_api(prompt).await,
|
|
WireApi::ResponsesWebsocket => self.stream_responses_websocket(prompt).await,
|
|
WireApi::Chat => {
|
|
let api_stream = self.stream_chat_completions(prompt).await?;
|
|
|
|
if self.state.config.show_raw_agent_reasoning {
|
|
Ok(map_response_stream(
|
|
api_stream.streaming_mode(),
|
|
self.state.otel_manager.clone(),
|
|
))
|
|
} else {
|
|
Ok(map_response_stream(
|
|
api_stream.aggregate(),
|
|
self.state.otel_manager.clone(),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn build_responses_request(&self, prompt: &Prompt) -> Result<ApiPrompt> {
|
|
let instructions = prompt.base_instructions.text.clone();
|
|
let tools_json: Vec<Value> = create_tools_json_for_responses_api(&prompt.tools)?;
|
|
Ok(build_api_prompt(prompt, instructions, tools_json))
|
|
}
|
|
|
|
fn build_responses_options(
|
|
&self,
|
|
prompt: &Prompt,
|
|
compression: Compression,
|
|
) -> ApiResponsesOptions {
|
|
let model_info = &self.state.model_info;
|
|
|
|
let default_reasoning_effort = model_info.default_reasoning_level;
|
|
let reasoning = if model_info.supports_reasoning_summaries {
|
|
Some(Reasoning {
|
|
effort: self.state.effort.or(default_reasoning_effort),
|
|
summary: if self.state.summary == ReasoningSummaryConfig::None {
|
|
None
|
|
} else {
|
|
Some(self.state.summary)
|
|
},
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let include = if reasoning.is_some() {
|
|
vec!["reasoning.encrypted_content".to_string()]
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
let verbosity = if model_info.support_verbosity {
|
|
self.state
|
|
.config
|
|
.model_verbosity
|
|
.or(model_info.default_verbosity)
|
|
} else {
|
|
if self.state.config.model_verbosity.is_some() {
|
|
warn!(
|
|
"model_verbosity is set but ignored as the model does not support verbosity: {}",
|
|
model_info.slug
|
|
);
|
|
}
|
|
None
|
|
};
|
|
|
|
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
|
|
let conversation_id = self.state.conversation_id.to_string();
|
|
|
|
ApiResponsesOptions {
|
|
reasoning,
|
|
include,
|
|
prompt_cache_key: Some(conversation_id.clone()),
|
|
text,
|
|
store_override: None,
|
|
conversation_id: Some(conversation_id),
|
|
session_source: Some(self.state.session_source.clone()),
|
|
extra_headers: build_responses_headers(&self.state.config, Some(&self.turn_state)),
|
|
compression,
|
|
turn_state: Some(Arc::clone(&self.turn_state)),
|
|
}
|
|
}
|
|
|
|
/// Returns the items that were added since the previous WebSocket request,
/// or `None` when the new input is not a strict extension of it.
///
/// `Some(tail)` is returned only when a previous request exists, the new
/// input starts with all of its items, and at least one new item follows.
/// In that case the caller sends a `response.append`; otherwise it starts
/// over with a `response.create`.
fn get_incremental_items(&self, input_items: &[ResponseItem]) -> Option<Vec<ResponseItem>> {
    let previous = self.websocket_last_items.as_slice();
    if previous.is_empty() {
        return None;
    }

    let appended = input_items.strip_prefix(previous)?;
    if appended.is_empty() {
        None
    } else {
        Some(appended.to_vec())
    }
}
|
|
|
|
fn prepare_websocket_request(
|
|
&self,
|
|
api_prompt: &ApiPrompt,
|
|
options: &ApiResponsesOptions,
|
|
) -> ResponsesWsRequest {
|
|
if let Some(append_items) = self.get_incremental_items(&api_prompt.input) {
|
|
return ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest {
|
|
input: append_items,
|
|
});
|
|
}
|
|
|
|
let ApiResponsesOptions {
|
|
reasoning,
|
|
include,
|
|
prompt_cache_key,
|
|
text,
|
|
store_override,
|
|
..
|
|
} = options;
|
|
|
|
let store = store_override.unwrap_or(false);
|
|
let payload = ResponseCreateWsRequest {
|
|
model: self.state.model_info.slug.clone(),
|
|
instructions: api_prompt.instructions.clone(),
|
|
input: api_prompt.input.clone(),
|
|
tools: api_prompt.tools.clone(),
|
|
tool_choice: "auto".to_string(),
|
|
parallel_tool_calls: api_prompt.parallel_tool_calls,
|
|
reasoning: reasoning.clone(),
|
|
store,
|
|
stream: true,
|
|
include: include.clone(),
|
|
prompt_cache_key: prompt_cache_key.clone(),
|
|
text: text.clone(),
|
|
};
|
|
|
|
ResponsesWsRequest::ResponseCreate(payload)
|
|
}
|
|
|
|
async fn websocket_connection(
|
|
&mut self,
|
|
api_provider: codex_api::Provider,
|
|
api_auth: CoreAuthProvider,
|
|
options: &ApiResponsesOptions,
|
|
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
|
let needs_new = match self.connection.as_ref() {
|
|
Some(conn) => conn.is_closed().await,
|
|
None => true,
|
|
};
|
|
|
|
if needs_new {
|
|
let mut headers = options.extra_headers.clone();
|
|
headers.extend(build_conversation_headers(options.conversation_id.clone()));
|
|
let new_conn: ApiWebSocketConnection =
|
|
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
|
.connect(headers, options.turn_state.clone())
|
|
.await?;
|
|
self.connection = Some(new_conn);
|
|
}
|
|
|
|
self.connection.as_ref().ok_or(ApiError::Stream(
|
|
"websocket connection is unavailable".to_string(),
|
|
))
|
|
}
|
|
|
|
fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression {
|
|
if self
|
|
.state
|
|
.config
|
|
.features
|
|
.enabled(Feature::EnableRequestCompression)
|
|
&& auth.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
|
|
&& self.state.provider.is_openai()
|
|
{
|
|
Compression::Zstd
|
|
} else {
|
|
Compression::None
|
|
}
|
|
}
|
|
|
|
/// Streams a turn via the OpenAI Chat Completions API.
|
|
///
|
|
/// This path is only used when the provider is configured with
|
|
/// `WireApi::Chat`; it does not support `output_schema` today.
|
|
async fn stream_chat_completions(&self, prompt: &Prompt) -> Result<ApiResponseStream> {
|
|
if prompt.output_schema.is_some() {
|
|
return Err(CodexErr::UnsupportedOperation(
|
|
"output_schema is not supported for Chat Completions API".to_string(),
|
|
));
|
|
}
|
|
|
|
let auth_manager = self.state.auth_manager.clone();
|
|
let instructions = prompt.base_instructions.text.clone();
|
|
let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?;
|
|
let api_prompt = build_api_prompt(prompt, instructions, tools_json);
|
|
let conversation_id = self.state.conversation_id.to_string();
|
|
let session_source = self.state.session_source.clone();
|
|
|
|
let mut auth_recovery = auth_manager
|
|
.as_ref()
|
|
.map(super::auth::AuthManager::unauthorized_recovery);
|
|
loop {
|
|
let auth = match auth_manager.as_ref() {
|
|
Some(manager) => manager.auth().await,
|
|
None => None,
|
|
};
|
|
let api_provider = self
|
|
.state
|
|
.provider
|
|
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
|
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
|
|
let transport = ReqwestTransport::new(build_reqwest_client());
|
|
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
|
|
let client = ApiChatClient::new(transport, api_provider, api_auth)
|
|
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
|
|
|
|
let stream_result = client
|
|
.stream_prompt(
|
|
&self.state.model_info.slug,
|
|
&api_prompt,
|
|
Some(conversation_id.clone()),
|
|
Some(session_source.clone()),
|
|
)
|
|
.await;
|
|
|
|
match stream_result {
|
|
Ok(stream) => return Ok(stream),
|
|
Err(ApiError::Transport(TransportError::Http { status, .. }))
|
|
if status == StatusCode::UNAUTHORIZED =>
|
|
{
|
|
handle_unauthorized(status, &mut auth_recovery).await?;
|
|
continue;
|
|
}
|
|
Err(err) => return Err(map_api_error(err)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Streams a turn via the OpenAI Responses API.
|
|
///
|
|
/// Handles SSE fixtures, reasoning summaries, verbosity, and the
|
|
/// `text` controls used for output schemas.
|
|
async fn stream_responses_api(&self, prompt: &Prompt) -> Result<ResponseStream> {
|
|
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
|
|
warn!(path, "Streaming from fixture");
|
|
let stream =
|
|
codex_api::stream_from_fixture(path, self.state.provider.stream_idle_timeout())
|
|
.map_err(map_api_error)?;
|
|
return Ok(map_response_stream(stream, self.state.otel_manager.clone()));
|
|
}
|
|
|
|
let auth_manager = self.state.auth_manager.clone();
|
|
let api_prompt = self.build_responses_request(prompt)?;
|
|
|
|
let mut auth_recovery = auth_manager
|
|
.as_ref()
|
|
.map(super::auth::AuthManager::unauthorized_recovery);
|
|
loop {
|
|
let auth = match auth_manager.as_ref() {
|
|
Some(manager) => manager.auth().await,
|
|
None => None,
|
|
};
|
|
let api_provider = self
|
|
.state
|
|
.provider
|
|
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
|
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
|
|
let transport = ReqwestTransport::new(build_reqwest_client());
|
|
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
|
|
let compression = self.responses_request_compression(auth.as_ref());
|
|
|
|
let client = ApiResponsesClient::new(transport, api_provider, api_auth)
|
|
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
|
|
|
|
let options = self.build_responses_options(prompt, compression);
|
|
|
|
let stream_result = client
|
|
.stream_prompt(&self.state.model_info.slug, &api_prompt, options)
|
|
.await;
|
|
|
|
match stream_result {
|
|
Ok(stream) => {
|
|
return Ok(map_response_stream(stream, self.state.otel_manager.clone()));
|
|
}
|
|
Err(ApiError::Transport(TransportError::Http { status, .. }))
|
|
if status == StatusCode::UNAUTHORIZED =>
|
|
{
|
|
handle_unauthorized(status, &mut auth_recovery).await?;
|
|
continue;
|
|
}
|
|
Err(err) => return Err(map_api_error(err)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Streams a turn via the Responses API over WebSocket transport.
|
|
async fn stream_responses_websocket(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
|
|
let auth_manager = self.state.auth_manager.clone();
|
|
let api_prompt = self.build_responses_request(prompt)?;
|
|
|
|
let mut auth_recovery = auth_manager
|
|
.as_ref()
|
|
.map(super::auth::AuthManager::unauthorized_recovery);
|
|
loop {
|
|
let auth = match auth_manager.as_ref() {
|
|
Some(manager) => manager.auth().await,
|
|
None => None,
|
|
};
|
|
let api_provider = self
|
|
.state
|
|
.provider
|
|
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
|
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
|
|
let compression = self.responses_request_compression(auth.as_ref());
|
|
|
|
let options = self.build_responses_options(prompt, compression);
|
|
let request = self.prepare_websocket_request(&api_prompt, &options);
|
|
|
|
let connection = match self
|
|
.websocket_connection(api_provider.clone(), api_auth.clone(), &options)
|
|
.await
|
|
{
|
|
Ok(connection) => connection,
|
|
Err(ApiError::Transport(TransportError::Http { status, .. }))
|
|
if status == StatusCode::UNAUTHORIZED =>
|
|
{
|
|
handle_unauthorized(status, &mut auth_recovery).await?;
|
|
continue;
|
|
}
|
|
Err(err) => return Err(map_api_error(err)),
|
|
};
|
|
|
|
let stream_result = connection
|
|
.stream_request(request)
|
|
.await
|
|
.map_err(map_api_error)?;
|
|
self.websocket_last_items = api_prompt.input.clone();
|
|
|
|
return Ok(map_response_stream(
|
|
stream_result,
|
|
self.state.otel_manager.clone(),
|
|
));
|
|
}
|
|
}
|
|
|
|
/// Builds request and SSE telemetry for streaming API calls (Chat/Responses).
|
|
fn build_streaming_telemetry(&self) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
|
|
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
|
|
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
|
|
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
|
|
(request_telemetry, sse_telemetry)
|
|
}
|
|
}
|
|
|
|
impl ModelClient {
|
|
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
|
|
fn build_request_telemetry(&self) -> Arc<dyn RequestTelemetry> {
|
|
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
|
|
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
|
|
request_telemetry
|
|
}
|
|
}
|
|
|
|
/// Adapts the core `Prompt` type into the `codex-api` payload shape.
|
|
fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec<Value>) -> ApiPrompt {
|
|
ApiPrompt {
|
|
instructions,
|
|
input: prompt.get_formatted_input(),
|
|
tools: tools_json,
|
|
parallel_tool_calls: prompt.parallel_tool_calls,
|
|
output_schema: prompt.output_schema.clone(),
|
|
}
|
|
}
|
|
|
|
fn experimental_feature_headers(config: &Config) -> ApiHeaderMap {
|
|
let enabled = FEATURES
|
|
.iter()
|
|
.filter_map(|spec| {
|
|
if spec.stage.experimental_menu_description().is_some()
|
|
&& config.features.enabled(spec.id)
|
|
{
|
|
Some(spec.key)
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
let value = enabled.join(",");
|
|
let mut headers = ApiHeaderMap::new();
|
|
if !value.is_empty()
|
|
&& let Ok(header_value) = HeaderValue::from_str(value.as_str())
|
|
{
|
|
headers.insert("x-codex-beta-features", header_value);
|
|
}
|
|
headers
|
|
}
|
|
|
|
fn build_responses_headers(
|
|
config: &Config,
|
|
turn_state: Option<&Arc<OnceLock<String>>>,
|
|
) -> ApiHeaderMap {
|
|
let mut headers = experimental_feature_headers(config);
|
|
headers.insert(
|
|
WEB_SEARCH_ELIGIBLE_HEADER,
|
|
HeaderValue::from_static(
|
|
if matches!(config.web_search_mode, Some(WebSearchMode::Disabled)) {
|
|
"false"
|
|
} else {
|
|
"true"
|
|
},
|
|
),
|
|
);
|
|
if let Some(turn_state) = turn_state
|
|
&& let Some(state) = turn_state.get()
|
|
&& let Ok(header_value) = HeaderValue::from_str(state)
|
|
{
|
|
headers.insert(X_CODEX_TURN_STATE_HEADER, header_value);
|
|
}
|
|
headers
|
|
}
|
|
|
|
fn map_response_stream<S>(api_stream: S, otel_manager: OtelManager) -> ResponseStream
|
|
where
|
|
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
|
|
+ Unpin
|
|
+ Send
|
|
+ 'static,
|
|
{
|
|
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
|
|
|
tokio::spawn(async move {
|
|
let mut logged_error = false;
|
|
let mut api_stream = api_stream;
|
|
while let Some(event) = api_stream.next().await {
|
|
match event {
|
|
Ok(ResponseEvent::Completed {
|
|
response_id,
|
|
token_usage,
|
|
}) => {
|
|
if let Some(usage) = &token_usage {
|
|
otel_manager.sse_event_completed(
|
|
usage.input_tokens,
|
|
usage.output_tokens,
|
|
Some(usage.cached_input_tokens),
|
|
Some(usage.reasoning_output_tokens),
|
|
usage.total_tokens,
|
|
);
|
|
}
|
|
if tx_event
|
|
.send(Ok(ResponseEvent::Completed {
|
|
response_id,
|
|
token_usage,
|
|
}))
|
|
.await
|
|
.is_err()
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
Ok(event) => {
|
|
if tx_event.send(Ok(event)).await.is_err() {
|
|
return;
|
|
}
|
|
}
|
|
Err(err) => {
|
|
let mapped = map_api_error(err);
|
|
if !logged_error {
|
|
otel_manager.see_event_completed_failed(&mapped);
|
|
logged_error = true;
|
|
}
|
|
if tx_event.send(Err(mapped)).await.is_err() {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
ResponseStream { rx_event }
|
|
}
|
|
|
|
/// Handles a 401 response by optionally refreshing ChatGPT tokens once.
|
|
///
|
|
/// When refresh succeeds, the caller should retry the API call; otherwise
|
|
/// the mapped `CodexErr` is returned to the caller.
|
|
async fn handle_unauthorized(
|
|
status: StatusCode,
|
|
auth_recovery: &mut Option<UnauthorizedRecovery>,
|
|
) -> Result<()> {
|
|
if let Some(recovery) = auth_recovery
|
|
&& recovery.has_next()
|
|
{
|
|
return match recovery.next().await {
|
|
Ok(_) => Ok(()),
|
|
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
|
|
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
|
|
};
|
|
}
|
|
|
|
Err(map_unauthorized_status(status))
|
|
}
|
|
|
|
fn map_unauthorized_status(status: StatusCode) -> CodexErr {
|
|
map_api_error(ApiError::Transport(TransportError::Http {
|
|
status,
|
|
url: None,
|
|
headers: None,
|
|
body: None,
|
|
}))
|
|
}
|
|
|
|
struct ApiTelemetry {
|
|
otel_manager: OtelManager,
|
|
}
|
|
|
|
impl ApiTelemetry {
|
|
fn new(otel_manager: OtelManager) -> Self {
|
|
Self { otel_manager }
|
|
}
|
|
}
|
|
|
|
impl RequestTelemetry for ApiTelemetry {
|
|
fn on_request(
|
|
&self,
|
|
attempt: u64,
|
|
status: Option<HttpStatusCode>,
|
|
error: Option<&TransportError>,
|
|
duration: Duration,
|
|
) {
|
|
let error_message = error.map(std::string::ToString::to_string);
|
|
self.otel_manager.record_api_request(
|
|
attempt,
|
|
status.map(|s| s.as_u16()),
|
|
error_message.as_deref(),
|
|
duration,
|
|
);
|
|
}
|
|
}
|
|
|
|
impl SseTelemetry for ApiTelemetry {
|
|
fn on_sse_poll(
|
|
&self,
|
|
result: &std::result::Result<
|
|
Option<std::result::Result<Event, EventStreamError<TransportError>>>,
|
|
tokio::time::error::Elapsed,
|
|
>,
|
|
duration: Duration,
|
|
) {
|
|
self.otel_manager.log_sse_event(result, duration);
|
|
}
|
|
}
|