Clean-up 2

This commit is contained in:
jif-oai
2025-11-13 16:49:59 +01:00
parent 1cd9e24232
commit 29d93176b6
5 changed files with 85 additions and 51 deletions

View File

@@ -20,10 +20,8 @@ use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use eventsource_stream::Eventsource;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use reqwest::StatusCode;
use serde_json::json;
@@ -31,7 +29,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::trace;
/// Implementation for the classic Chat Completions API.
@@ -357,14 +354,7 @@ pub(crate) async fn stream_chat_completions(
}
if let SessionSource::SubAgent(sub) = session_source {
let subagent = if let SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
let subagent = crate::client::types::subagent_label(sub);
body["metadata"] = json!({
"x-openai-subagent": subagent,
});
@@ -557,21 +547,15 @@ async fn process_chat_sse<S>(
let mut reasoning_item: Option<ResponseItem> = None;
loop {
let start = std::time::Instant::now();
let response = timeout(idle_timeout, stream.next()).await;
let duration = start.elapsed();
otel_event_manager.log_sse_event(&response, duration);
let sse = match response {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let _ = tx_event
.send(Err(CodexErr::Stream(e.to_string(), None)))
.await;
return;
}
Ok(None) => {
// Stream closed gracefully emit Completed with dummy id.
let sse = match crate::client::sse::next_sse_event(
&mut stream,
idle_timeout,
&otel_event_manager,
)
.await
{
crate::client::sse::SseNext::Event(ev) => ev,
crate::client::sse::SseNext::Eof => {
let _ = tx_event
.send(Ok(ResponseEvent::Completed {
response_id: String::new(),
@@ -580,7 +564,11 @@ async fn process_chat_sse<S>(
.await;
return;
}
Err(_) => {
crate::client::sse::SseNext::StreamError(message) => {
let _ = tx_event.send(Err(CodexErr::Stream(message, None))).await;
return;
}
crate::client::sse::SseNext::Timeout => {
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),

View File

@@ -1,6 +1,7 @@
mod chat_completions;
pub mod http;
mod responses;
mod sse;
pub mod types;
pub(crate) use chat_completions::AggregateStreamExt;

View File

@@ -20,7 +20,6 @@ use reqwest::header::HeaderMap;
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::debug;
use tracing::trace;
use tracing::warn;
@@ -311,14 +310,7 @@ impl ModelClient {
// Include subagent header only for subagent sessions.
if let SessionSource::SubAgent(sub) = &self.session_source {
let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
let subagent = crate::client::types::subagent_label(sub);
req_builder = req_builder.header("x-openai-subagent", subagent);
}
@@ -700,13 +692,10 @@ async fn process_sse(
let mut response_error: Option<CodexErr> = None;
loop {
let start = tokio::time::Instant::now();
let next_event = timeout(idle_timeout, stream.next()).await;
let duration = start.elapsed();
otel_event_manager.log_sse_event(&next_event, duration);
match next_event {
Ok(Some(Ok(ev))) => {
match crate::client::sse::next_sse_event(&mut stream, idle_timeout, &otel_event_manager)
.await
{
crate::client::sse::SseNext::Event(ev) => {
if let Err(e) =
handle_sse_event(ev, &mut response_completed, &mut response_error, &tx_event)
.await
@@ -715,16 +704,14 @@ async fn process_sse(
break;
}
}
Ok(Some(Err(e))) => {
let _ = tx_event
.send(Err(CodexErr::Stream(e.to_string(), None)))
.await;
return;
}
Ok(None) => {
crate::client::sse::SseNext::Eof => {
break;
}
Err(_) => {
crate::client::sse::SseNext::StreamError(message) => {
let _ = tx_event.send(Err(CodexErr::Stream(message, None))).await;
return;
}
crate::client::sse::SseNext::Timeout => {
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".to_string(),

View File

@@ -0,0 +1,46 @@
use std::time::Duration;
use codex_otel::otel_event_manager::OtelEventManager;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError as StreamError;
use futures::Stream;
use futures::StreamExt;
use tokio::time::timeout;
/// Result of polling the next SSE event with timeout and logging applied.
pub(crate) enum SseNext {
Event(Event),
Eof,
StreamError(String),
Timeout,
}
/// Read the next SSE event from `stream`, applying an idle timeout and recording
/// telemetry via `otel_event_manager`.
///
/// This helper centralizes the boilerplate for:
/// - `tokio::time::timeout`
/// - calling `log_sse_event`
/// - mapping the different outcomes into a small enum that callers can
/// interpret according to their own protocol semantics.
pub(crate) async fn next_sse_event<S, E>(
stream: &mut S,
idle_timeout: Duration,
otel_event_manager: &OtelEventManager,
) -> SseNext
where
S: Stream<Item = Result<Event, StreamError<E>>> + Unpin,
E: std::fmt::Display,
{
let start = tokio::time::Instant::now();
let next_event = timeout(idle_timeout, stream.next()).await;
let duration = start.elapsed();
otel_event_manager.log_sse_event(&next_event, duration);
match next_event {
Ok(Some(Ok(ev))) => SseNext::Event(ev),
Ok(Some(Err(e))) => SseNext::StreamError(e.to_string()),
Ok(None) => SseNext::Eof,
Err(_) => SseNext::Timeout,
}
}

View File

@@ -1,5 +1,6 @@
use crate::error::Result;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::SubAgentSource;
use crate::protocol::TokenUsage;
use crate::tools::spec::JsonSchema;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
@@ -30,6 +31,17 @@ pub enum ResponseEvent {
RateLimits(RateLimitSnapshot),
}
pub(crate) fn subagent_label(sub: &SubAgentSource) -> String {
if let SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
}
}
#[derive(Debug, Serialize)]
pub(crate) struct Reasoning {
#[serde(skip_serializing_if = "Option::is_none")]