mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Simplification 2
This commit is contained in:
@@ -38,12 +38,7 @@ impl ChatCompletionsApiClient {
|
||||
));
|
||||
}
|
||||
|
||||
let extra_headers: Vec<(&str, String)> = self
|
||||
.config
|
||||
.extra_headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.clone()))
|
||||
.collect();
|
||||
let extra_headers = crate::client::http::header_pairs(&self.config.extra_headers);
|
||||
let mut req_builder = crate::client::http::build_request(
|
||||
&self.config.http_client,
|
||||
&self.config.provider,
|
||||
@@ -62,24 +57,19 @@ impl ChatCompletionsApiClient {
|
||||
.log_request(0, || req_builder.send())
|
||||
.await?;
|
||||
|
||||
let (tx_event, rx_event) =
|
||||
tokio::sync::mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
|
||||
let stream = res
|
||||
.bytes_stream()
|
||||
.map_err(|err| Error::ResponseStreamFailed {
|
||||
source: err,
|
||||
request_id: None,
|
||||
});
|
||||
let idle_timeout = self.config.provider.stream_idle_timeout();
|
||||
let otel = self.config.otel_event_manager.clone();
|
||||
tokio::spawn(crate::client::sse::process_sse_wire(
|
||||
let (_, rx_event) = crate::client::sse::spawn_wire_stream(
|
||||
stream,
|
||||
tx_event,
|
||||
idle_timeout,
|
||||
otel,
|
||||
&self.config.provider,
|
||||
self.config.otel_event_manager.clone(),
|
||||
crate::decode_wire::chat::WireChatSseDecoder::new(),
|
||||
));
|
||||
);
|
||||
|
||||
Ok(crate::stream::EventStream::from_receiver(rx_event))
|
||||
Ok(rx_event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::path::Path;
|
||||
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use futures::TryStreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::error::Error;
|
||||
@@ -14,7 +13,6 @@ pub async fn stream_from_fixture_wire(
|
||||
provider: ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) -> Result<crate::stream::WireResponseStream> {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
|
||||
let display_path = path.as_ref().display().to_string();
|
||||
let content = std::fs::read_to_string(path.as_ref()).map_err(|err| {
|
||||
Error::Other(format!(
|
||||
@@ -33,12 +31,11 @@ pub async fn stream_from_fixture_wire(
|
||||
|
||||
let rdr = std::io::Cursor::new(content);
|
||||
let stream = ReaderStream::new(rdr).map_err(|err| Error::Other(err.to_string()));
|
||||
tokio::spawn(crate::client::sse::process_sse_wire(
|
||||
let (_, rx_event) = crate::client::sse::spawn_wire_stream(
|
||||
stream,
|
||||
tx_event,
|
||||
provider.stream_idle_timeout(),
|
||||
&provider,
|
||||
otel_event_manager,
|
||||
crate::decode_wire::responses::WireResponsesSseDecoder,
|
||||
));
|
||||
Ok(crate::stream::EventStream::from_receiver(rx_event))
|
||||
);
|
||||
Ok(rx_event)
|
||||
}
|
||||
|
||||
@@ -46,3 +46,11 @@ pub async fn resolve_auth(auth_provider: &Option<Arc<dyn AuthProvider>>) -> Opti
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert owned header pairs into borrowed key/value tuples for reqwest.
|
||||
pub fn header_pairs(headers: &[(String, String)]) -> Vec<(&str, String)> {
|
||||
headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::time::Instant;
|
||||
|
||||
use bytes::Bytes;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_provider_config::ModelProviderInfo;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -15,6 +16,51 @@ use crate::stream::WireEvent;
|
||||
|
||||
// Legacy ResponseEvent-based SSE framer removed
|
||||
|
||||
async fn send_stream_error(
|
||||
otel_event_manager: &OtelEventManager,
|
||||
tx_event: &mpsc::Sender<Result<WireEvent>>,
|
||||
event: Option<String>,
|
||||
duration: Duration,
|
||||
log_reason: impl std::fmt::Display,
|
||||
error: Error,
|
||||
) {
|
||||
otel_event_manager.sse_event_failed(event.as_ref(), duration, &log_reason);
|
||||
let _ = tx_event.send(Err(error)).await;
|
||||
}
|
||||
|
||||
/// Spawn an SSE processing task and return a sender/stream pair for wire events.
|
||||
pub fn spawn_wire_stream<S, D>(
|
||||
stream: S,
|
||||
provider: &ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
decoder: D,
|
||||
) -> (
|
||||
mpsc::Sender<Result<WireEvent>>,
|
||||
crate::stream::WireResponseStream,
|
||||
)
|
||||
where
|
||||
S: Stream<Item = Result<Bytes>> + Send + 'static + Unpin,
|
||||
D: crate::client::WireResponseDecoder + Send + 'static,
|
||||
{
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<WireEvent>>(1600);
|
||||
let idle_timeout = provider.stream_idle_timeout();
|
||||
let otel = otel_event_manager;
|
||||
let tx_for_task = tx_event.clone();
|
||||
|
||||
tokio::spawn(process_sse_wire(
|
||||
stream,
|
||||
tx_for_task,
|
||||
idle_timeout,
|
||||
otel,
|
||||
decoder,
|
||||
));
|
||||
|
||||
(
|
||||
tx_event,
|
||||
crate::stream::EventStream::from_receiver(rx_event),
|
||||
)
|
||||
}
|
||||
|
||||
/// Generic SSE framer for wire events: Byte stream -> framed JSON -> WireResponseDecoder.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn process_sse_wire<S, D>(
|
||||
@@ -36,57 +82,45 @@ pub async fn process_sse_wire<S, D>(
|
||||
let duration = start.elapsed();
|
||||
match result {
|
||||
Err(_) => {
|
||||
otel_event_manager.sse_event_failed(
|
||||
send_stream_error(
|
||||
&otel_event_manager,
|
||||
&tx_event,
|
||||
None,
|
||||
duration,
|
||||
&"idle timeout waiting for SSE",
|
||||
);
|
||||
let _ = tx_event
|
||||
.send(Err(Error::Stream(
|
||||
"idle timeout waiting for SSE",
|
||||
Error::Stream(
|
||||
"stream idle timeout fired before Completed event".to_string(),
|
||||
None,
|
||||
)))
|
||||
.await;
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(Some(Err(err))) => {
|
||||
otel_event_manager.sse_event_failed(None, duration, &err);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
let message = format!("{err}");
|
||||
send_stream_error(
|
||||
&otel_event_manager,
|
||||
&tx_event,
|
||||
None,
|
||||
duration,
|
||||
&message,
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(Some(Ok(chunk))) => {
|
||||
let chunk_str = match std::str::from_utf8(&chunk) {
|
||||
Ok(s) => s,
|
||||
Err(err) => {
|
||||
otel_event_manager.sse_event_failed(
|
||||
None,
|
||||
duration,
|
||||
&format!("UTF8 error: {err}"),
|
||||
);
|
||||
let _ = tx_event
|
||||
.send(Err(Error::Other(format!(
|
||||
"Invalid UTF-8 in SSE chunk: {err}"
|
||||
))))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
.replace("\r\n", "\n")
|
||||
.replace('\r', "\n");
|
||||
|
||||
buffer.push_str(&chunk_str);
|
||||
while let Some(frame) = next_frame(&mut buffer) {
|
||||
if !handle_frame(
|
||||
frame,
|
||||
&mut decoder,
|
||||
&tx_event,
|
||||
&otel_event_manager,
|
||||
duration,
|
||||
)
|
||||
.await
|
||||
{
|
||||
return;
|
||||
}
|
||||
if !process_chunk(
|
||||
chunk,
|
||||
duration,
|
||||
&mut buffer,
|
||||
&mut decoder,
|
||||
&tx_event,
|
||||
&otel_event_manager,
|
||||
)
|
||||
.await
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -107,6 +141,45 @@ pub async fn process_sse_wire<S, D>(
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_chunk<D>(
|
||||
chunk: Bytes,
|
||||
duration: Duration,
|
||||
buffer: &mut String,
|
||||
decoder: &mut D,
|
||||
tx_event: &mpsc::Sender<Result<WireEvent>>,
|
||||
otel_event_manager: &OtelEventManager,
|
||||
) -> bool
|
||||
where
|
||||
D: crate::client::WireResponseDecoder + Send,
|
||||
{
|
||||
let chunk_str = match std::str::from_utf8(&chunk) {
|
||||
Ok(s) => s,
|
||||
Err(err) => {
|
||||
send_stream_error(
|
||||
otel_event_manager,
|
||||
tx_event,
|
||||
None,
|
||||
duration,
|
||||
&format!("UTF8 error: {err}"),
|
||||
Error::Other(format!("Invalid UTF-8 in SSE chunk: {err}")),
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
.replace("\r\n", "\n")
|
||||
.replace('\r', "\n");
|
||||
|
||||
buffer.push_str(&chunk_str);
|
||||
while let Some(frame) = next_frame(buffer) {
|
||||
if !handle_frame(frame, decoder, tx_event, otel_event_manager, duration).await {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn next_frame(buffer: &mut String) -> Option<String> {
|
||||
loop {
|
||||
let idx = buffer.find("\n\n")?;
|
||||
@@ -170,8 +243,16 @@ where
|
||||
otel_event_manager.sse_event_kind(&frame.event);
|
||||
}
|
||||
Err(e) => {
|
||||
otel_event_manager.sse_event_failed(Some(&frame.event), duration, &e);
|
||||
let _ = tx_event.send(Err(e)).await;
|
||||
let reason = format!("{e}");
|
||||
send_stream_error(
|
||||
otel_event_manager,
|
||||
tx_event,
|
||||
Some(frame.event.clone()),
|
||||
duration,
|
||||
&reason,
|
||||
e,
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
pub mod chat;
|
||||
pub mod responses;
|
||||
@@ -12,6 +12,14 @@ use crate::error::Error;
|
||||
use crate::error::Result;
|
||||
use crate::stream::WireEvent;
|
||||
|
||||
async fn send_wire_event(tx: &mpsc::Sender<crate::error::Result<WireEvent>>, event: WireEvent) {
|
||||
let _ = tx.send(Ok(event)).await;
|
||||
}
|
||||
|
||||
fn serialize_response_item(item: ResponseItem) -> Value {
|
||||
serde_json::to_value(item).unwrap_or_else(|_| Value::String(String::new()))
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct FunctionCallState {
|
||||
active: bool,
|
||||
@@ -34,6 +42,171 @@ impl WireChatSseDecoder {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
async fn emit_created_once(&mut self, tx: &mpsc::Sender<crate::error::Result<WireEvent>>) {
|
||||
if self.created_emitted {
|
||||
return;
|
||||
}
|
||||
send_wire_event(tx, WireEvent::Created).await;
|
||||
self.created_emitted = true;
|
||||
}
|
||||
|
||||
async fn handle_content_delta(
|
||||
&mut self,
|
||||
delta: &Value,
|
||||
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
|
||||
) {
|
||||
if let Some(content) = delta.get("content").and_then(|c| c.as_array()) {
|
||||
for piece in content {
|
||||
if let Some(text) = piece.get("text").and_then(|t| t.as_str()) {
|
||||
self.push_assistant_text(text, tx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(reasoning) = delta.get("reasoning_content").and_then(|c| c.as_array()) {
|
||||
for entry in reasoning {
|
||||
if let Some(text) = entry.get("text").and_then(|t| t.as_str()) {
|
||||
self.push_reasoning_text(text, tx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn push_assistant_text(
|
||||
&mut self,
|
||||
text: &str,
|
||||
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
|
||||
) {
|
||||
self.start_assistant(tx).await;
|
||||
self.assistant_text.push_str(text);
|
||||
send_wire_event(tx, WireEvent::OutputTextDelta(text.to_string())).await;
|
||||
}
|
||||
|
||||
async fn push_reasoning_text(
|
||||
&mut self,
|
||||
text: &str,
|
||||
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
|
||||
) {
|
||||
self.start_reasoning(tx).await;
|
||||
self.reasoning_text.push_str(text);
|
||||
send_wire_event(tx, WireEvent::ReasoningContentDelta(text.to_string())).await;
|
||||
}
|
||||
|
||||
async fn start_assistant(&mut self, tx: &mpsc::Sender<crate::error::Result<WireEvent>>) {
|
||||
if self.assistant_started {
|
||||
return;
|
||||
}
|
||||
self.assistant_started = true;
|
||||
let message = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: String::new(),
|
||||
}],
|
||||
};
|
||||
send_wire_event(
|
||||
tx,
|
||||
WireEvent::OutputItemAdded(serialize_response_item(message)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn start_reasoning(&mut self, tx: &mpsc::Sender<crate::error::Result<WireEvent>>) {
|
||||
if self.reasoning_started {
|
||||
return;
|
||||
}
|
||||
self.reasoning_started = true;
|
||||
let reasoning_item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![],
|
||||
content: None,
|
||||
encrypted_content: None,
|
||||
};
|
||||
send_wire_event(
|
||||
tx,
|
||||
WireEvent::OutputItemAdded(serialize_response_item(reasoning_item)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn record_tool_calls(&mut self, delta: &Value) {
|
||||
if let Some(tool_calls) = delta.get("tool_calls").and_then(|c| c.as_array()) {
|
||||
for call in tool_calls {
|
||||
if let Some(id_val) = call.get("id").and_then(|id| id.as_str()) {
|
||||
self.fn_call_state.call_id = Some(id_val.to_string());
|
||||
}
|
||||
if let Some(function) = call.get("function") {
|
||||
if let Some(name) = function.get("name").and_then(|n| n.as_str()) {
|
||||
self.fn_call_state.name = Some(name.to_string());
|
||||
self.fn_call_state.active = true;
|
||||
}
|
||||
if let Some(args) = function.get("arguments").and_then(|a| a.as_str()) {
|
||||
self.fn_call_state.arguments.push_str(args);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_function_call(&mut self) -> Option<Value> {
|
||||
if !self.fn_call_state.active {
|
||||
return None;
|
||||
}
|
||||
let function_name = self.fn_call_state.name.take().unwrap_or_default();
|
||||
let call_id = self.fn_call_state.call_id.take().unwrap_or_default();
|
||||
let arguments = std::mem::take(&mut self.fn_call_state.arguments);
|
||||
self.fn_call_state = FunctionCallState::default();
|
||||
|
||||
Some(serde_json::json!({
|
||||
"type": "function_call",
|
||||
"id": call_id,
|
||||
"call_id": call_id,
|
||||
"name": function_name,
|
||||
"arguments": arguments,
|
||||
}))
|
||||
}
|
||||
|
||||
fn finish_reasoning(&mut self) -> Option<Value> {
|
||||
if !self.reasoning_started {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut content = Vec::new();
|
||||
let text = std::mem::take(&mut self.reasoning_text);
|
||||
if !text.is_empty() {
|
||||
content.push(ReasoningItemContent::ReasoningText { text });
|
||||
}
|
||||
self.reasoning_started = false;
|
||||
|
||||
Some(serialize_response_item(ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![],
|
||||
content: Some(content),
|
||||
encrypted_content: None,
|
||||
}))
|
||||
}
|
||||
|
||||
fn finish_assistant(&mut self) -> Option<Value> {
|
||||
if !self.assistant_started {
|
||||
return None;
|
||||
}
|
||||
let text = std::mem::take(&mut self.assistant_text);
|
||||
self.assistant_started = false;
|
||||
|
||||
Some(serialize_response_item(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText { text }],
|
||||
}))
|
||||
}
|
||||
|
||||
fn reset_reasoning_and_assistant(&mut self) {
|
||||
self.assistant_started = false;
|
||||
self.assistant_text.clear();
|
||||
self.reasoning_started = false;
|
||||
self.reasoning_text.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -57,137 +230,39 @@ impl WireResponseDecoder for WireChatSseDecoder {
|
||||
.unwrap_or_default();
|
||||
|
||||
for choice in choices {
|
||||
if !self.created_emitted {
|
||||
let _ = tx.send(Ok(WireEvent::Created)).await;
|
||||
self.created_emitted = true;
|
||||
}
|
||||
self.emit_created_once(tx).await;
|
||||
|
||||
if let Some(delta) = choice.get("delta") {
|
||||
if let Some(content) = delta.get("content").and_then(|c| c.as_array()) {
|
||||
for piece in content {
|
||||
if let Some(text) = piece.get("text").and_then(|t| t.as_str()) {
|
||||
if !self.assistant_started {
|
||||
self.assistant_started = true;
|
||||
let message = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: String::new(),
|
||||
}],
|
||||
};
|
||||
let value = serde_json::to_value(message)
|
||||
.unwrap_or_else(|_| Value::String(String::new()));
|
||||
let _ = tx.send(Ok(WireEvent::OutputItemAdded(value))).await;
|
||||
}
|
||||
self.assistant_text.push_str(text);
|
||||
let _ = tx
|
||||
.send(Ok(WireEvent::OutputTextDelta(text.to_string())))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(tool_calls) = delta.get("tool_calls").and_then(|c| c.as_array()) {
|
||||
for call in tool_calls {
|
||||
if let Some(id_val) = call.get("id").and_then(|id| id.as_str()) {
|
||||
self.fn_call_state.call_id = Some(id_val.to_string());
|
||||
}
|
||||
if let Some(function) = call.get("function") {
|
||||
if let Some(name) = function.get("name").and_then(|n| n.as_str()) {
|
||||
self.fn_call_state.name = Some(name.to_string());
|
||||
self.fn_call_state.active = true;
|
||||
}
|
||||
if let Some(args) = function.get("arguments").and_then(|a| a.as_str()) {
|
||||
self.fn_call_state.arguments.push_str(args);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(reasoning) = delta.get("reasoning_content").and_then(|c| c.as_array()) {
|
||||
for entry in reasoning {
|
||||
if let Some(text) = entry.get("text").and_then(|t| t.as_str()) {
|
||||
if !self.reasoning_started {
|
||||
self.reasoning_started = true;
|
||||
let reasoning_item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![],
|
||||
content: None,
|
||||
encrypted_content: None,
|
||||
};
|
||||
let value = serde_json::to_value(reasoning_item)
|
||||
.unwrap_or_else(|_| Value::String(String::new()));
|
||||
let _ = tx.send(Ok(WireEvent::OutputItemAdded(value))).await;
|
||||
}
|
||||
self.reasoning_text.push_str(text);
|
||||
let _ = tx
|
||||
.send(Ok(WireEvent::ReasoningContentDelta(text.to_string())))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.handle_content_delta(delta, tx).await;
|
||||
self.record_tool_calls(delta);
|
||||
}
|
||||
|
||||
if let Some(finish_reason) = choice.get("finish_reason").and_then(|f| f.as_str()) {
|
||||
match finish_reason {
|
||||
"tool_calls" if self.fn_call_state.active => {
|
||||
let function_name = self.fn_call_state.name.take().unwrap_or_default();
|
||||
let call_id = self.fn_call_state.call_id.take().unwrap_or_default();
|
||||
let arguments = self.fn_call_state.arguments.clone();
|
||||
self.fn_call_state = FunctionCallState::default();
|
||||
|
||||
let item = serde_json::json!({
|
||||
"type": "function_call",
|
||||
"id": call_id,
|
||||
"call_id": call_id,
|
||||
"name": function_name,
|
||||
"arguments": arguments,
|
||||
});
|
||||
let _ = tx.send(Ok(WireEvent::OutputItemDone(item))).await;
|
||||
"tool_calls" => {
|
||||
if let Some(item) = self.finish_function_call() {
|
||||
send_wire_event(tx, WireEvent::OutputItemDone(item)).await;
|
||||
}
|
||||
}
|
||||
"stop" | "length" => {
|
||||
if self.reasoning_started {
|
||||
let mut content = Vec::new();
|
||||
if !self.reasoning_text.is_empty() {
|
||||
content.push(ReasoningItemContent::ReasoningText {
|
||||
text: self.reasoning_text.clone(),
|
||||
});
|
||||
}
|
||||
let reasoning_item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![],
|
||||
content: Some(content),
|
||||
encrypted_content: None,
|
||||
};
|
||||
let value = serde_json::to_value(reasoning_item)
|
||||
.unwrap_or_else(|_| Value::String(String::new()));
|
||||
let _ = tx.send(Ok(WireEvent::OutputItemDone(value))).await;
|
||||
if let Some(reasoning_item) = self.finish_reasoning() {
|
||||
send_wire_event(tx, WireEvent::OutputItemDone(reasoning_item)).await;
|
||||
}
|
||||
|
||||
if self.assistant_started {
|
||||
let message = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: self.assistant_text.clone(),
|
||||
}],
|
||||
};
|
||||
let value = serde_json::to_value(message)
|
||||
.unwrap_or_else(|_| Value::String(String::new()));
|
||||
let _ = tx.send(Ok(WireEvent::OutputItemDone(value))).await;
|
||||
if let Some(message) = self.finish_assistant() {
|
||||
send_wire_event(tx, WireEvent::OutputItemDone(message)).await;
|
||||
}
|
||||
|
||||
let _ = tx
|
||||
.send(Ok(WireEvent::Completed {
|
||||
send_wire_event(
|
||||
tx,
|
||||
WireEvent::Completed {
|
||||
response_id: String::new(),
|
||||
token_usage: None,
|
||||
}))
|
||||
.await;
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
self.assistant_started = false;
|
||||
self.assistant_text.clear();
|
||||
self.reasoning_started = false;
|
||||
self.reasoning_text.clear();
|
||||
self.reset_reasoning_and_assistant();
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::ConversationId;
|
||||
use futures::TryStreamExt;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
use tracing::trace;
|
||||
|
||||
@@ -79,10 +78,7 @@ impl ResponsesApiClient {
|
||||
),
|
||||
];
|
||||
owned_headers.extend(self.config.extra_headers.iter().cloned());
|
||||
let extra_headers: Vec<(&str, String)> = owned_headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.clone()))
|
||||
.collect();
|
||||
let extra_headers = crate::client::http::header_pairs(&owned_headers);
|
||||
let mut req_builder = crate::client::http::build_request(
|
||||
&self.config.http_client,
|
||||
&self.config.provider,
|
||||
@@ -112,32 +108,7 @@ impl ResponsesApiClient {
|
||||
request_id: None,
|
||||
})?;
|
||||
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
|
||||
if let Some(snapshot) = crate::client::rate_limits::parse_rate_limit_snapshot(res.headers())
|
||||
&& tx_event
|
||||
.send(Ok(crate::stream::WireEvent::RateLimits(
|
||||
crate::stream::WireRateLimitSnapshot {
|
||||
primary: snapshot
|
||||
.primary
|
||||
.map(|w| crate::stream::WireRateLimitWindow {
|
||||
used_percent: Some(w.used_percent),
|
||||
window_minutes: w.window_minutes,
|
||||
resets_at: w.resets_at,
|
||||
}),
|
||||
secondary: snapshot
|
||||
.secondary
|
||||
.map(|w| crate::stream::WireRateLimitWindow {
|
||||
used_percent: Some(w.used_percent),
|
||||
window_minutes: w.window_minutes,
|
||||
resets_at: w.resets_at,
|
||||
}),
|
||||
},
|
||||
)))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("receiver dropped rate limit snapshot event");
|
||||
}
|
||||
let snapshot = crate::client::rate_limits::parse_rate_limit_snapshot(res.headers());
|
||||
|
||||
let stream = res
|
||||
.bytes_stream()
|
||||
@@ -145,16 +116,22 @@ impl ResponsesApiClient {
|
||||
source: err,
|
||||
request_id: None,
|
||||
});
|
||||
let idle_timeout = self.config.provider.stream_idle_timeout();
|
||||
let otel = self.config.otel_event_manager.clone();
|
||||
tokio::spawn(crate::client::sse::process_sse_wire(
|
||||
stream,
|
||||
tx_event,
|
||||
idle_timeout,
|
||||
otel,
|
||||
crate::decode_wire::responses::WireResponsesSseDecoder,
|
||||
));
|
||||
|
||||
Ok(crate::stream::EventStream::from_receiver(rx_event))
|
||||
let (tx_event, rx_event) = crate::client::sse::spawn_wire_stream(
|
||||
stream,
|
||||
&self.config.provider,
|
||||
self.config.otel_event_manager.clone(),
|
||||
crate::decode_wire::responses::WireResponsesSseDecoder,
|
||||
);
|
||||
if let Some(snapshot) = snapshot
|
||||
&& tx_event
|
||||
.send(Ok(crate::stream::WireEvent::RateLimits(snapshot.into())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("receiver dropped rate limit snapshot event");
|
||||
}
|
||||
|
||||
Ok(rx_event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,14 +42,6 @@ impl RoutedApiClient {
|
||||
) -> Result<WireResponseStream> {
|
||||
match self.config.provider.wire_api {
|
||||
WireApi::Responses => {
|
||||
let cfg = ResponsesApiClientConfig {
|
||||
http_client: self.config.http_client.clone(),
|
||||
provider: self.config.provider.clone(),
|
||||
conversation_id: self.config.conversation_id,
|
||||
auth_provider: self.config.auth_provider.clone(),
|
||||
otel_event_manager: self.config.otel_event_manager.clone(),
|
||||
extra_headers: self.config.extra_headers.clone(),
|
||||
};
|
||||
if let Some(path) = &self.config.responses_fixture_path {
|
||||
return crate::client::fixtures::stream_from_fixture_wire(
|
||||
path,
|
||||
@@ -58,19 +50,35 @@ impl RoutedApiClient {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let client = ResponsesApiClient::new(cfg)?;
|
||||
let client = ResponsesApiClient::new(self.responses_config())?;
|
||||
client.stream_payload_wire(payload_json).await
|
||||
}
|
||||
WireApi::Chat => {
|
||||
let cfg = ChatCompletionsApiClientConfig {
|
||||
http_client: self.config.http_client.clone(),
|
||||
provider: self.config.provider.clone(),
|
||||
otel_event_manager: self.config.otel_event_manager.clone(),
|
||||
extra_headers: self.config.extra_headers.clone(),
|
||||
};
|
||||
let client = ChatCompletionsApiClient::new(cfg)?;
|
||||
let client = ChatCompletionsApiClient::new(self.chat_config())?;
|
||||
client.stream_payload_wire(payload_json).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RoutedApiClient {
|
||||
fn responses_config(&self) -> ResponsesApiClientConfig {
|
||||
ResponsesApiClientConfig {
|
||||
http_client: self.config.http_client.clone(),
|
||||
provider: self.config.provider.clone(),
|
||||
conversation_id: self.config.conversation_id,
|
||||
auth_provider: self.config.auth_provider.clone(),
|
||||
otel_event_manager: self.config.otel_event_manager.clone(),
|
||||
extra_headers: self.config.extra_headers.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn chat_config(&self) -> ChatCompletionsApiClientConfig {
|
||||
ChatCompletionsApiClientConfig {
|
||||
http_client: self.config.http_client.clone(),
|
||||
provider: self.config.provider.clone(),
|
||||
otel_event_manager: self.config.otel_event_manager.clone(),
|
||||
extra_headers: self.config.extra_headers.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use futures::Stream;
|
||||
use serde::Serialize;
|
||||
@@ -104,6 +105,25 @@ pub struct WireRateLimitSnapshot {
|
||||
pub secondary: Option<WireRateLimitWindow>,
|
||||
}
|
||||
|
||||
impl From<RateLimitWindow> for WireRateLimitWindow {
|
||||
fn from(window: RateLimitWindow) -> Self {
|
||||
Self {
|
||||
used_percent: Some(window.used_percent),
|
||||
window_minutes: window.window_minutes,
|
||||
resets_at: window.resets_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RateLimitSnapshot> for WireRateLimitSnapshot {
|
||||
fn from(snapshot: RateLimitSnapshot) -> Self {
|
||||
Self {
|
||||
primary: snapshot.primary.map(Into::into),
|
||||
secondary: snapshot.secondary.map(Into::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WireEvent {
|
||||
Created,
|
||||
|
||||
Reference in New Issue
Block a user