mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix
This commit is contained in:
@@ -680,6 +680,33 @@ fn parse_header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
|
||||
headers.get(name)?.to_str().ok()
|
||||
}
|
||||
|
||||
async fn emit_completed(
|
||||
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
|
||||
otel_event_manager: &OtelEventManager,
|
||||
completed: ResponseCompleted,
|
||||
) {
|
||||
if let Some(token_usage) = &completed.usage {
|
||||
otel_event_manager.sse_event_completed(
|
||||
token_usage.input_tokens,
|
||||
token_usage.output_tokens,
|
||||
token_usage
|
||||
.input_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.cached_tokens),
|
||||
token_usage
|
||||
.output_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.reasoning_tokens),
|
||||
token_usage.total_tokens,
|
||||
);
|
||||
}
|
||||
let event = ResponseEvent::Completed {
|
||||
response_id: completed.id.clone(),
|
||||
token_usage: completed.usage.map(Into::into),
|
||||
};
|
||||
let _ = tx_event.send(Ok(event)).await;
|
||||
}
|
||||
|
||||
async fn process_sse<S>(
|
||||
stream: S,
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent>>,
|
||||
@@ -692,7 +719,7 @@ async fn process_sse<S>(
|
||||
|
||||
// If the stream stays completely silent for an extended period treat it as disconnected.
|
||||
// The response id returned from the "complete" message.
|
||||
let mut response_completed: Option<ResponseCompleted> = None;
|
||||
let response_completed: Option<ResponseCompleted> = None;
|
||||
let mut response_error: Option<CodexErr> = None;
|
||||
|
||||
loop {
|
||||
@@ -711,30 +738,8 @@ async fn process_sse<S>(
|
||||
}
|
||||
Ok(None) => {
|
||||
match response_completed {
|
||||
Some(ResponseCompleted {
|
||||
id: response_id,
|
||||
usage,
|
||||
}) => {
|
||||
if let Some(token_usage) = &usage {
|
||||
otel_event_manager.sse_event_completed(
|
||||
token_usage.input_tokens,
|
||||
token_usage.output_tokens,
|
||||
token_usage
|
||||
.input_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.cached_tokens),
|
||||
token_usage
|
||||
.output_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.reasoning_tokens),
|
||||
token_usage.total_tokens,
|
||||
);
|
||||
}
|
||||
let event = ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage: usage.map(Into::into),
|
||||
};
|
||||
let _ = tx_event.send(Ok(event)).await;
|
||||
Some(completed) => {
|
||||
emit_completed(&tx_event, &otel_event_manager, completed).await
|
||||
}
|
||||
None => {
|
||||
let error = response_error.unwrap_or(CodexErr::Stream(
|
||||
@@ -864,26 +869,7 @@ async fn process_sse<S>(
|
||||
if let Some(resp_val) = event.response {
|
||||
match serde_json::from_value::<ResponseCompleted>(resp_val) {
|
||||
Ok(r) => {
|
||||
if let Some(token_usage) = &r.usage {
|
||||
otel_event_manager.sse_event_completed(
|
||||
token_usage.input_tokens,
|
||||
token_usage.output_tokens,
|
||||
token_usage
|
||||
.input_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.cached_tokens),
|
||||
token_usage
|
||||
.output_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.reasoning_tokens),
|
||||
token_usage.total_tokens,
|
||||
);
|
||||
}
|
||||
let event = ResponseEvent::Completed {
|
||||
response_id: r.id.clone(),
|
||||
token_usage: r.usage.map(Into::into),
|
||||
};
|
||||
let _ = tx_event.send(Ok(event)).await;
|
||||
emit_completed(&tx_event, &otel_event_manager, r).await;
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
Reference in New Issue
Block a user