Compare commits

...

1 Commits

Author SHA1 Message Date
shijie-openai
d461135b5c [DNM]: force err 2025-10-08 09:03:21 -07:00

View File

@@ -643,70 +643,66 @@ async fn process_sse<S>(
{
let mut stream = stream.eventsource();
// Quick debug: after 1s, emit a generic error and stop processing.
let cutoff = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(cutoff);
// If the stream stays completely silent for an extended period treat it as disconnected.
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<CodexErr> = None;
loop {
let sse = match otel_event_manager
.log_sse_event(|| timeout(idle_timeout, stream.next()))
.await
{
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let event = CodexErr::Stream(e.to_string(), None);
let _ = tx_event.send(Err(event)).await;
return;
}
Ok(None) => {
match response_completed {
Some(ResponseCompleted {
id: response_id,
usage,
}) => {
if let Some(token_usage) = &usage {
otel_event_manager.sse_event_completed(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
.input_tokens_details
.as_ref()
.map(|d| d.cached_tokens),
token_usage
.output_tokens_details
.as_ref()
.map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
let sse = {
let next_fut =
otel_event_manager.log_sse_event(|| timeout(idle_timeout, stream.next()));
tokio::select! {
_ = &mut cutoff => {
let _ = tx_event
.send(Err(CodexErr::Stream("forced cutoff after 1s (debug)".into(), None)))
.await;
return;
}
res = next_fut => {
match res {
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let event = CodexErr::Stream(e.to_string(), None);
let _ = tx_event.send(Err(event)).await;
return;
}
Ok(None) => {
match response_completed {
Some(ResponseCompleted { id: response_id, usage }) => {
if let Some(token_usage) = &usage {
otel_event_manager.sse_event_completed(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage.input_tokens_details.as_ref().map(|d| d.cached_tokens),
token_usage.output_tokens_details.as_ref().map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
}
let event = ResponseEvent::Completed { response_id, token_usage: usage.map(Into::into) };
let _ = tx_event.send(Ok(event)).await;
}
None => {
let error = response_error.unwrap_or(CodexErr::Stream("stream closed before response.completed".into(), None));
otel_event_manager.see_event_completed_failed(&error);
let _ = tx_event.send(Err(error)).await;
}
}
return;
}
Err(_) => {
let _ = tx_event
.send(Err(CodexErr::Stream("idle timeout waiting for SSE".into(), None)))
.await;
return;
}
let event = ResponseEvent::Completed {
response_id,
token_usage: usage.map(Into::into),
};
let _ = tx_event.send(Ok(event)).await;
}
None => {
let error = response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
otel_event_manager.see_event_completed_failed(&error);
let _ = tx_event.send(Err(error)).await;
}
}
return;
}
Err(_) => {
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
None,
)))
.await;
return;
}
};