mirror of
https://github.com/openai/codex.git
synced 2026-02-02 15:03:38 +00:00
Compare commits
2 Commits
patch-squa
...
handle-tim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f3c7a5588 | ||
|
|
19bd3513fe |
@@ -94,6 +94,7 @@ use crate::protocol::PatchApplyEndEvent;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::protocol::StreamRetryEvent;
|
||||
use crate::protocol::Submission;
|
||||
use crate::protocol::TaskCompleteEvent;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
@@ -1501,16 +1502,17 @@ async fn run_turn(
|
||||
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
|
||||
);
|
||||
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_background_event(
|
||||
&sub_id,
|
||||
format!(
|
||||
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
|
||||
),
|
||||
)
|
||||
.await;
|
||||
// Surface retry information to any UI/front‑end in a structured way.
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::StreamRetry(StreamRetryEvent {
|
||||
attempt: retries as u32,
|
||||
max_attempts: max_retries as u32,
|
||||
delay,
|
||||
cause: e.to_string(),
|
||||
}),
|
||||
};
|
||||
let _ = sess.tx_event.send(event).await;
|
||||
|
||||
tokio::time::sleep(delay).await;
|
||||
} else {
|
||||
@@ -1739,13 +1741,16 @@ async fn run_compact_task(
|
||||
if retries < max_retries {
|
||||
retries += 1;
|
||||
let delay = backoff(retries);
|
||||
sess.notify_background_event(
|
||||
&sub_id,
|
||||
format!(
|
||||
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::StreamRetry(StreamRetryEvent {
|
||||
attempt: retries as u32,
|
||||
max_attempts: max_retries as u32,
|
||||
delay,
|
||||
cause: e.to_string(),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
tokio::time::sleep(delay).await;
|
||||
continue;
|
||||
} else {
|
||||
|
||||
@@ -20,6 +20,7 @@ use codex_core::protocol::McpToolCallEndEvent;
|
||||
use codex_core::protocol::PatchApplyBeginEvent;
|
||||
use codex_core::protocol::PatchApplyEndEvent;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use codex_core::protocol::StreamRetryEvent;
|
||||
use codex_core::protocol::TaskCompleteEvent;
|
||||
use codex_core::protocol::TurnAbortReason;
|
||||
use codex_core::protocol::TurnDiffEvent;
|
||||
@@ -174,6 +175,21 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
|
||||
ts_println!(self, "{}", message.style(self.dimmed));
|
||||
}
|
||||
EventMsg::StreamRetry(StreamRetryEvent {
|
||||
attempt,
|
||||
max_attempts,
|
||||
delay,
|
||||
cause,
|
||||
}) => {
|
||||
ts_println!(
|
||||
self,
|
||||
"{}",
|
||||
format!(
|
||||
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}…"
|
||||
)
|
||||
.style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::TaskStarted => {
|
||||
// Ignore.
|
||||
}
|
||||
|
||||
@@ -267,6 +267,7 @@ async fn run_codex_tool_session_inner(
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::ExecCommandEnd(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::StreamRetry(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
| EventMsg::TurnDiff(_)
|
||||
|
||||
@@ -439,6 +439,9 @@ pub enum EventMsg {
|
||||
|
||||
ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),
|
||||
|
||||
/// Notification that a streaming response failed transiently and will be retried.
|
||||
StreamRetry(StreamRetryEvent),
|
||||
|
||||
BackgroundEvent(BackgroundEventEvent),
|
||||
|
||||
/// Notification that the agent is about to apply a code patch. Mirrors
|
||||
@@ -681,6 +684,14 @@ pub struct ApplyPatchApprovalRequestEvent {
|
||||
pub grant_root: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct StreamRetryEvent {
|
||||
pub attempt: u32,
|
||||
pub max_attempts: u32,
|
||||
pub delay: Duration,
|
||||
pub cause: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct BackgroundEventEvent {
|
||||
pub message: String,
|
||||
|
||||
@@ -11,7 +11,9 @@ use crate::slash_command::SlashCommand;
|
||||
use crate::tui;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::BackgroundEventEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use color_eyre::eyre::Result;
|
||||
use crossterm::SynchronizedUpdate;
|
||||
@@ -274,6 +276,25 @@ impl App<'_> {
|
||||
}
|
||||
AppEvent::KeyEvent(key_event) => {
|
||||
match key_event {
|
||||
KeyEvent {
|
||||
code: KeyCode::Char('e'),
|
||||
modifiers: crossterm::event::KeyModifiers::CONTROL,
|
||||
kind: KeyEventKind::Press,
|
||||
..
|
||||
} => {
|
||||
let env = std::env::var("MANUALLY_DEBUG_TUI_BACKGROUND_RETRY")
|
||||
.unwrap_or_default();
|
||||
if env == "1" {
|
||||
self.app_event_tx.send(AppEvent::CodexEvent(Event {
|
||||
id: "manual".to_string(),
|
||||
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
|
||||
message: "stream error: stream disconnected before completion: idle timeout waiting for SSE; retrying 1/5 in 200ms…".to_string(),
|
||||
}),
|
||||
}));
|
||||
} else {
|
||||
self.dispatch_key_event(key_event);
|
||||
}
|
||||
}
|
||||
KeyEvent {
|
||||
code: KeyCode::Char('c'),
|
||||
modifiers: crossterm::event::KeyModifiers::CONTROL,
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
|
||||
use codex_core::protocol::McpToolCallEndEvent;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::PatchApplyBeginEvent;
|
||||
use codex_core::protocol::StreamRetryEvent;
|
||||
use codex_core::protocol::TaskCompleteEvent;
|
||||
use codex_core::protocol::TokenUsage;
|
||||
use codex_core::protocol::TurnDiffEvent;
|
||||
@@ -297,6 +298,7 @@ impl ChatWidget<'_> {
|
||||
|
||||
fn on_background_event(&mut self, message: String) {
|
||||
debug!("BackgroundEvent: {message}");
|
||||
self.add_to_history(&history_cell::new_background_event(message));
|
||||
}
|
||||
/// Periodic tick to commit at most one queued line to history with a small delay,
|
||||
/// animating the output.
|
||||
@@ -652,6 +654,17 @@ impl ChatWidget<'_> {
|
||||
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
|
||||
self.on_background_event(message)
|
||||
}
|
||||
EventMsg::StreamRetry(StreamRetryEvent {
|
||||
attempt,
|
||||
max_attempts,
|
||||
delay,
|
||||
cause,
|
||||
}) => {
|
||||
let text = format!(
|
||||
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}…"
|
||||
);
|
||||
self.on_background_event(text)
|
||||
}
|
||||
}
|
||||
// Coalesce redraws: issue at most one after handling the event
|
||||
if self.needs_redraw {
|
||||
|
||||
@@ -181,7 +181,7 @@ fn open_fixture(name: &str) -> std::fs::File {
|
||||
return f;
|
||||
}
|
||||
}
|
||||
// 2) Fallback to parent (workspace root)
|
||||
// 2) Fallback to parent (workspace crate root)
|
||||
{
|
||||
let mut p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
p.push("..");
|
||||
@@ -979,3 +979,117 @@ fn deltas_then_same_final_message_are_rendered_snapshot() {
|
||||
.collect::<String>();
|
||||
assert_snapshot!(combined);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn timeout_session_transcript_shows_background_errors() {
|
||||
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
// Set up a VT100 test terminal to capture ANSI visual output
|
||||
let width: u16 = 80;
|
||||
let height: u16 = 2000;
|
||||
let viewport = ratatui::layout::Rect::new(0, height - 1, width, 1);
|
||||
let backend = ratatui::backend::TestBackend::new(width, height);
|
||||
let mut terminal = crate::custom_terminal::Terminal::with_options(backend)
|
||||
.expect("failed to construct terminal");
|
||||
terminal.set_viewport_area(viewport);
|
||||
|
||||
// Replay the recorded session into the widget and collect transcript
|
||||
let file = open_fixture("timeout-session-log.jsonl");
|
||||
let reader = BufReader::new(file);
|
||||
let mut ansi: Vec<u8> = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line.expect("read line");
|
||||
if line.trim().is_empty() || line.starts_with('#') {
|
||||
continue;
|
||||
}
|
||||
let Ok(v): Result<serde_json::Value, _> = serde_json::from_str(&line) else {
|
||||
continue;
|
||||
};
|
||||
let Some(dir) = v.get("dir").and_then(|d| d.as_str()) else {
|
||||
continue;
|
||||
};
|
||||
if dir != "to_tui" {
|
||||
continue;
|
||||
}
|
||||
let Some(kind) = v.get("kind").and_then(|k| k.as_str()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match kind {
|
||||
"codex_event" => {
|
||||
if let Some(payload) = v.get("payload") {
|
||||
let ev: Event = serde_json::from_value(payload.clone()).expect("parse");
|
||||
chat.handle_codex_event(ev);
|
||||
while let Ok(app_ev) = rx.try_recv() {
|
||||
if let AppEvent::InsertHistory(lines) = app_ev {
|
||||
crate::insert_history::insert_history_lines_to_writer(
|
||||
&mut terminal,
|
||||
&mut ansi,
|
||||
lines,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"app_event" => {
|
||||
if let Some(variant) = v.get("variant").and_then(|s| s.as_str()) {
|
||||
if variant == "CommitTick" {
|
||||
chat.on_commit_tick();
|
||||
while let Ok(app_ev) = rx.try_recv() {
|
||||
if let AppEvent::InsertHistory(lines) = app_ev {
|
||||
crate::insert_history::insert_history_lines_to_writer(
|
||||
&mut terminal,
|
||||
&mut ansi,
|
||||
lines,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Build the final VT100 visual by parsing the ANSI stream. Trim trailing spaces per line
|
||||
// and drop trailing empty lines for stable comparisons.
|
||||
let mut parser = vt100::Parser::new(height, width, 0);
|
||||
parser.process(&ansi);
|
||||
let mut lines: Vec<String> = Vec::with_capacity(height as usize);
|
||||
for row in 0..height {
|
||||
let mut s = String::with_capacity(width as usize);
|
||||
for col in 0..width {
|
||||
if let Some(cell) = parser.screen().cell(row, col) {
|
||||
if let Some(ch) = cell.contents().chars().next() {
|
||||
s.push(ch);
|
||||
} else {
|
||||
s.push(' ');
|
||||
}
|
||||
} else {
|
||||
s.push(' ');
|
||||
}
|
||||
}
|
||||
lines.push(s.trim_end().to_string());
|
||||
}
|
||||
while lines.last().is_some_and(|l| l.is_empty()) {
|
||||
lines.pop();
|
||||
}
|
||||
|
||||
let visible_after = lines.join("\n");
|
||||
let visible_flat = visible_after.replace('\n', " ");
|
||||
|
||||
// Assertions: ensure background events are visible and contain timeout info.
|
||||
assert!(
|
||||
visible_flat.contains("stream error:"),
|
||||
"missing 'stream error:' in vt100 output:\n{visible_after}"
|
||||
);
|
||||
assert!(
|
||||
visible_flat.contains("idle timeout waiting for SSE"),
|
||||
"missing timeout detail in vt100 output:\n{visible_after}"
|
||||
);
|
||||
assert!(
|
||||
visible_flat.contains("retrying 1/"),
|
||||
"missing retry indicator in vt100 output:\n{visible_after}"
|
||||
);
|
||||
}
|
||||
|
||||
22329
codex-rs/tui/tests/fixtures/timeout-session-log.jsonl
vendored
Normal file
22329
codex-rs/tui/tests/fixtures/timeout-session-log.jsonl
vendored
Normal file
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user