Compare commits

...

2 Commits

Author SHA1 Message Date
easong-openai
1f3c7a5588 parseable event 2025-08-19 20:08:38 -07:00
easong-openai
19bd3513fe show background stream error 2025-08-19 01:13:58 -07:00
8 changed files with 22528 additions and 18 deletions

View File

@@ -94,6 +94,7 @@ use crate::protocol::PatchApplyEndEvent;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamRetryEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
@@ -1501,16 +1502,17 @@ async fn run_turn(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
// Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
// Surface retry information to any UI/frontend in a structured way.
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::StreamRetry(StreamRetryEvent {
attempt: retries as u32,
max_attempts: max_retries as u32,
delay,
cause: e.to_string(),
}),
};
let _ = sess.tx_event.send(event).await;
tokio::time::sleep(delay).await;
} else {
@@ -1739,13 +1741,16 @@ async fn run_compact_task(
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::StreamRetry(StreamRetryEvent {
attempt: retries as u32,
max_attempts: max_retries as u32,
delay,
cause: e.to_string(),
}),
};
sess.send_event(event).await;
tokio::time::sleep(delay).await;
continue;
} else {

View File

@@ -20,6 +20,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::StreamRetryEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
@@ -174,6 +175,21 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::StreamRetry(StreamRetryEvent {
attempt,
max_attempts,
delay,
cause,
}) => {
ts_println!(
self,
"{}",
format!(
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}"
)
.style(self.dimmed)
);
}
EventMsg::TaskStarted => {
// Ignore.
}

View File

@@ -267,6 +267,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::StreamRetry(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)

View File

@@ -439,6 +439,9 @@ pub enum EventMsg {
ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),
/// Notification that a streaming response failed transiently and will be retried.
StreamRetry(StreamRetryEvent),
BackgroundEvent(BackgroundEventEvent),
/// Notification that the agent is about to apply a code patch. Mirrors
@@ -681,6 +684,14 @@ pub struct ApplyPatchApprovalRequestEvent {
pub grant_root: Option<PathBuf>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamRetryEvent {
pub attempt: u32,
pub max_attempts: u32,
pub delay: Duration,
pub cause: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BackgroundEventEvent {
pub message: String,

View File

@@ -11,7 +11,9 @@ use crate::slash_command::SlashCommand;
use crate::tui;
use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use color_eyre::eyre::Result;
use crossterm::SynchronizedUpdate;
@@ -274,6 +276,25 @@ impl App<'_> {
}
AppEvent::KeyEvent(key_event) => {
match key_event {
KeyEvent {
code: KeyCode::Char('e'),
modifiers: crossterm::event::KeyModifiers::CONTROL,
kind: KeyEventKind::Press,
..
} => {
let env = std::env::var("MANUALLY_DEBUG_TUI_BACKGROUND_RETRY")
.unwrap_or_default();
if env == "1" {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "manual".to_string(),
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "stream error: stream disconnected before completion: idle timeout waiting for SSE; retrying 1/5 in 200ms…".to_string(),
}),
}));
} else {
self.dispatch_key_event(key_event);
}
}
KeyEvent {
code: KeyCode::Char('c'),
modifiers: crossterm::event::KeyModifiers::CONTROL,

View File

@@ -22,6 +22,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::StreamRetryEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TokenUsage;
use codex_core::protocol::TurnDiffEvent;
@@ -297,6 +298,7 @@ impl ChatWidget<'_> {
fn on_background_event(&mut self, message: String) {
debug!("BackgroundEvent: {message}");
self.add_to_history(&history_cell::new_background_event(message));
}
/// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output.
@@ -652,6 +654,17 @@ impl ChatWidget<'_> {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
self.on_background_event(message)
}
EventMsg::StreamRetry(StreamRetryEvent {
attempt,
max_attempts,
delay,
cause,
}) => {
let text = format!(
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}"
);
self.on_background_event(text)
}
}
// Coalesce redraws: issue at most one after handling the event
if self.needs_redraw {

View File

@@ -181,7 +181,7 @@ fn open_fixture(name: &str) -> std::fs::File {
return f;
}
}
// 2) Fallback to parent (workspace root)
// 2) Fallback to parent (workspace crate root)
{
let mut p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
p.push("..");
@@ -979,3 +979,117 @@ fn deltas_then_same_final_message_are_rendered_snapshot() {
.collect::<String>();
assert_snapshot!(combined);
}
#[tokio::test(flavor = "current_thread")]
async fn timeout_session_transcript_shows_background_errors() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
// Set up a VT100 test terminal to capture ANSI visual output
let width: u16 = 80;
let height: u16 = 2000;
let viewport = ratatui::layout::Rect::new(0, height - 1, width, 1);
let backend = ratatui::backend::TestBackend::new(width, height);
let mut terminal = crate::custom_terminal::Terminal::with_options(backend)
.expect("failed to construct terminal");
terminal.set_viewport_area(viewport);
// Replay the recorded session into the widget and collect transcript
let file = open_fixture("timeout-session-log.jsonl");
let reader = BufReader::new(file);
let mut ansi: Vec<u8> = Vec::new();
for line in reader.lines() {
let line = line.expect("read line");
if line.trim().is_empty() || line.starts_with('#') {
continue;
}
let Ok(v): Result<serde_json::Value, _> = serde_json::from_str(&line) else {
continue;
};
let Some(dir) = v.get("dir").and_then(|d| d.as_str()) else {
continue;
};
if dir != "to_tui" {
continue;
}
let Some(kind) = v.get("kind").and_then(|k| k.as_str()) else {
continue;
};
match kind {
"codex_event" => {
if let Some(payload) = v.get("payload") {
let ev: Event = serde_json::from_value(payload.clone()).expect("parse");
chat.handle_codex_event(ev);
while let Ok(app_ev) = rx.try_recv() {
if let AppEvent::InsertHistory(lines) = app_ev {
crate::insert_history::insert_history_lines_to_writer(
&mut terminal,
&mut ansi,
lines,
);
}
}
}
}
"app_event" => {
if let Some(variant) = v.get("variant").and_then(|s| s.as_str()) {
if variant == "CommitTick" {
chat.on_commit_tick();
while let Ok(app_ev) = rx.try_recv() {
if let AppEvent::InsertHistory(lines) = app_ev {
crate::insert_history::insert_history_lines_to_writer(
&mut terminal,
&mut ansi,
lines,
);
}
}
}
}
}
_ => {}
}
}
// Build the final VT100 visual by parsing the ANSI stream. Trim trailing spaces per line
// and drop trailing empty lines for stable comparisons.
let mut parser = vt100::Parser::new(height, width, 0);
parser.process(&ansi);
let mut lines: Vec<String> = Vec::with_capacity(height as usize);
for row in 0..height {
let mut s = String::with_capacity(width as usize);
for col in 0..width {
if let Some(cell) = parser.screen().cell(row, col) {
if let Some(ch) = cell.contents().chars().next() {
s.push(ch);
} else {
s.push(' ');
}
} else {
s.push(' ');
}
}
lines.push(s.trim_end().to_string());
}
while lines.last().is_some_and(|l| l.is_empty()) {
lines.pop();
}
let visible_after = lines.join("\n");
let visible_flat = visible_after.replace('\n', " ");
// Assertions: ensure background events are visible and contain timeout info.
assert!(
visible_flat.contains("stream error:"),
"missing 'stream error:' in vt100 output:\n{visible_after}"
);
assert!(
visible_flat.contains("idle timeout waiting for SSE"),
"missing timeout detail in vt100 output:\n{visible_after}"
);
assert!(
visible_flat.contains("retrying 1/"),
"missing retry indicator in vt100 output:\n{visible_after}"
);
}

File diff suppressed because one or more lines are too long