mirror of
https://github.com/openai/codex.git
synced 2026-05-05 03:47:01 +00:00
Compare commits
6 Commits
codex-wind
...
etraut/tui
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c903ca29c | ||
|
|
dfb4e002eb | ||
|
|
533dbd5b69 | ||
|
|
02d2c8231c | ||
|
|
1c141e5c62 | ||
|
|
03a23b57b9 |
@@ -7374,6 +7374,9 @@ async fn try_run_sampling_request(
|
||||
let mut should_emit_turn_diff = false;
|
||||
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
|
||||
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
|
||||
// Some streams send output text deltas before the assistant message item is announced.
|
||||
// Hold those early bytes so we can replay them once `output_item.added` arrives.
|
||||
let mut pending_output_text_delta = String::new();
|
||||
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
|
||||
let receiving_span = trace_span!("receiving_stream");
|
||||
let outcome: CodexResult<SamplingRequestResult> = loop {
|
||||
@@ -7413,6 +7416,8 @@ async fn try_run_sampling_request(
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let completed_assistant_message =
|
||||
matches!(&item, ResponseItem::Message { role, .. } if role == "assistant");
|
||||
let previously_active_item = active_item.take();
|
||||
if let Some(previous) = previously_active_item.as_ref()
|
||||
&& matches!(previous, TurnItem::AgentMessage(_))
|
||||
@@ -7470,6 +7475,17 @@ async fn try_run_sampling_request(
|
||||
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
|
||||
.instrument(handle_responses)
|
||||
.await?;
|
||||
if completed_assistant_message && !pending_output_text_delta.is_empty() {
|
||||
// If an assistant message completed while this buffer is still non-empty,
|
||||
// those bytes could not be matched to that message item. Clear them so they
|
||||
// don't leak into a later assistant item, but keep the buffer across
|
||||
// reasoning/tool completions so a delayed message item can still claim it.
|
||||
warn!(
|
||||
buffered_len = pending_output_text_delta.len(),
|
||||
"dropping buffered output text deltas after item completion"
|
||||
);
|
||||
pending_output_text_delta.clear();
|
||||
}
|
||||
if let Some(tool_future) = output_result.tool_future {
|
||||
in_flight.push_back(tool_future);
|
||||
}
|
||||
@@ -7495,26 +7511,46 @@ async fn try_run_sampling_request(
|
||||
.await
|
||||
{
|
||||
let mut turn_item = turn_item;
|
||||
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
|
||||
let mut seeded_item_id: Option<String> = None;
|
||||
if matches!(turn_item, TurnItem::AgentMessage(_))
|
||||
&& let Some(raw_text) = raw_assistant_output_text_from_item(&item)
|
||||
{
|
||||
let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None;
|
||||
let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None;
|
||||
let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_))
|
||||
.then(|| raw_assistant_output_text_from_item(&item))
|
||||
.flatten();
|
||||
if matches!(turn_item, TurnItem::AgentMessage(_)) {
|
||||
let item_id = turn_item.id();
|
||||
let mut seeded =
|
||||
assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text);
|
||||
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
|
||||
agent_message.content =
|
||||
vec![codex_protocol::items::AgentMessageContent::Text {
|
||||
text: if plan_mode {
|
||||
String::new()
|
||||
} else {
|
||||
std::mem::take(&mut seeded.visible_text)
|
||||
},
|
||||
}];
|
||||
if let Some(raw_text) = raw_output_text.as_deref() {
|
||||
let mut seeded =
|
||||
assistant_message_stream_parsers.seed_item_text(&item_id, raw_text);
|
||||
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
|
||||
agent_message.content =
|
||||
vec![codex_protocol::items::AgentMessageContent::Text {
|
||||
text: if plan_mode {
|
||||
String::new()
|
||||
} else {
|
||||
std::mem::take(&mut seeded.visible_text)
|
||||
},
|
||||
}];
|
||||
}
|
||||
seeded_emit = plan_mode.then_some((item_id.clone(), seeded));
|
||||
}
|
||||
if !pending_output_text_delta.is_empty() {
|
||||
let mut pending_delta = std::mem::take(&mut pending_output_text_delta);
|
||||
if let Some(raw_text) = raw_output_text.as_deref() {
|
||||
// The item payload may already contain all or part of the text we
|
||||
// buffered from pre-item deltas. Trim the overlap so replayed
|
||||
// deltas don't duplicate visible output.
|
||||
if pending_delta.starts_with(raw_text) {
|
||||
pending_delta.drain(..raw_text.len());
|
||||
} else if raw_text.starts_with(&pending_delta) {
|
||||
pending_delta.clear();
|
||||
}
|
||||
}
|
||||
if !pending_delta.is_empty() {
|
||||
let parsed = assistant_message_stream_parsers
|
||||
.parse_delta(&item_id, &pending_delta);
|
||||
pending_emit = Some((item_id, parsed));
|
||||
}
|
||||
}
|
||||
seeded_parsed = plan_mode.then_some(seeded);
|
||||
seeded_item_id = Some(item_id);
|
||||
}
|
||||
if let Some(state) = plan_mode_state.as_mut()
|
||||
&& matches!(turn_item, TurnItem::AgentMessage(_))
|
||||
@@ -7526,16 +7562,22 @@ async fn try_run_sampling_request(
|
||||
} else {
|
||||
sess.emit_turn_item_started(&turn_context, &turn_item).await;
|
||||
}
|
||||
if let (Some(state), Some(item_id), Some(parsed)) = (
|
||||
plan_mode_state.as_mut(),
|
||||
seeded_item_id.as_deref(),
|
||||
seeded_parsed,
|
||||
) {
|
||||
if let Some((item_id, parsed)) = seeded_emit {
|
||||
emit_streamed_assistant_text_delta(
|
||||
&sess,
|
||||
&turn_context,
|
||||
Some(state),
|
||||
item_id,
|
||||
plan_mode_state.as_mut(),
|
||||
&item_id,
|
||||
parsed,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
if let Some((item_id, parsed)) = pending_emit {
|
||||
emit_streamed_assistant_text_delta(
|
||||
&sess,
|
||||
&turn_context,
|
||||
plan_mode_state.as_mut(),
|
||||
&item_id,
|
||||
parsed,
|
||||
)
|
||||
.await;
|
||||
@@ -7612,7 +7654,12 @@ async fn try_run_sampling_request(
|
||||
.await;
|
||||
}
|
||||
} else {
|
||||
error_or_panic("OutputTextDelta without active item".to_string());
|
||||
// This event should normally follow `output_item.added`, but tolerate the
|
||||
// reversed ordering by buffering text until the item metadata arrives.
|
||||
if pending_output_text_delta.is_empty() {
|
||||
warn!("buffering OutputTextDelta without active item");
|
||||
}
|
||||
pending_output_text_delta.push_str(&delta);
|
||||
}
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta {
|
||||
@@ -7630,7 +7677,12 @@ async fn try_run_sampling_request(
|
||||
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
|
||||
.await;
|
||||
} else {
|
||||
error_or_panic("ReasoningSummaryDelta without active item".to_string());
|
||||
// Without an active reasoning item there is nowhere safe to attach this delta.
|
||||
// Drop the orphan event and rely on the eventual completed item snapshot.
|
||||
warn!(
|
||||
summary_index,
|
||||
"dropping ReasoningSummaryDelta without active item"
|
||||
);
|
||||
}
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
|
||||
@@ -7642,7 +7694,12 @@ async fn try_run_sampling_request(
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
} else {
|
||||
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
|
||||
// Keep section-break handling consistent with summary deltas: a break without
|
||||
// an active reasoning item is orphaned stream state, not a fatal parser error.
|
||||
warn!(
|
||||
summary_index,
|
||||
"dropping ReasoningSummaryPartAdded without active item"
|
||||
);
|
||||
}
|
||||
}
|
||||
ResponseEvent::ReasoningContentDelta {
|
||||
@@ -7660,7 +7717,12 @@ async fn try_run_sampling_request(
|
||||
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
|
||||
.await;
|
||||
} else {
|
||||
error_or_panic("ReasoningRawContentDelta without active item".to_string());
|
||||
// Raw reasoning deltas can also arrive before their item in malformed or
|
||||
// reordered streams. Preserve liveness by dropping the orphan event.
|
||||
warn!(
|
||||
content_index,
|
||||
"dropping ReasoningRawContentDelta without active item"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.reasoning_summary_part.added",
|
||||
"summary_index": summary_index,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ev_reasoning_text_delta(delta: &str) -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.reasoning_text.delta",
|
||||
|
||||
@@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added;
|
||||
use core_test_support::responses::ev_output_text_delta;
|
||||
use core_test_support::responses::ev_reasoning_item;
|
||||
use core_test_support::responses::ev_reasoning_item_added;
|
||||
use core_test_support::responses::ev_reasoning_summary_part_added;
|
||||
use core_test_support::responses::ev_reasoning_summary_text_delta;
|
||||
use core_test_support::responses::ev_reasoning_text_delta;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
@@ -1061,6 +1062,186 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let TestCodex {
|
||||
codex,
|
||||
session_configured,
|
||||
..
|
||||
} = test_codex().build(&server).await?;
|
||||
|
||||
let stream = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_output_text_delta("Hello "),
|
||||
ev_message_item_added("msg-1", ""),
|
||||
ev_output_text_delta("world"),
|
||||
ev_assistant_message("msg-1", "Hello world"),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
mount_sse_once(&server, stream).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: std::env::current_dir()?,
|
||||
approval_policy: codex_protocol::protocol::AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
|
||||
model: session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut agent_deltas = Vec::new();
|
||||
let mut completed = None;
|
||||
loop {
|
||||
match wait_for_event(&codex, |_| true).await {
|
||||
EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
|
||||
EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
item: TurnItem::AgentMessage(item),
|
||||
..
|
||||
}) => completed = Some(item),
|
||||
EventMsg::TurnComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(agent_deltas.concat(), "Hello world");
|
||||
let completed_text: String = completed
|
||||
.expect("assistant item completion should be emitted")
|
||||
.content
|
||||
.iter()
|
||||
.map(|entry| match entry {
|
||||
AgentMessageContent::Text { text } => text.as_str(),
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(completed_text, "Hello world");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn buffered_output_text_survives_intervening_reasoning_item_done() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let TestCodex {
|
||||
codex,
|
||||
session_configured,
|
||||
..
|
||||
} = test_codex().build(&server).await?;
|
||||
|
||||
let stream = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_output_text_delta("Hello "),
|
||||
ev_reasoning_item("reasoning-1", &["thinking"], &[]),
|
||||
ev_message_item_added("msg-1", ""),
|
||||
ev_output_text_delta("world"),
|
||||
ev_assistant_message("msg-1", "Hello world"),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
mount_sse_once(&server, stream).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: std::env::current_dir()?,
|
||||
approval_policy: codex_protocol::protocol::AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
|
||||
model: session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut agent_deltas = Vec::new();
|
||||
let mut completed = None;
|
||||
loop {
|
||||
match wait_for_event(&codex, |_| true).await {
|
||||
EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
|
||||
EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
item: TurnItem::AgentMessage(item),
|
||||
..
|
||||
}) => completed = Some(item),
|
||||
EventMsg::TurnComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(agent_deltas.concat(), "Hello world");
|
||||
let completed_text: String = completed
|
||||
.expect("assistant item completion should be emitted")
|
||||
.content
|
||||
.iter()
|
||||
.map(|entry| match entry {
|
||||
AgentMessageContent::Text { text } => text.as_str(),
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(completed_text, "Hello world");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item()
|
||||
-> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let TestCodex { codex, .. } = test_codex().build(&server).await?;
|
||||
|
||||
let stream = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_reasoning_summary_part_added(/*summary_index*/ 0),
|
||||
ev_reasoning_summary_text_delta("Summary only"),
|
||||
ev_reasoning_item("reasoning-1", &["Summary only"], &[]),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
mount_sse_once(&server, stream).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "think".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let completed = wait_for_event_match(&codex, |ev| match ev {
|
||||
EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
item: TurnItem::Reasoning(item),
|
||||
..
|
||||
}) => Some(item.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_eq!(completed.summary_text, vec!["Summary only".to_string()]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -6016,7 +6016,16 @@ impl ChatWidget {
|
||||
ThreadItem::Reasoning {
|
||||
summary, content, ..
|
||||
} => {
|
||||
if from_replay {
|
||||
let mut completed_reasoning = summary.join("\n\n");
|
||||
if self.config.show_raw_agent_reasoning {
|
||||
if !completed_reasoning.is_empty() && !content.is_empty() {
|
||||
completed_reasoning.push_str("\n\n");
|
||||
}
|
||||
completed_reasoning.push_str(&content.join("\n\n"));
|
||||
}
|
||||
let streamed_reasoning =
|
||||
format!("{}{}", self.full_reasoning_buffer, self.reasoning_buffer);
|
||||
if from_replay || streamed_reasoning.trim().is_empty() {
|
||||
for delta in summary {
|
||||
self.on_agent_reasoning_delta(delta);
|
||||
}
|
||||
@@ -6025,6 +6034,17 @@ impl ChatWidget {
|
||||
self.on_agent_reasoning_delta(delta);
|
||||
}
|
||||
}
|
||||
} else if !completed_reasoning.is_empty()
|
||||
&& completed_reasoning != streamed_reasoning
|
||||
&& let Some(missing_prefix) = completed_reasoning
|
||||
.strip_suffix(&streamed_reasoning)
|
||||
.filter(|missing_prefix| !missing_prefix.is_empty())
|
||||
{
|
||||
// If early summary deltas were dropped before the item became active, prepend
|
||||
// the missing prefix from the completed item snapshot and keep the streamed
|
||||
// suffix so we still avoid double-rendering.
|
||||
self.full_reasoning_buffer =
|
||||
format!("{missing_prefix}{}", self.full_reasoning_buffer);
|
||||
}
|
||||
self.on_agent_reasoning_final();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests/history_replay.rs
|
||||
expression: rendered
|
||||
---
|
||||
• prefix middle
|
||||
|
||||
tail
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests/history_replay.rs
|
||||
expression: rendered
|
||||
---
|
||||
• prefix suffix
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests/history_replay.rs
|
||||
expression: rendered
|
||||
---
|
||||
• Summary only
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -725,6 +726,183 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() {
|
||||
assert_eq!(rendered.matches("Summary only").count(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_reasoning_item_completed_renders_summary_without_prior_delta() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
chat.show_welcome_banner = false;
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::TurnStarted(TurnStartedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: AppServerTurn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ItemCompleted(ItemCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: AppServerThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["Summary only".to_string()],
|
||||
content: Vec::new(),
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
let rendered = match rx.try_recv() {
|
||||
Ok(AppEvent::InsertHistoryCell(cell)) => {
|
||||
lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
|
||||
}
|
||||
other => panic!("expected InsertHistoryCell, got {other:?}"),
|
||||
};
|
||||
assert!(rendered.contains("Summary only"));
|
||||
assert_chatwidget_snapshot!(
|
||||
"live_reasoning_item_completed_renders_summary_without_prior_delta",
|
||||
rendered
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
chat.show_welcome_banner = false;
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::TurnStarted(TurnStartedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: AppServerTurn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "reasoning-1".to_string(),
|
||||
delta: "suffix".to_string(),
|
||||
summary_index: 0,
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ItemCompleted(ItemCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: AppServerThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["prefix suffix".to_string()],
|
||||
content: Vec::new(),
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
let rendered = match rx.try_recv() {
|
||||
Ok(AppEvent::InsertHistoryCell(cell)) => {
|
||||
lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
|
||||
}
|
||||
other => panic!("expected InsertHistoryCell, got {other:?}"),
|
||||
};
|
||||
assert_eq!(rendered.matches("prefix suffix").count(), 1);
|
||||
assert_chatwidget_snapshot!(
|
||||
"live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix",
|
||||
rendered
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
chat.show_welcome_banner = false;
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::TurnStarted(TurnStartedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: AppServerTurn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "reasoning-1".to_string(),
|
||||
delta: "middle".to_string(),
|
||||
summary_index: 0,
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ReasoningSummaryPartAdded(ReasoningSummaryPartAddedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "reasoning-1".to_string(),
|
||||
summary_index: 1,
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "reasoning-1".to_string(),
|
||||
delta: "tail".to_string(),
|
||||
summary_index: 1,
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
chat.handle_server_notification(
|
||||
ServerNotification::ItemCompleted(ItemCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: AppServerThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["prefix middle".to_string(), "tail".to_string()],
|
||||
content: Vec::new(),
|
||||
},
|
||||
}),
|
||||
/*replay_kind*/ None,
|
||||
);
|
||||
|
||||
let rendered = match rx.try_recv() {
|
||||
Ok(AppEvent::InsertHistoryCell(cell)) => {
|
||||
lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
|
||||
}
|
||||
other => panic!("expected InsertHistoryCell, got {other:?}"),
|
||||
};
|
||||
assert_eq!(rendered.matches("prefix middle").count(), 1);
|
||||
assert!(rendered.contains("tail"));
|
||||
assert_chatwidget_snapshot!(
|
||||
"live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks",
|
||||
rendered
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replayed_turn_started_does_not_mark_task_running() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
Reference in New Issue
Block a user