Compare commits

...

5 Commits

Author SHA1 Message Date
Ahmed Ibrahim
9b7329699a Merge branch 'fix-timeout' of https://github.com/openai/codex into fix-timeout 2025-11-06 10:34:20 -08:00
Ahmed Ibrahim
8307d9bf3b fix 2025-11-06 10:34:11 -08:00
Ahmed Ibrahim
294dafcacf Merge branch 'main' into fix-timeout 2025-11-06 10:29:56 -08:00
Ahmed Ibrahim
0b26c76047 fix 2025-11-06 10:27:24 -08:00
Ahmed Ibrahim
ca9f9c6f5d usage 2025-11-06 09:25:43 -08:00

View File

@@ -680,6 +680,33 @@ fn parse_header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
headers.get(name)?.to_str().ok()
}
async fn emit_completed(
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
otel_event_manager: &OtelEventManager,
completed: ResponseCompleted,
) {
if let Some(token_usage) = &completed.usage {
otel_event_manager.sse_event_completed(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
.input_tokens_details
.as_ref()
.map(|d| d.cached_tokens),
token_usage
.output_tokens_details
.as_ref()
.map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
}
let event = ResponseEvent::Completed {
response_id: completed.id.clone(),
token_usage: completed.usage.map(Into::into),
};
let _ = tx_event.send(Ok(event)).await;
}
async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
@@ -692,7 +719,7 @@ async fn process_sse<S>(
// If the stream stays completely silent for an extended period treat it as disconnected.
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;
let response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<CodexErr> = None;
loop {
@@ -711,30 +738,8 @@ async fn process_sse<S>(
}
Ok(None) => {
match response_completed {
Some(ResponseCompleted {
id: response_id,
usage,
}) => {
if let Some(token_usage) = &usage {
otel_event_manager.sse_event_completed(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
.input_tokens_details
.as_ref()
.map(|d| d.cached_tokens),
token_usage
.output_tokens_details
.as_ref()
.map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
}
let event = ResponseEvent::Completed {
response_id,
token_usage: usage.map(Into::into),
};
let _ = tx_event.send(Ok(event)).await;
Some(completed) => {
emit_completed(&tx_event, &otel_event_manager, completed).await
}
None => {
let error = response_error.unwrap_or(CodexErr::Stream(
@@ -864,7 +869,8 @@ async fn process_sse<S>(
if let Some(resp_val) = event.response {
match serde_json::from_value::<ResponseCompleted>(resp_val) {
Ok(r) => {
response_completed = Some(r);
emit_completed(&tx_event, &otel_event_manager, r).await;
return;
}
Err(e) => {
let error = format!("failed to parse ResponseCompleted: {e}");