Compare commits

...

1 Commits

Author SHA1 Message Date
easong-openai
f985670ee9 initial stab 2025-09-09 17:13:14 -07:00
4 changed files with 269 additions and 23 deletions

View File

@@ -441,6 +441,179 @@ fn open_fixture(name: &str) -> File {
File::open(name).expect("open fixture file")
}
#[test]
fn streaming_then_final_message_does_not_duplicate_bullet_line() {
    // Regression test for a user report in which one bullet line was rendered twice.
    // A markdown list is streamed as deltas, then the final assistant message is
    // delivered; every logical line is expected to show up exactly once.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

    // Complete assistant message, delivered at the end as the single final message.
    let full = concat!(
        "1. **Summary**\n",
        " - Updated the chat composer footer to highlight the remaining context percentage in yellow whenever the available context drops below 20%.\n",
        " - Left the indicator dimmed when the remaining context is at normal levels to maintain the existing appearance in non-critical situations.\n\n",
        "2. **Testing**\n",
        " - `cargo test -p codex-tui`\n\n",
        "3. **Next steps**\n",
        " - Let me know if you would like me to run `just fix -p codex-tui` to apply the project-specific lint fixes.\n\n",
        "**Quick recap:** The context-left indicator now stands out in yellow when less than 20% of the context window remains, helping users notice the low-context condition.\n",
    );

    // The same content cut into streaming chunks. The second bullet is split
    // mid-line so that it straddles two deltas.
    let deltas = [
        "1. **Summary**\n",
        " - Updated the chat composer footer to highlight the remaining context percentage in yellow whenever the available context drops below 20%.\n",
        " - Left the indicator ",
        "dimmed when the remaining context is at normal levels to maintain the existing appearance in non-critical situations.\n\n",
        "2. **Testing**\n",
        " - `cargo test -p codex-tui`\n\n",
        "3. **Next steps**\n",
        " - Let me know if you would like me to run `just fix -p codex-tui` to apply the project-specific lint fixes.\n\n",
        "**Quick recap:** The context-left indicator now stands out in yellow when less than 20% of the context window remains, helping users notice the low-context condition.\n",
    ];

    // Feed each chunk, run one commit tick, and append whatever reached history.
    let mut transcript = String::new();
    for chunk in deltas {
        let delta_event = Event {
            id: "s".into(),
            msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
                delta: chunk.into(),
            }),
        };
        chat.handle_codex_event(delta_event);
        chat.on_commit_tick();
        transcript.extend(
            drain_insert_history(&mut rx)
                .into_iter()
                .map(|cell| lines_to_single_string(&cell)),
        );
    }

    // Deliver the final message; any tail that was still pending should be
    // flushed without repeating lines that were already emitted.
    let final_event = Event {
        id: "s".into(),
        msg: EventMsg::AgentMessage(AgentMessageEvent {
            message: full.to_string(),
        }),
    };
    chat.handle_codex_event(final_event);
    transcript.extend(
        drain_insert_history(&mut rx)
            .into_iter()
            .map(|cell| lines_to_single_string(&cell)),
    );

    // The bullet that was split across deltas must be present exactly once in
    // everything emitted (streamed output plus final flush).
    let count = transcript.matches("Left the indicator dimmed").count();
    assert_eq!(
        count, 1,
        "expected bullet line to appear once (got {count}) in:\n{transcript}"
    );
}
#[test]
fn streaming_preserves_legit_duplicate_lines() {
    // Two identical bullet lines are legitimate content and must both appear;
    // seam de-duplication must not collapse them into one.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

    // The transcript covers the whole stream: lines emitted on commit ticks
    // AND lines flushed by the final message. Discarding the per-tick output
    // would make the count below blind to anything committed while streaming.
    let mut transcript = String::new();

    let deltas = ["- duplicate\n", "- duplicate\n"];
    for d in deltas {
        chat.handle_codex_event(Event {
            id: "dup".into(),
            msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: d.into() }),
        });
        chat.on_commit_tick();
        // Keep whatever was inserted into history during this tick.
        for lines in drain_insert_history(&mut rx) {
            transcript.push_str(&lines_to_single_string(&lines));
        }
    }

    // Finalize with an empty final message (no new content) to close the stream.
    chat.handle_codex_event(Event {
        id: "dup".into(),
        msg: EventMsg::AgentMessage(AgentMessageEvent {
            message: "".to_string(),
        }),
    });
    for lines in drain_insert_history(&mut rx) {
        transcript.push_str(&lines_to_single_string(&lines));
    }

    // Both copies must be present across streamed output plus final flush.
    let needle = "- duplicate";
    let count = transcript.matches(needle).count();
    assert_eq!(
        count, 2,
        "expected both duplicate lines to be present: {transcript}"
    );
}
#[test]
fn double_line_repro_fixture_renders_line_once() {
    use codex_core::protocol::AgentMessageEvent;
    use codex_core::protocol::Event;
    use codex_core::protocol::EventMsg;
    use pretty_assertions::assert_eq;

    // Chat widget plus its channels, created through the manual test constructor.
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

    // Parse the recorded network log fixture and locate `response.output`.
    let fixture: serde_json::Value =
        serde_json::from_reader(open_fixture("double-line-repro.json")).expect("parse JSON");
    let outputs = fixture
        .get("response")
        .and_then(|r| r.get("output"))
        .and_then(|o| o.as_array())
        .expect("response.output array present");

    // Concatenate, in order, the `text` of every `output_text` content part
    // belonging to a `message` item; anything else in the log is skipped.
    let assistant_text: String = outputs
        .iter()
        .filter(|item| item.get("type").and_then(|t| t.as_str()) == Some("message"))
        .filter_map(|item| item.get("content").and_then(|c| c.as_array()))
        .flatten()
        .filter(|part| part.get("type").and_then(|t| t.as_str()) == Some("output_text"))
        .filter_map(|part| part.get("text").and_then(|t| t.as_str()))
        .collect();
    assert!(
        !assistant_text.is_empty(),
        "expected assistant text extracted from fixture"
    );

    // Deliver the text as one final assistant message, with no streaming deltas.
    chat.handle_codex_event(Event {
        id: "sub-repro".into(),
        msg: EventMsg::AgentMessage(AgentMessageEvent {
            message: assistant_text,
        }),
    });

    // Flatten every history insertion into one plain-text blob for searching.
    let blob: String = drain_insert_history(&mut rx)
        .into_iter()
        .map(|cell| lines_to_single_string(&cell))
        .collect();

    // The line reported as duplicated should occur exactly once.
    let count = blob.matches("Left the indicator dimmed").count();
    assert_eq!(
        count, 1,
        "expected line to appear once, found {count} in output:\n{blob}"
    );
}
#[test]
fn empty_enter_during_task_does_not_queue() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();

View File

@@ -65,21 +65,6 @@ impl MarkdownStreamCollector {
{
complete_line_count -= 1;
}
// Heuristic: if the buffer ends with a double newline and the last non-blank
// rendered line looks like a list bullet with inline content (e.g., "- item"),
// defer committing that line. Subsequent context (e.g., another list item)
// can cause the renderer to split the bullet marker and text into separate
// logical lines ("- " then "item"), which would otherwise duplicate content.
if self.buffer.ends_with("\n\n") && complete_line_count > 0 {
let last = &rendered[complete_line_count - 1];
let mut text = String::new();
for s in &last.spans {
text.push_str(&s.content);
}
if text.starts_with("- ") && text.trim() != "-" {
complete_line_count = complete_line_count.saturating_sub(1);
}
}
if !self.buffer.ends_with('\n') {
complete_line_count = complete_line_count.saturating_sub(1);
// If we're inside an unclosed fenced code block, also drop the
@@ -143,6 +128,9 @@ impl MarkdownStreamCollector {
s.push_str(&sp.content);
}
if is_short_plain_word(&s) {
// Advance commit pointer for the safe portion, but withhold the
// single ambiguous short token to avoid duplication on finalize.
self.committed_line_count = complete_line_count.saturating_sub(1);
return Vec::new();
}
}

View File

@@ -115,11 +115,64 @@ impl StreamController {
let mut out_lines: Lines = Vec::new();
{
let state = &mut self.state;
if !remaining.is_empty() {
state.enqueue(remaining);
// Drain queued lines.
let mut step = state.drain_all();
if let Some(first) = step.history.first() {
let mut s = String::new();
for sp in &first.spans {
s.push_str(&sp.content);
}
let key = s.trim_end().to_string();
if state.last_emitted_line_text.as_deref() == Some(&key) {
step.history.remove(0);
}
}
let step = state.drain_all();
out_lines.extend(step.history);
// At the seam between drained and remaining, avoid a single duplicate.
let mut remaining = remaining;
if let (Some(last_out), Some(first_rem)) = (out_lines.last(), remaining.first()) {
let mut a = String::new();
for sp in &last_out.spans {
a.push_str(&sp.content);
}
let mut b = String::new();
for sp in &first_rem.spans {
b.push_str(&sp.content);
}
if a.trim_end() == b.trim_end() {
remaining.remove(0);
}
} else if out_lines.is_empty() {
// No drained lines; compare remaining against last emitted.
if let Some(first_rem) = remaining.first() {
let mut b = String::new();
for sp in &first_rem.spans {
b.push_str(&sp.content);
}
if state
.last_emitted_line_text
.as_deref()
.map(|t| t == b.trim_end())
.unwrap_or(false)
{
remaining.remove(0);
}
}
}
out_lines.extend(remaining);
// Update last emitted tracker for the stream.
if let Some(last) = out_lines.last() {
let mut s = String::new();
for sp in &last.spans {
s.push_str(&sp.content);
}
let key = s.trim_end().to_string();
if !key.is_empty() {
state.last_emitted_line_text = Some(key);
}
}
}
if !out_lines.is_empty() {
// Insert as a HistoryCell so display drops the header while transcript keeps it.
@@ -129,6 +182,10 @@ impl StreamController {
)));
}
// Ensure commit animation (status ticker) is stopped when we flush immediately
// at the end of a stream so the timer does not bleed into the UI.
sink.stop_commit_animation();
// Cleanup
self.state.clear();
// Allow a subsequent block in this turn to emit its header.
@@ -156,12 +213,37 @@ impl StreamController {
if !self.active {
return false;
}
let step = { self.state.step() };
let mut step = { self.state.step() };
if !step.history.is_empty() {
sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new(
step.history,
self.header.maybe_emit_header(),
)));
// Drop a single duplicate at the seam with the previous tick.
if let Some(first) = step.history.first() {
let mut s = String::new();
for sp in &first.spans {
s.push_str(&sp.content);
}
let key = s.trim_end().to_string();
if self.state.last_emitted_line_text.as_deref() == Some(&key) {
step.history.remove(0);
}
}
// Set to last non-empty line's text if present.
for l in step.history.iter().rev() {
let mut s = String::new();
for sp in &l.spans {
s.push_str(&sp.content);
}
let key = s.trim_end().to_string();
if !key.is_empty() {
self.state.last_emitted_line_text = Some(key);
break;
}
}
if !step.history.is_empty() {
sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new(
step.history,
self.header.maybe_emit_header(),
)));
}
}
let is_idle = self.state.is_idle();

View File

@@ -6,6 +6,7 @@ pub(crate) struct StreamState {
pub(crate) collector: MarkdownStreamCollector,
pub(crate) streamer: AnimatedLineStreamer,
pub(crate) has_seen_delta: bool,
pub(crate) last_emitted_line_text: Option<String>,
}
impl StreamState {
@@ -14,12 +15,14 @@ impl StreamState {
collector: MarkdownStreamCollector::new(),
streamer: AnimatedLineStreamer::new(),
has_seen_delta: false,
last_emitted_line_text: None,
}
}
pub(crate) fn clear(&mut self) {
self.collector.clear();
self.streamer.clear();
self.has_seen_delta = false;
self.last_emitted_line_text = None;
}
pub(crate) fn step(&mut self) -> crate::markdown_stream::StepResult {
self.streamer.step()