Compare commits

...

1 Commit

Author  SHA1        Message                      Date
pap     7ae9033b19  wip when delta not streamed  2025-08-10 18:47:49 +01:00
3 changed files with 462 additions and 19 deletions


@@ -314,13 +314,13 @@ struct SseEvent {
 #[derive(Debug, Deserialize)]
 struct ResponseCreated {}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 struct ResponseCompleted {
     id: String,
     usage: Option<ResponseCompletedUsage>,
 }

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 struct ResponseCompletedUsage {
     input_tokens: u64,
     input_tokens_details: Option<ResponseCompletedInputTokensDetails>,
@@ -341,12 +341,12 @@ impl From<ResponseCompletedUsage> for TokenUsage {
     }
 }

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 struct ResponseCompletedInputTokensDetails {
     cached_tokens: u64,
 }

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 struct ResponseCompletedOutputTokensDetails {
     reasoning_tokens: u64,
 }
@@ -363,6 +363,14 @@ async fn process_sse<S>(
     // If the stream stays completely silent for an extended period treat it as disconnected.
     // The response id returned from the "complete" message.
     let mut response_completed: Option<ResponseCompleted> = None;
+    let mut completed_emitted: bool = false;
+    // Track whether we saw incremental output to decide if we need to fall back
+    // to parsing the final `response.output` array.
+    let mut saw_output_item_done = false;
+    let mut saw_any_streaming_delta = false;
+    let mut saw_reasoning_delta = false;
+    let mut saw_reasoning_item_done = false;
+    let mut deferred_completed_response: Option<serde_json::Value> = None;

     loop {
         let sse = match timeout(idle_timeout, stream.next()).await {
@@ -379,11 +387,35 @@ async fn process_sse<S>(
                         id: response_id,
                         usage,
                     }) => {
-                        let event = ResponseEvent::Completed {
-                            response_id,
-                            token_usage: usage.map(Into::into),
-                        };
-                        let _ = tx_event.send(Ok(event)).await;
+                        // If the connection closed without an explicit completed event
+                        // having been emitted earlier, emit it now (with fallback if needed).
+                        if !completed_emitted {
+                            // Fallback: if no items/deltas were forwarded but we captured a
+                            // final response body, parse its `output` array now and emit
+                            // synthetic OutputItemDone events so higher layers see a result.
+                            if !saw_output_item_done && !saw_any_streaming_delta {
+                                if let Some(resp) = deferred_completed_response.take() {
+                                    if let Some(output) =
+                                        resp.get("output").and_then(|v| v.as_array())
+                                    {
+                                        for v in output {
+                                            if let Ok(item) =
+                                                serde_json::from_value::<ResponseItem>(v.clone())
+                                            {
+                                                let _ = tx_event
+                                                    .send(Ok(ResponseEvent::OutputItemDone(item)))
+                                                    .await;
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            let event = ResponseEvent::Completed {
+                                response_id,
+                                token_usage: usage.map(Into::into),
+                            };
+                            let _ = tx_event.send(Ok(event)).await;
+                        }
                     }
                     None => {
                         let _ = tx_event
@@ -438,6 +470,10 @@ async fn process_sse<S>(
                     continue;
                 };
+                saw_output_item_done = true;
+                if matches!(item, ResponseItem::Reasoning { .. }) {
+                    saw_reasoning_item_done = true;
+                }
                 let event = ResponseEvent::OutputItemDone(item);
                 if tx_event.send(Ok(event)).await.is_err() {
                     return;
@@ -445,6 +481,7 @@ async fn process_sse<S>(
             }
             "response.output_text.delta" => {
                 if let Some(delta) = event.delta {
+                    saw_any_streaming_delta = true;
                     let event = ResponseEvent::OutputTextDelta(delta);
                     if tx_event.send(Ok(event)).await.is_err() {
                         return;
@@ -453,6 +490,8 @@ async fn process_sse<S>(
             }
             "response.reasoning_summary_text.delta" => {
                 if let Some(delta) = event.delta {
+                    saw_any_streaming_delta = true;
+                    saw_reasoning_delta = true;
                     let event = ResponseEvent::ReasoningSummaryDelta(delta);
                     if tx_event.send(Ok(event)).await.is_err() {
                         return;
@@ -461,6 +500,8 @@ async fn process_sse<S>(
             }
             "response.reasoning_text.delta" => {
                 if let Some(delta) = event.delta {
+                    saw_any_streaming_delta = true;
+                    saw_reasoning_delta = true;
                    let event = ResponseEvent::ReasoningContentDelta(delta);
                     if tx_event.send(Ok(event)).await.is_err() {
                         return;
@@ -487,10 +528,64 @@ async fn process_sse<S>(
             }
             // Final response completed includes array of output items & id
             "response.completed" => {
-                if let Some(resp_val) = event.response {
-                    match serde_json::from_value::<ResponseCompleted>(resp_val) {
+                if let Some(resp_val) = event.response.clone() {
+                    match serde_json::from_value::<ResponseCompleted>(resp_val.clone()) {
                         Ok(r) => {
                             response_completed = Some(r);
+                            deferred_completed_response = Some(resp_val);
+                            // If we haven't streamed anything, emit output immediately and then Completed
+                            // so UIs don't wait for the connection to fully close.
+                            if !saw_output_item_done && !saw_any_streaming_delta {
+                                if let Some(resp) = deferred_completed_response.clone() {
+                                    if let Some(output) =
+                                        resp.get("output").and_then(|v| v.as_array())
+                                    {
+                                        for v in output {
+                                            if let Ok(item) =
+                                                serde_json::from_value::<ResponseItem>(v.clone())
+                                            {
+                                                let _ = tx_event
+                                                    .send(Ok(ResponseEvent::OutputItemDone(item)))
+                                                    .await;
+                                            }
+                                        }
+                                    }
+                                }
+                                if let Some(ResponseCompleted { id, usage }) =
+                                    response_completed.clone()
+                                {
+                                    let _ = tx_event
+                                        .send(Ok(ResponseEvent::Completed {
+                                            response_id: id,
+                                            token_usage: usage.map(Into::into),
+                                        }))
+                                        .await;
+                                    completed_emitted = true;
+                                }
+                            } else if !(saw_reasoning_delta || saw_reasoning_item_done) {
+                                // We streamed other content (e.g. assistant deltas), but never received
+                                // reasoning deltas or reasoning items. Emit only the reasoning items from
+                                // the final output so the UI can show the "thinking" block.
+                                if let Some(resp) = deferred_completed_response.clone() {
+                                    if let Some(output) =
+                                        resp.get("output").and_then(|v| v.as_array())
+                                    {
+                                        for v in output {
+                                            if let Ok(item) =
+                                                serde_json::from_value::<ResponseItem>(v.clone())
+                                            {
+                                                if matches!(item, ResponseItem::Reasoning { .. }) {
+                                                    let _ = tx_event
+                                                        .send(Ok(ResponseEvent::OutputItemDone(
+                                                            item,
+                                                        )))
+                                                        .await;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
                         }
                         Err(e) => {
                             debug!("failed to parse ResponseCompleted: {e}");
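
Distilled, the fallback above hinges on three flags recorded during streaming: whether any output items were forwarded, whether any deltas arrived, and whether reasoning was seen. A minimal Python sketch of that decision table, assuming the final `response.completed` payload is already parsed into a dict (function and event names here are illustrative, not part of this commit):

# Illustrative sketch only: mirrors the no-delta fallback in the Rust hunk
# above. `completed` is the parsed `response.completed` payload.
def synthesize_events(completed: dict, saw_item_done: bool,
                      saw_delta: bool, saw_reasoning: bool):
    """Return (events_to_emit_now, completed_emitted)."""
    events = []
    output = completed.get("output") or []
    if not saw_item_done and not saw_delta:
        # Nothing was streamed: replay every final output item, then Completed.
        events += [("output_item.done", item) for item in output]
        events.append(("completed", {"id": completed.get("id")}))
        return events, True
    if not saw_reasoning:
        # Content streamed, but no reasoning: surface only the reasoning items;
        # Completed stays deferred until the connection closes.
        events += [("output_item.done", item)
                   for item in output if item.get("type") == "reasoning"]
    return events, False

if __name__ == "__main__":
    final = {"id": "resp_123", "output": [
        {"type": "reasoning", "summary": []},
        {"type": "message", "role": "assistant",
         "content": [{"type": "output_text", "text": "hi"}]},
    ]}
    # A final-only provider: nothing was streamed before `response.completed`.
    events, done = synthesize_events(final, saw_item_done=False,
                                     saw_delta=False, saw_reasoning=False)
    for e in events:
        print(e)  # both output items, then completed

In the first branch the Completed event is emitted eagerly and flagged, so the stream-close handler above does not emit it a second time.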

codex-rs/test-server.py (new file, 288 lines added)

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""
Minimal SSE proxy for the OpenAI Responses API at /v1/responses.

Two modes:
  1) Default passthrough: forwards upstream SSE events as-is.
  2) --final-only: suppresses streaming deltas and emits only one final
     `response.output_item.done` (aggregated assistant text) followed by
     `response.completed`. This is useful to reproduce the Codex TUI issue
     where no messages render when only a final item is received.

Additionally logs the reconstructed assistant output on the server for
visibility.

Point Codex to this server by defining a provider with base_url pointing to
http://127.0.0.1:PORT/v1 and wire_api = "responses".

Example ~/.codex/config.toml snippet:

    model = "o4-mini"
    model_provider = "local-proxy"

    [model_providers.local-proxy]
    name = "Local Responses Proxy"
    base_url = "http://127.0.0.1:18080/v1"
    env_key = "OPENAI_API_KEY"  # required by upstream; read by this server
    wire_api = "responses"

Run:

    pip install requests
    python3 test-server.py --port 18080 [--final-only]

Server logs:
  - Each SSE event type as it arrives
  - Aggregated assistant text on completion
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import List, Optional, Tuple

try:
    import requests  # type: ignore
except Exception:
    requests = None  # resolved at runtime; we print a helpful error
class ResponsesHandler(BaseHTTPRequestHandler):
    server_version = "ResponsesProxy/0.2"
    final_only: bool = False
    upstream_base_url: str = "https://api.openai.com/v1"
    bearer_env: str = "OPENAI_API_KEY"

    def log_message(self, fmt: str, *args) -> None:  # quieter logs
        sys.stderr.write("%s - - [%s] " % (self.address_string(), self.log_date_time_string()))
        sys.stderr.write((fmt % args) + "\n")

    def _read_json(self) -> dict:
        length = int(self.headers.get("Content-Length", "0"))
        body = self.rfile.read(length) if length > 0 else b"{}"
        try:
            return json.loads(body.decode("utf-8"))
        except Exception:
            return {}

    def do_POST(self) -> None:  # noqa: N802 required by BaseHTTPRequestHandler
        # Accept both /responses and /v1/responses for convenience
        if self.path not in ("/responses", "/v1/responses"):
            self.send_error(404, "Not Found")
            return
        if requests is None:
            self.send_error(500, "requests library is required. Run: pip install requests")
            return

        payload = self._read_json()
        # Ensure streaming is enabled upstream
        payload["stream"] = True
        self.log_message("Received POST %s; final-only=%s", self.path, ResponsesHandler.final_only)

        api_key = os.environ.get(self.bearer_env, "").strip()
        if not api_key:
            self.send_error(401, f"Missing {self.bearer_env} in environment for upstream auth")
            return

        headers = {
            "Authorization": f"Bearer {api_key}",
            "OpenAI-Beta": "responses=experimental",
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        }
        upstream_url = f"{self.upstream_base_url}/responses"
        try:
            upstream = requests.post(
                upstream_url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=(10, None),  # connect timeout; stream without read timeout
            )
        except Exception as e:
            self.send_error(502, f"Upstream error: {e}")
            return

        if upstream.status_code < 200 or upstream.status_code >= 300:
            # Try to surface the upstream error body
            try:
                body = upstream.text
            except Exception:
                body = ""
            self.send_response(upstream.status_code)
            self.send_header("Content-Type", upstream.headers.get("Content-Type", "text/plain"))
            self.end_headers()
            self.wfile.write(body.encode("utf-8", errors="ignore"))
            return

        # Prepare SSE response headers to our client
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "close")
        self.end_headers()

        # Simple SSE framing: accumulate lines until a blank line terminates an event
        buf_lines: List[str] = []
        aggregated_text: List[str] = []
        aggregated_reasoning_summary: List[str] = []
        saw_reasoning_summary_done: bool = False
        last_completed: Optional[dict] = None

        def flush_downstream(event_type: str, data: Optional[dict]) -> None:
            self.wfile.write(f"event: {event_type}\n".encode("utf-8"))
            if data is not None:
                body = json.dumps(data, separators=(",", ":"))
                self.wfile.write(f"data: {body}\n\n".encode("utf-8"))
            else:
                self.wfile.write(b"\n")
            self.wfile.flush()
            self.log_message("SSE -> %s", event_type)
        def handle_event(block: str) -> None:
            # `saw_reasoning_summary_done` must be declared nonlocal too;
            # otherwise the assignment below creates a shadowing local and the
            # outer flag is never set.
            nonlocal last_completed, saw_reasoning_summary_done
            # Parse a single SSE block (possibly multiple data: lines)
            etype: Optional[str] = None
            data_lines: List[str] = []
            for line in block.splitlines():
                if line.startswith("event:"):
                    etype = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    data_lines.append(line[len("data:"):].lstrip())
            data_obj: Optional[dict] = None
            if data_lines:
                try:
                    data_obj = json.loads("\n".join(data_lines))
                except Exception:
                    data_obj = None
            if not etype:
                return

            # Logging and aggregation
            self.log_message("SSE <- %s", etype)
            if etype == "response.output_text.delta" and data_obj and "delta" in data_obj:
                delta = data_obj.get("delta", "")
                aggregated_text.append(delta)
            elif etype == "response.output_item.done" and data_obj:
                item = data_obj.get("item", {})
                if item.get("type") == "message" and item.get("role") == "assistant":
                    for c in item.get("content", []) or []:
                        if c.get("type") == "output_text":
                            aggregated_text.append(c.get("text", ""))
            elif etype == "response.reasoning_summary_text.delta" and data_obj and "delta" in data_obj:
                aggregated_reasoning_summary.append(data_obj.get("delta", ""))
            elif etype == "response.reasoning_summary_text.done":
                if aggregated_reasoning_summary:
                    self.log_message(
                        "Reasoning summary finalized: %s",
                        "".join(aggregated_reasoning_summary),
                    )
                else:
                    self.log_message("Reasoning summary finalized (no deltas captured)")
                saw_reasoning_summary_done = True
            elif etype == "response.completed" and data_obj:
                last_completed = data_obj  # capture id/usage

            # Forwarding
            if not ResponsesHandler.final_only:
                # passthrough mode: forward all events
                flush_downstream(etype, data_obj)
            else:
                # final-only mode: forward only `response.created`; suppress
                # deltas and items until completed
                if etype == "response.created":
                    flush_downstream(etype, data_obj)
                elif etype == "response.completed":
                    # Emit one synthesized final message (if any), then completed
                    full_text = "".join(aggregated_text)
                    if full_text:
                        synthetic_item = {
                            "type": "response.output_item.done",
                            "item": {
                                "type": "message",
                                "role": "assistant",
                                "content": [{"type": "output_text", "text": full_text}],
                            },
                        }
                        flush_downstream("response.output_item.done", synthetic_item)
                    flush_downstream("response.completed", data_obj)
        try:
            for raw in upstream.iter_lines(decode_unicode=True):
                # requests splits on \n; blank lines act as event-block terminators
                line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="ignore")
                if line == "":
                    if buf_lines:
                        handle_event("\n".join(buf_lines))
                        buf_lines.clear()
                    # else: spurious blank
                else:
                    buf_lines.append(line)
            # Flush any remaining buffered block
            if buf_lines:
                handle_event("\n".join(buf_lines))
                buf_lines.clear()
        finally:
            # Summarize on server logs
            final_text = "".join(aggregated_text)
            self.log_message("Aggregated assistant output: %s", final_text)
            final_reasoning = "".join(aggregated_reasoning_summary)
            if saw_reasoning_summary_done or final_reasoning:
                self.log_message("Aggregated reasoning summary: %s", final_reasoning)
            # Ensure the client connection closes cleanly
            try:
                self.wfile.flush()
            except Exception:
                pass
    def do_GET(self) -> None:  # simple health check
        self.log_message("Received GET request with path: %s", self.path)
        if self.path in ("/health", "/", "/v1/health"):
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(b'{"ok":true}')
            self.log_message("Health check successful.")
        else:
            self.send_error(404, "Not Found")
            self.log_message("Health check failed: Not Found")
def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser(description="Minimal Responses SSE proxy for testing")
    ap.add_argument("--host", default="127.0.0.1", help="bind host (default: 127.0.0.1)")
    ap.add_argument("--port", type=int, default=18080, help="bind port (default: 18080)")
    ap.add_argument("--upstream", default="https://api.openai.com/v1", help="upstream base URL (default: https://api.openai.com/v1)")
    ap.add_argument("--bearer-env", default="OPENAI_API_KEY", help="env var for upstream API key (default: OPENAI_API_KEY)")
    ap.add_argument("--final-only", action="store_true", help="suppress deltas and emit only a final message + completed")
    args = ap.parse_args(argv)

    # Configure class-level switches for the handler
    ResponsesHandler.final_only = bool(args.final_only)
    ResponsesHandler.upstream_base_url = str(args.upstream).rstrip("/")
    ResponsesHandler.bearer_env = str(args.bearer_env)

    httpd = HTTPServer((args.host, args.port), ResponsesHandler)
    print(f"Test Responses server listening on http://{args.host}:{args.port}")
    mode = "final-only" if args.final_only else "passthrough"
    print(f"Mode: {mode}; Upstream: {ResponsesHandler.upstream_base_url}/responses; Auth env: {ResponsesHandler.bearer_env}")
    print("Endpoints: POST /v1/responses (SSE), GET /health")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Server interrupted by user.")
    finally:
        httpd.server_close()
        print("Server closed.")
    return 0

if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
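
For a quick end-to-end check without wiring up Codex, the proxy can be driven directly. A minimal sketch using requests, assuming the server is running locally on port 18080 and OPENAI_API_KEY is exported in the server's environment (the proxy injects upstream auth itself; the payload fields follow the Responses API shape but are illustrative):

#!/usr/bin/env python3
"""Smoke-test client for test-server.py (illustrative; not part of the commit)."""
import requests

payload = {
    "model": "o4-mini",
    "input": "Say hello in one short sentence.",
    "stream": True,  # the proxy forces this on anyway
}
with requests.post(
    "http://127.0.0.1:18080/v1/responses",
    json=payload,
    stream=True,
    timeout=(10, None),
) as resp:
    resp.raise_for_status()
    # Print each raw SSE line; blank lines terminate events.
    for line in resp.iter_lines(decode_unicode=True):
        print(line)

Running this once in passthrough mode and once with --final-only makes the difference visible: the latter should emit no *.delta events, only response.created, a single synthesized response.output_item.done, and response.completed.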


@@ -79,6 +79,8 @@ pub(crate) struct ChatWidget<'a> {
     current_stream: Option<StreamKind>,
     stream_header_emitted: bool,
     live_max_rows: u16,
+    // Avoid duplicate final-message rendering within a turn
+    printed_final_answer: bool,
 }

 struct UserMessage {
@@ -224,6 +226,7 @@ impl ChatWidget<'_> {
             current_stream: None,
             stream_header_emitted: false,
             live_max_rows: 3,
+            printed_final_answer: false,
         }
     }
@@ -311,11 +314,36 @@ impl ChatWidget<'_> {
                 self.request_redraw();
             }
-            EventMsg::AgentMessage(AgentMessageEvent { message: _ }) => {
-                // Final assistant answer: commit all remaining rows and close with
-                // a blank line. Use the final text if provided, otherwise rely on
-                // streamed deltas already in the builder.
-                self.finalize_stream(StreamKind::Answer);
+            EventMsg::AgentMessage(AgentMessageEvent { message }) => {
+                // Final assistant answer received. If we streamed deltas for this
+                // answer, finalize and flush any remaining rows. If we did NOT get
+                // any deltas, surface the final text directly so the UI shows the
+                // reply even for providers that only emit a terminal message.
+                if !self.printed_final_answer
+                    && self.answer_buffer.is_empty()
+                    && self.current_stream != Some(StreamKind::Answer)
+                {
+                    use ratatui::text::Line as RLine;
+                    let mut lines: Vec<RLine<'static>> = Vec::new();
+                    // Emit header once for the final message block.
+                    lines.push(RLine::from("codex".magenta().bold()));
+                    for l in message.lines() {
+                        lines.push(RLine::from(l.to_string()));
+                    }
+                    // Close the block with a blank line for readability.
+                    lines.push(RLine::from(""));
+                    self.app_event_tx.send(AppEvent::InsertHistory(lines));
+                    // Clear any residual live overlay/state to be safe.
+                    self.bottom_pane.clear_live_ring();
+                    self.live_builder = RowBuilder::new(self.live_builder.width());
+                    self.current_stream = None;
+                    self.stream_header_emitted = false;
+                    self.printed_final_answer = true;
+                } else {
+                    // We had streaming deltas; just finalize the stream.
+                    self.finalize_stream(StreamKind::Answer);
+                    self.printed_final_answer = true;
+                }
                 self.request_redraw();
             }
             EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
@@ -332,9 +360,32 @@ impl ChatWidget<'_> {
                 self.stream_push_and_maybe_commit(&delta);
                 self.request_redraw();
             }
-            EventMsg::AgentReasoning(AgentReasoningEvent { text: _ }) => {
-                // Final reasoning: commit remaining rows and close with a blank.
-                self.finalize_stream(StreamKind::Reasoning);
+            EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
+                // Final reasoning received. If no deltas were streamed for this
+                // reasoning block, surface the final text directly; otherwise
+                // just finalize the stream. Multiple final reasoning events can
+                // appear in a single turn; render each as its own block.
+                if self.reasoning_buffer.is_empty()
+                    && self.current_stream != Some(StreamKind::Reasoning)
+                {
+                    use ratatui::text::Line as RLine;
+                    let mut lines: Vec<RLine<'static>> = Vec::new();
+                    lines.push(RLine::from("thinking".magenta().italic()));
+                    // For summarized reasoning, the final text arrives via this event; we render
+                    // a single block with a trailing blank line for readability.
+                    for l in text.lines() {
+                        lines.push(RLine::from(l.to_string()));
+                    }
+                    lines.push(RLine::from(""));
+                    self.app_event_tx.send(AppEvent::InsertHistory(lines));
+                    self.bottom_pane.clear_live_ring();
+                    self.live_builder = RowBuilder::new(self.live_builder.width());
+                    self.current_stream = None;
+                    self.stream_header_emitted = false;
+                } else {
+                    // We had streaming deltas; just finalize the reasoning stream.
+                    self.finalize_stream(StreamKind::Reasoning);
+                }
                 self.request_redraw();
             }
             EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent {
@@ -357,6 +408,15 @@ impl ChatWidget<'_> {
                 // Replace composer with single-line spinner while waiting.
                 self.bottom_pane
                     .update_status_text("waiting for model".to_string());
+                // Reset duplicate-guard at the start of a turn
+                self.printed_final_answer = false;
+                // Also clear any stale live buffers from a previous turn.
+                self.answer_buffer.clear();
+                self.reasoning_buffer.clear();
+                self.content_buffer.clear();
+                self.live_builder = RowBuilder::new(self.live_builder.width());
+                self.current_stream = None;
+                self.stream_header_emitted = false;
                 self.request_redraw();
             }
             EventMsg::TaskComplete(TaskCompleteEvent {
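
Both TUI arms above reduce to one test: did anything stream for this block before the final event arrived? A compact restatement of that guard, as an illustrative Python sketch rather than the widget's actual API:

# Illustrative restatement of the ChatWidget guard above. `state` stands in
# for the widget fields; `kind` is "answer" or "reasoning".
def on_final(kind: str, text: str, state: dict) -> tuple[str, str]:
    fresh = not state["buffer"][kind] and state["current_stream"] != kind
    if kind == "answer":
        # Duplicate guard: only the first final answer of a turn renders fresh.
        fresh = fresh and not state["printed_final_answer"]
        state["printed_final_answer"] = True  # set on either path
    if fresh:
        return ("insert_history", text)  # no deltas arrived; show final text
    return ("finalize_stream", kind)     # flush rows already streamed

if __name__ == "__main__":
    state = {"buffer": {"answer": "", "reasoning": ""},
             "current_stream": None, "printed_final_answer": False}
    print(on_final("answer", "hello", state))  # ('insert_history', 'hello')
    print(on_final("answer", "hello", state))  # ('finalize_stream', 'answer')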