try something

This commit is contained in:
jif-oai
2025-11-12 11:57:13 +00:00
parent 769e9cc92c
commit fe95c24442
2 changed files with 38 additions and 2 deletions

View File

@@ -77,7 +77,20 @@ pub async fn process_sse_wire<S, D>(
}
}
}
Ok(None) => return,
Ok(None) => {
// If the stream ended without a trailing blank line, flush any
// buffered JSON frame to the decoder before returning.
if !data_buffer.is_empty() {
let json = std::mem::take(&mut data_buffer);
if let Err(e) = decoder
.on_frame(&json, &tx_event, &otel_event_manager)
.await
{
let _ = tx_event.send(Err(e)).await;
}
}
return;
}
}
}
}

View File

@@ -2013,6 +2013,9 @@ async fn try_run_turn(
);
let mut output: FuturesOrdered<BoxFuture<CodexResult<ProcessedResponseItem>>> =
FuturesOrdered::new();
// Track whether any tool calls have been scheduled so we can salvage
// their outputs if the stream disconnects before `response.completed`.
let mut saw_tool_call = false;
let mut active_item: Option<TurnItem> = None;
@@ -2031,8 +2034,27 @@ async fn try_run_turn(
};
let event = match event {
Some(res) => res?,
Some(res) => match res {
Ok(ev) => ev,
Err(e) => {
if saw_tool_call {
let processed_items = output.try_collect().await?;
return Ok(TurnRunResult {
processed_items,
total_token_usage: None,
});
}
return Err(e);
}
},
None => {
if saw_tool_call {
let processed_items = output.try_collect().await?;
return Ok(TurnRunResult {
processed_items,
total_token_usage: None,
});
}
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
@@ -2060,6 +2082,7 @@ async fn try_run_turn(
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
saw_tool_call = true;
output.push_back(
async move {
Ok(ProcessedResponseItem {