Simplification

This commit is contained in:
jif-oai
2025-11-12 15:25:20 +00:00
parent 34b57eff3f
commit f5918d7e1b
7 changed files with 62 additions and 58 deletions

View File

@@ -10,13 +10,10 @@ use futures::TryStreamExt;
///
/// - `http_client`: Reqwest client used for HTTP requests.
/// - `provider`: Provider configuration (base URL, headers, retries, etc.).
/// - `model`: Model identifier to use.
/// - `otel_event_manager`: Telemetry event manager for request/stream instrumentation.
/// - `session_source`: Session metadata, used to set subagent headers when applicable.
pub struct ChatCompletionsApiClientConfig {
pub http_client: reqwest::Client,
pub provider: ModelProviderInfo,
pub model: String,
pub otel_event_manager: OtelEventManager,
pub extra_headers: Vec<(String, String)>,
}
@@ -34,7 +31,6 @@ impl ChatCompletionsApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
_session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
if self.config.provider.wire_api != codex_provider_config::WireApi::Chat {
return Err(crate::error::Error::UnsupportedOperation(
@@ -42,7 +38,6 @@ impl ChatCompletionsApiClient {
));
}
let auth = crate::client::http::resolve_auth(&None).await;
let extra_headers: Vec<(&str, String)> = self
.config
.extra_headers
@@ -52,7 +47,7 @@ impl ChatCompletionsApiClient {
let mut req_builder = crate::client::http::build_request(
&self.config.http_client,
&self.config.provider,
&auth,
&None,
&extra_headers,
)
.await?;

View File

@@ -1,4 +1,3 @@
use std::io::BufRead;
use std::path::Path;
use codex_otel::otel_event_manager::OtelEventManager;
@@ -17,18 +16,20 @@ pub async fn stream_from_fixture_wire(
) -> Result<crate::stream::WireResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
let display_path = path.as_ref().display().to_string();
let file = std::fs::File::open(path.as_ref())
.map_err(|err| Error::Other(format!("failed to open fixture {display_path}: {err}")))?;
let lines = std::io::BufReader::new(file).lines();
let mut content = String::new();
for line in lines {
let line = line
.map_err(|err| Error::Other(format!("failed to read fixture {display_path}: {err}")))?;
content.push_str(&line);
content.push('\n');
content.push('\n');
}
let content = std::fs::read_to_string(path.as_ref()).map_err(|err| {
Error::Other(format!(
"failed to read fixture text from {display_path}: {err}"
))
})?;
let content = content
.lines()
.map(|line| {
let mut line_with_spacing = line.to_string();
line_with_spacing.push('\n');
line_with_spacing.push('\n');
line_with_spacing
})
.collect::<String>();
let rdr = std::io::Cursor::new(content);
let stream = ReaderStream::new(rdr).map_err(|err| Error::Other(err.to_string()));

View File

@@ -17,6 +17,10 @@ pub fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapsho
"x-codex-secondary-reset-at",
);
if primary.is_none() && secondary.is_none() {
return None;
}
Some(RateLimitSnapshot { primary, secondary })
}

View File

@@ -25,6 +25,36 @@ struct StreamEvent {
delta: Option<String>,
}
#[derive(Default, Deserialize)]
struct WireUsage {
#[serde(default)]
input_tokens: i64,
#[serde(default)]
cached_input_tokens: Option<i64>,
#[serde(default)]
output_tokens: i64,
#[serde(default)]
reasoning_output_tokens: Option<i64>,
#[serde(default)]
total_tokens: i64,
#[serde(default)]
input_tokens_details: Option<WireInputTokensDetails>,
#[serde(default)]
output_tokens_details: Option<WireOutputTokensDetails>,
}
#[derive(Default, Deserialize)]
struct WireInputTokensDetails {
#[serde(default)]
cached_tokens: Option<i64>,
}
#[derive(Default, Deserialize)]
struct WireOutputTokensDetails {
#[serde(default)]
reasoning_tokens: Option<i64>,
}
pub struct WireResponsesSseDecoder;
#[async_trait]
@@ -122,45 +152,29 @@ impl WireResponseDecoder for WireResponsesSseDecoder {
}
fn parse_wire_usage(resp: &Value) -> Option<WireTokenUsage> {
let usage = resp.get("usage").cloned()?;
let input_tokens = usage
.get("input_tokens")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let usage: WireUsage = serde_json::from_value(resp.get("usage")?.clone()).ok()?;
let cached_input_tokens = usage
.get("cached_input_tokens")
.and_then(serde_json::Value::as_i64)
.cached_input_tokens
.or_else(|| {
usage
.get("input_tokens_details")
.and_then(|d| d.get("cached_tokens"))
.and_then(serde_json::Value::as_i64)
.input_tokens_details
.and_then(|details| details.cached_tokens)
})
.unwrap_or(0);
let output_tokens = usage
.get("output_tokens")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let reasoning_output_tokens = usage
.get("reasoning_output_tokens")
.and_then(serde_json::Value::as_i64)
.reasoning_output_tokens
.or_else(|| {
usage
.get("output_tokens_details")
.and_then(|d| d.get("reasoning_tokens"))
.and_then(serde_json::Value::as_i64)
.output_tokens_details
.and_then(|details| details.reasoning_tokens)
})
.unwrap_or(0);
let total_tokens = usage
.get("total_tokens")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
Some(WireTokenUsage {
input_tokens,
input_tokens: usage.input_tokens,
cached_input_tokens,
output_tokens,
output_tokens: usage.output_tokens,
reasoning_output_tokens,
total_tokens,
total_tokens: usage.total_tokens,
})
}

View File

@@ -20,14 +20,12 @@ use codex_provider_config::ModelProviderInfo;
///
/// - `http_client`: Reqwest client used for HTTP requests.
/// - `provider`: Provider configuration (base URL, headers, retries, etc.).
/// - `model`: Model identifier to use.
/// - `conversation_id`: Used to set conversation/session headers and cache keys.
/// - `auth_provider`: Optional provider of auth context (e.g., ChatGPT login token).
/// - `otel_event_manager`: Telemetry event manager for request/stream instrumentation.
pub struct ResponsesApiClientConfig {
pub http_client: reqwest::Client,
pub provider: ModelProviderInfo,
pub model: String,
pub conversation_id: ConversationId,
pub auth_provider: Option<Arc<dyn AuthProvider>>,
pub otel_event_manager: OtelEventManager,
@@ -46,11 +44,7 @@ impl ResponsesApiClient {
}
impl ResponsesApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &Value,
_session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
pub async fn stream_payload_wire(&self, payload_json: &Value) -> Result<WireResponseStream> {
if self.config.provider.wire_api != codex_provider_config::WireApi::Responses {
return Err(Error::UnsupportedOperation(
"ResponsesApiClient requires a Responses provider".to_string(),

View File

@@ -19,7 +19,6 @@ use codex_provider_config::ModelProviderInfo;
pub struct RoutedApiClientConfig {
pub http_client: reqwest::Client,
pub provider: ModelProviderInfo,
pub model: String,
pub conversation_id: ConversationId,
pub auth_provider: Option<Arc<dyn AuthProvider>>,
pub otel_event_manager: OtelEventManager,
@@ -46,7 +45,6 @@ impl RoutedApiClient {
let cfg = ResponsesApiClientConfig {
http_client: self.config.http_client.clone(),
provider: self.config.provider.clone(),
model: self.config.model.clone(),
conversation_id: self.config.conversation_id,
auth_provider: self.config.auth_provider.clone(),
otel_event_manager: self.config.otel_event_manager.clone(),
@@ -61,18 +59,17 @@ impl RoutedApiClient {
.await;
}
let client = ResponsesApiClient::new(cfg)?;
client.stream_payload_wire(payload_json, None).await
client.stream_payload_wire(payload_json).await
}
WireApi::Chat => {
let cfg = ChatCompletionsApiClientConfig {
http_client: self.config.http_client.clone(),
provider: self.config.provider.clone(),
model: self.config.model.clone(),
otel_event_manager: self.config.otel_event_manager.clone(),
extra_headers: self.config.extra_headers.clone(),
};
let client = ChatCompletionsApiClient::new(cfg)?;
client.stream_payload_wire(payload_json, None).await
client.stream_payload_wire(payload_json).await
}
}
}

View File

@@ -192,7 +192,6 @@ impl ModelClient {
let config = RoutedApiClientConfig {
http_client,
provider: self.provider.clone(),
model: self.config.model.clone(),
conversation_id: self.conversation_id,
auth_provider,
otel_event_manager: self.otel_event_manager.clone(),