mirror of
https://github.com/openai/codex.git
synced 2026-05-11 23:02:39 +00:00
Compare commits
1 Commits
dh--app-se
...
fcoury/str
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63c9a22e19 |
@@ -206,30 +206,6 @@ impl App {
|
||||
AppEvent::EndInitialHistoryReplayBuffer => {
|
||||
self.finish_initial_history_replay_buffer(tui);
|
||||
}
|
||||
AppEvent::ConsolidateAgentMessage { source, cwd } => {
|
||||
if !self.terminal_resize_reflow_enabled() {
|
||||
self.transcript_reflow.clear();
|
||||
return Ok(AppRunControl::Continue);
|
||||
}
|
||||
let end = self.transcript_cells.len();
|
||||
let start =
|
||||
trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells);
|
||||
if start < end {
|
||||
let consolidated: Arc<dyn HistoryCell> =
|
||||
Arc::new(history_cell::AgentMarkdownCell::new(source, &cwd));
|
||||
self.transcript_cells
|
||||
.splice(start..end, std::iter::once(consolidated.clone()));
|
||||
|
||||
if let Some(Overlay::Transcript(t)) = &mut self.overlay {
|
||||
t.consolidate_cells(start..end, consolidated.clone());
|
||||
tui.frame_requester().schedule_frame();
|
||||
}
|
||||
|
||||
self.maybe_finish_stream_reflow(tui)?;
|
||||
} else {
|
||||
self.maybe_finish_stream_reflow(tui)?;
|
||||
}
|
||||
}
|
||||
AppEvent::ConsolidateProposedPlan(source) => {
|
||||
if !self.terminal_resize_reflow_enabled() {
|
||||
self.transcript_reflow.clear();
|
||||
|
||||
@@ -355,9 +355,8 @@ impl App {
|
||||
|
||||
self.transcript_reflow.clear_pending_reflow();
|
||||
|
||||
// Track that a reflow happened during an active stream or while trailing
|
||||
// unconsolidated AgentMessageCells are still pending consolidation so
|
||||
// ConsolidateAgentMessage can schedule a follow-up reflow.
|
||||
// Track that a reflow happened during a stream with transient cells so stream
|
||||
// consolidation can schedule a follow-up reflow.
|
||||
let reflow_ran_during_stream =
|
||||
!self.transcript_cells.is_empty() && self.should_mark_reflow_as_stream_time();
|
||||
|
||||
@@ -468,14 +467,11 @@ impl App {
|
||||
|
||||
/// Return whether current transcript state should be treated as stream-time resize state.
|
||||
///
|
||||
/// The active stream controllers cover normal streaming. The trailing-cell checks cover the
|
||||
/// narrow window after a controller has stopped but before the app has processed the
|
||||
/// consolidation event that replaces transient stream cells with source-backed cells.
|
||||
/// The plan stream controller covers normal plan streaming. The trailing-cell check covers the
|
||||
/// narrow window after a plan controller has stopped but before the app has processed the
|
||||
/// consolidation event that replaces transient plan stream cells with source-backed cells.
|
||||
pub(super) fn should_mark_reflow_as_stream_time(&self) -> bool {
|
||||
self.chat_widget.has_active_agent_stream()
|
||||
|| self.chat_widget.has_active_plan_stream()
|
||||
|| trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells)
|
||||
< self.transcript_cells.len()
|
||||
self.chat_widget.has_active_plan_stream()
|
||||
|| trailing_run_start::<history_cell::ProposedPlanStreamCell>(&self.transcript_cells)
|
||||
< self.transcript_cells.len()
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::chatwidget::tests::make_chatwidget_manual_with_sender;
|
||||
use crate::chatwidget::tests::set_chatgpt_auth;
|
||||
use crate::chatwidget::tests::set_fast_mode_test_catalog;
|
||||
use crate::file_search::FileSearchManager;
|
||||
use crate::history_cell::AgentMarkdownCell;
|
||||
use crate::history_cell::AgentMessageCell;
|
||||
use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::PlainHistoryCell;
|
||||
@@ -68,14 +69,19 @@ use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Settings;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
use codex_protocol::items::AgentMessageItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile;
|
||||
use codex_protocol::models::FileSystemPermissions;
|
||||
use codex_protocol::models::NetworkPermissions;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::protocol::AgentMessageDeltaEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::FileChange;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::NetworkApprovalContext;
|
||||
use codex_protocol::protocol::NetworkApprovalProtocol;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -3961,6 +3967,69 @@ async fn height_shrink_schedules_resize_reflow() {
|
||||
assert!(app.transcript_reflow.has_pending_reflow());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn finalized_assistant_stream_is_source_backed_for_resize_reflow() {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
enable_terminal_resize_reflow(&mut app);
|
||||
|
||||
app.chat_widget.handle_codex_event(Event {
|
||||
id: "delta-1".to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
|
||||
delta: "draft text".to_string(),
|
||||
}),
|
||||
});
|
||||
assert!(
|
||||
app_event_rx.try_recv().is_err(),
|
||||
"assistant deltas should stay in the active cell until completion"
|
||||
);
|
||||
|
||||
app.chat_widget.handle_codex_event(Event {
|
||||
id: "item-1".to_string(),
|
||||
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
thread_id: ThreadId::new(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: TurnItem::AgentMessage(AgentMessageItem {
|
||||
id: "assistant-1".to_string(),
|
||||
content: vec![AgentMessageContent::Text {
|
||||
text: "final source backed assistant text that wraps differently".to_string(),
|
||||
}],
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}),
|
||||
}),
|
||||
});
|
||||
|
||||
let cell = match app_event_rx
|
||||
.try_recv()
|
||||
.expect("expected finalized assistant history")
|
||||
{
|
||||
AppEvent::InsertHistoryCell(cell) => cell,
|
||||
other => panic!("expected InsertHistoryCell, got {other:?}"),
|
||||
};
|
||||
let cell: Arc<dyn HistoryCell> = cell.into();
|
||||
assert!(
|
||||
cell.as_any().is::<AgentMarkdownCell>(),
|
||||
"finalized assistant stream should insert a source-backed cell"
|
||||
);
|
||||
app.transcript_cells.push(cell);
|
||||
|
||||
let wide = app.render_transcript_lines_for_reflow(/*width*/ 100);
|
||||
let narrow = app.render_transcript_lines_for_reflow(/*width*/ 28);
|
||||
assert!(
|
||||
narrow.lines.len() > wide.lines.len(),
|
||||
"source-backed transcript cell should re-render at resize width"
|
||||
);
|
||||
let narrow_text = narrow
|
||||
.lines
|
||||
.iter()
|
||||
.map(rendered_line_text)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
let normalized_narrow_text = narrow_text.split_whitespace().collect::<Vec<_>>().join(" ");
|
||||
assert!(normalized_narrow_text.contains("final source backed assistant"));
|
||||
assert!(!normalized_narrow_text.contains("draft text"));
|
||||
}
|
||||
|
||||
fn test_turn(turn_id: &str, status: TurnStatus, items: Vec<ThreadItem>) -> Turn {
|
||||
Turn {
|
||||
id: turn_id.to_string(),
|
||||
|
||||
@@ -427,19 +427,6 @@ pub(crate) enum AppEvent {
|
||||
/// Finish buffering initial resume replay after all replay events have been queued.
|
||||
EndInitialHistoryReplayBuffer,
|
||||
|
||||
/// Replace the contiguous run of streaming `AgentMessageCell`s at the end of
|
||||
/// the transcript with a single `AgentMarkdownCell` that stores the raw
|
||||
/// markdown source and re-renders from it on resize.
|
||||
///
|
||||
/// Emitted by `ChatWidget::flush_answer_stream_with_separator` after stream
|
||||
/// finalization. The `App` handler walks backward through `transcript_cells`
|
||||
/// to find the `AgentMessageCell` run and splices in the consolidated cell.
|
||||
/// The `cwd` keeps local file-link display stable across the final re-render.
|
||||
ConsolidateAgentMessage {
|
||||
source: String,
|
||||
cwd: PathBuf,
|
||||
},
|
||||
|
||||
/// Replace the contiguous run of streaming `ProposedPlanStreamCell`s at the
|
||||
/// end of the transcript with a single source-backed `ProposedPlanCell`.
|
||||
///
|
||||
|
||||
@@ -412,7 +412,7 @@ use crate::streaming::chunking::AdaptiveChunkingPolicy;
|
||||
use crate::streaming::commit_tick::CommitTickScope;
|
||||
use crate::streaming::commit_tick::run_commit_tick;
|
||||
use crate::streaming::controller::PlanStreamController;
|
||||
use crate::streaming::controller::StreamController;
|
||||
use crate::streaming::controller::SourceBackedStreamController;
|
||||
|
||||
use chrono::Local;
|
||||
use codex_file_search::FileMatch;
|
||||
@@ -833,8 +833,8 @@ pub(crate) struct ChatWidget {
|
||||
rate_limit_switch_prompt: RateLimitSwitchPromptState,
|
||||
add_credits_nudge_email_in_flight: Option<AddCreditsNudgeCreditType>,
|
||||
adaptive_chunking: AdaptiveChunkingPolicy,
|
||||
// Stream lifecycle controller
|
||||
stream_controller: Option<StreamController>,
|
||||
// Source-backed lifecycle controller for assistant output.
|
||||
stream_controller: Option<SourceBackedStreamController>,
|
||||
// Stream lifecycle controller for proposed plan output.
|
||||
plan_stream_controller: Option<PlanStreamController>,
|
||||
/// Holds the platform clipboard lease so copied text remains available while supported.
|
||||
@@ -2038,32 +2038,18 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
fn flush_answer_stream_with_separator(&mut self) {
|
||||
let had_stream_controller = self.stream_controller.is_some();
|
||||
if let Some(mut controller) = self.stream_controller.take() {
|
||||
let (cell, source) = controller.finalize();
|
||||
if let Some(cell) = cell {
|
||||
if let Some(controller) = self.stream_controller.take() {
|
||||
self.active_cell = None;
|
||||
self.bump_active_cell_revision();
|
||||
if let Some(cell) = controller.finalize() {
|
||||
self.add_boxed_history(cell);
|
||||
}
|
||||
// Consolidate the run of streaming AgentMessageCells into a single AgentMarkdownCell
|
||||
// that can re-render from source on resize.
|
||||
if let Some(source) = source {
|
||||
self.app_event_tx.send(AppEvent::ConsolidateAgentMessage {
|
||||
source,
|
||||
cwd: self.config.cwd.to_path_buf(),
|
||||
});
|
||||
}
|
||||
}
|
||||
self.adaptive_chunking.reset();
|
||||
if had_stream_controller && self.stream_controllers_idle() {
|
||||
self.app_event_tx.send(AppEvent::StopCommitAnimation);
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_controllers_idle(&self) -> bool {
|
||||
self.stream_controller
|
||||
.as_ref()
|
||||
.map(|controller| controller.queued_lines() == 0)
|
||||
.unwrap_or(true)
|
||||
self.stream_controller.is_none()
|
||||
&& self
|
||||
.plan_stream_controller
|
||||
.as_ref()
|
||||
@@ -2594,13 +2580,14 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
fn finalize_completed_assistant_message(&mut self, message: Option<&str>) {
|
||||
// If we have a stream_controller, the finalized message payload is redundant because the
|
||||
// visible content has already been accumulated through deltas.
|
||||
if self.stream_controller.is_none()
|
||||
&& let Some(message) = message
|
||||
&& !message.is_empty()
|
||||
{
|
||||
self.handle_streaming_delta(message.to_string());
|
||||
if let Some(message) = message.filter(|message| !message.is_empty()) {
|
||||
if let Some(controller) = self.stream_controller.as_mut() {
|
||||
controller.set_markdown(message.to_string());
|
||||
self.active_cell = Some(Box::new(controller.active_cell()));
|
||||
self.bump_active_cell_revision();
|
||||
} else {
|
||||
self.handle_streaming_delta(message.to_string());
|
||||
}
|
||||
}
|
||||
self.flush_answer_stream_with_separator();
|
||||
self.handle_stream_finished();
|
||||
@@ -4757,7 +4744,6 @@ impl ChatWidget {
|
||||
let now = Instant::now();
|
||||
let outcome = run_commit_tick(
|
||||
&mut self.adaptive_chunking,
|
||||
self.stream_controller.as_mut(),
|
||||
self.plan_stream_controller.as_mut(),
|
||||
scope,
|
||||
now,
|
||||
@@ -4810,11 +4796,11 @@ impl ChatWidget {
|
||||
|
||||
#[inline]
|
||||
fn handle_streaming_delta(&mut self, delta: String) {
|
||||
// Before streaming agent content, flush any active exec cell group.
|
||||
self.flush_unified_exec_wait_streak();
|
||||
self.flush_active_cell();
|
||||
|
||||
if self.stream_controller.is_none() {
|
||||
// Before streaming agent content, flush any active exec cell group.
|
||||
self.flush_unified_exec_wait_streak();
|
||||
self.flush_active_cell();
|
||||
|
||||
// If the previous turn inserted non-stream history (exec output, patch status, MCP
|
||||
// calls), render a separator before starting the next streamed assistant message.
|
||||
if self.needs_final_message_separator && self.had_work_activity {
|
||||
@@ -4826,17 +4812,14 @@ impl ChatWidget {
|
||||
// Reset the flag even if we don't show separator (no work was done)
|
||||
self.needs_final_message_separator = false;
|
||||
}
|
||||
self.stream_controller = Some(StreamController::new(
|
||||
self.current_stream_width(/*reserved_cols*/ 2),
|
||||
&self.config.cwd,
|
||||
));
|
||||
self.stream_controller = Some(SourceBackedStreamController::new(&self.config.cwd));
|
||||
}
|
||||
if let Some(controller) = self.stream_controller.as_mut()
|
||||
&& controller.push(&delta)
|
||||
{
|
||||
self.app_event_tx.send(AppEvent::StartCommitAnimation);
|
||||
self.run_catch_up_commit_tick();
|
||||
if let Some(controller) = self.stream_controller.as_mut() {
|
||||
controller.push(&delta);
|
||||
self.active_cell = Some(Box::new(controller.active_cell()));
|
||||
self.bump_active_cell_revision();
|
||||
}
|
||||
self.bottom_pane.hide_status_indicator();
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
@@ -11365,11 +11348,7 @@ impl ChatWidget {
|
||||
pub(crate) fn on_terminal_resize(&mut self, width: u16) {
|
||||
let had_rendered_width = self.last_rendered_width.get().is_some();
|
||||
self.last_rendered_width.set(Some(width as usize));
|
||||
let stream_width = self.current_stream_width(/*reserved_cols*/ 2);
|
||||
let plan_stream_width = self.current_stream_width(/*reserved_cols*/ 4);
|
||||
if let Some(controller) = self.stream_controller.as_mut() {
|
||||
controller.set_width(stream_width);
|
||||
}
|
||||
if let Some(controller) = self.plan_stream_controller.as_mut() {
|
||||
controller.set_width(plan_stream_width);
|
||||
}
|
||||
@@ -11378,11 +11357,6 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether an agent message stream is active (not a plan stream).
|
||||
pub(crate) fn has_active_agent_stream(&self) -> bool {
|
||||
self.stream_controller.is_some()
|
||||
}
|
||||
|
||||
/// Whether a proposed-plan stream is active.
|
||||
pub(crate) fn has_active_plan_stream(&self) -> bool {
|
||||
self.plan_stream_controller.is_some()
|
||||
@@ -11538,9 +11512,6 @@ impl ChatWidget {
|
||||
if matches!(op.view(), crate::app_command::AppCommandView::Interrupt)
|
||||
&& self.agent_turn_running
|
||||
{
|
||||
if let Some(controller) = self.stream_controller.as_mut() {
|
||||
controller.clear_queue();
|
||||
}
|
||||
if let Some(controller) = self.plan_stream_controller.as_mut() {
|
||||
controller.clear_queue();
|
||||
}
|
||||
|
||||
@@ -2987,6 +2987,56 @@ printf 'fenced within fenced\n'
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn assistant_delta_updates_source_backed_active_cell_without_history_insert() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "delta-1".into(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
|
||||
delta: "This is a long streamed paragraph that should reflow from markdown source while it is still active.".to_string(),
|
||||
}),
|
||||
});
|
||||
|
||||
assert!(
|
||||
drain_insert_history(&mut rx).is_empty(),
|
||||
"streaming deltas should render through the active cell until completion"
|
||||
);
|
||||
let narrow = chat
|
||||
.active_cell_transcript_lines(/*width*/ 28)
|
||||
.expect("active stream should have transcript lines");
|
||||
let wide = chat
|
||||
.active_cell_transcript_lines(/*width*/ 100)
|
||||
.expect("active stream should have transcript lines");
|
||||
assert!(
|
||||
narrow.len() > wide.len(),
|
||||
"active stream should re-render from source at the requested width"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn completed_assistant_item_replaces_streamed_draft_before_history_flush() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "delta-1".into(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
|
||||
delta: "draft text".to_string(),
|
||||
}),
|
||||
});
|
||||
complete_assistant_message(
|
||||
&mut chat,
|
||||
"msg-1",
|
||||
"final **markdown** text",
|
||||
/*phase*/ None,
|
||||
);
|
||||
|
||||
let rendered = lines_to_single_string(&drain_insert_history(&mut rx).concat());
|
||||
assert!(rendered.contains("final markdown text"));
|
||||
assert!(!rendered.contains("draft text"));
|
||||
assert!(chat.active_cell_transcript_lines(/*width*/ 80).is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chatwidget_tall() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
@@ -455,11 +455,13 @@ impl HistoryCell for ReasoningSummaryCell {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[cfg(test)]
|
||||
pub(crate) struct AgentMessageCell {
|
||||
lines: Vec<Line<'static>>,
|
||||
is_first_line: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl AgentMessageCell {
|
||||
pub(crate) fn new(lines: Vec<Line<'static>>, is_first_line: bool) -> Self {
|
||||
Self {
|
||||
@@ -469,6 +471,7 @@ impl AgentMessageCell {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl HistoryCell for AgentMessageCell {
|
||||
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
adaptive_wrap_lines(
|
||||
@@ -490,10 +493,8 @@ impl HistoryCell for AgentMessageCell {
|
||||
|
||||
/// A consolidated agent message cell that stores raw markdown source and re-renders from it.
|
||||
///
|
||||
/// After a stream finalizes, the `ConsolidateAgentMessage` handler in `App`
|
||||
/// replaces the contiguous run of `AgentMessageCell`s with a single
|
||||
/// `AgentMarkdownCell`. On terminal resize, `display_lines(width)` re-renders
|
||||
/// from source via `append_markdown`.
|
||||
/// The chat widget uses this both while assistant output is live and after the stream finalizes.
|
||||
/// On terminal resize, `display_lines(width)` re-renders from source via `append_markdown`.
|
||||
///
|
||||
/// The cell snapshots `cwd` at construction so local file-link display remains aligned with the
|
||||
/// session that produced the message. Reusing the current process cwd during reflow would make old
|
||||
@@ -5156,80 +5157,4 @@ mod tests {
|
||||
"word_wrap_lines should not alter lines that already fit within width"
|
||||
);
|
||||
}
|
||||
|
||||
/// Simulate the consolidation backward-walk logic from `App::handle_event`
|
||||
/// to verify it correctly identifies and replaces `AgentMessageCell` runs.
|
||||
#[test]
|
||||
fn consolidation_walker_replaces_agent_message_cells() {
|
||||
use std::sync::Arc;
|
||||
|
||||
// Build a transcript with: [UserCell, AgentMsg(head), AgentMsg(cont), AgentMsg(cont)]
|
||||
let user = Arc::new(UserHistoryCell {
|
||||
message: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
local_image_paths: Vec::new(),
|
||||
remote_image_urls: Vec::new(),
|
||||
}) as Arc<dyn HistoryCell>;
|
||||
let head = Arc::new(AgentMessageCell::new(
|
||||
vec![Line::from("line 1")],
|
||||
/*is_first_line*/ true,
|
||||
)) as Arc<dyn HistoryCell>;
|
||||
let cont1 = Arc::new(AgentMessageCell::new(
|
||||
vec![Line::from("line 2")],
|
||||
/*is_first_line*/ false,
|
||||
)) as Arc<dyn HistoryCell>;
|
||||
let cont2 = Arc::new(AgentMessageCell::new(
|
||||
vec![Line::from("line 3")],
|
||||
/*is_first_line*/ false,
|
||||
)) as Arc<dyn HistoryCell>;
|
||||
|
||||
let mut transcript_cells: Vec<Arc<dyn HistoryCell>> =
|
||||
vec![user.clone(), head, cont1, cont2];
|
||||
|
||||
// Run the same consolidation logic as the handler.
|
||||
let source = "line 1\nline 2\nline 3\n".to_string();
|
||||
let end = transcript_cells.len();
|
||||
let mut start = end;
|
||||
while start > 0
|
||||
&& transcript_cells[start - 1].is_stream_continuation()
|
||||
&& transcript_cells[start - 1]
|
||||
.as_any()
|
||||
.is::<AgentMessageCell>()
|
||||
{
|
||||
start -= 1;
|
||||
}
|
||||
if start > 0
|
||||
&& transcript_cells[start - 1]
|
||||
.as_any()
|
||||
.is::<AgentMessageCell>()
|
||||
&& !transcript_cells[start - 1].is_stream_continuation()
|
||||
{
|
||||
start -= 1;
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
start, 1,
|
||||
"should find all 3 agent cells starting at index 1"
|
||||
);
|
||||
assert_eq!(end, 4);
|
||||
|
||||
// Splice.
|
||||
let consolidated: Arc<dyn HistoryCell> =
|
||||
Arc::new(AgentMarkdownCell::new(source, &test_cwd()));
|
||||
transcript_cells.splice(start..end, std::iter::once(consolidated));
|
||||
|
||||
assert_eq!(transcript_cells.len(), 2, "should be [user, consolidated]");
|
||||
|
||||
// Verify first cell is still the user cell.
|
||||
assert!(
|
||||
transcript_cells[0].as_any().is::<UserHistoryCell>(),
|
||||
"first cell should be UserHistoryCell"
|
||||
);
|
||||
|
||||
// Verify second cell is AgentMarkdownCell.
|
||||
assert!(
|
||||
transcript_cells[1].as_any().is::<AgentMarkdownCell>(),
|
||||
"second cell should be AgentMarkdownCell"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,11 +555,10 @@ impl TranscriptOverlay {
|
||||
|
||||
/// Replace a range of committed cells with a single consolidated cell.
|
||||
///
|
||||
/// Mirrors the splice performed on `App::transcript_cells` during
|
||||
/// `ConsolidateAgentMessage` so the Ctrl+T overlay stays in sync with the
|
||||
/// main transcript. The range is clamped defensively: cells may have been
|
||||
/// inserted after the overlay opened, leaving it with fewer entries than
|
||||
/// the main transcript.
|
||||
/// Mirrors source-backed stream consolidation on `App::transcript_cells` so the Ctrl+T overlay
|
||||
/// stays in sync with the main transcript. The range is clamped defensively: cells may have
|
||||
/// been inserted after the overlay opened, leaving it with fewer entries than the main
|
||||
/// transcript.
|
||||
pub(crate) fn consolidate_cells(
|
||||
&mut self,
|
||||
range: std::ops::Range<usize>,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Orchestrates commit-tick drains across streaming controllers.
|
||||
//! Orchestrates commit-tick drains for queue-backed plan streaming.
|
||||
//!
|
||||
//! This module bridges queue-based chunking policy (`chunking`) with the concrete stream
|
||||
//! controllers (`controller`). Callers provide the current controllers and tick scope; the module
|
||||
//! controller (`controller`). Callers provide the current controller and tick scope; the module
|
||||
//! computes queue pressure, selects a drain plan, applies it, and returns emitted history cells.
|
||||
//!
|
||||
//! The module preserves ordering by draining only from controller queue heads. It does not schedule
|
||||
@@ -24,7 +24,6 @@ use super::chunking::ChunkingMode;
|
||||
use super::chunking::DrainPlan;
|
||||
use super::chunking::QueueSnapshot;
|
||||
use super::controller::PlanStreamController;
|
||||
use super::controller::StreamController;
|
||||
|
||||
/// Describes whether a commit tick may run in all modes or only in catch-up mode.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
@@ -59,53 +58,38 @@ impl Default for CommitTickOutput {
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs one commit tick against the provided stream controllers.
|
||||
/// Runs one commit tick against the provided plan stream controller.
|
||||
///
|
||||
/// This function collects a [`QueueSnapshot`], asks [`AdaptiveChunkingPolicy`] for a
|
||||
/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to both controllers.
|
||||
/// If callers pass stale controller references (for example, references not tied to the
|
||||
/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to the controller.
|
||||
/// If callers pass a stale controller reference (for example, one not tied to the
|
||||
/// current turn), queue age can be misread and the policy may stay in catch-up longer
|
||||
/// than expected.
|
||||
pub(crate) fn run_commit_tick(
|
||||
policy: &mut AdaptiveChunkingPolicy,
|
||||
stream_controller: Option<&mut StreamController>,
|
||||
plan_stream_controller: Option<&mut PlanStreamController>,
|
||||
scope: CommitTickScope,
|
||||
now: Instant,
|
||||
) -> CommitTickOutput {
|
||||
let snapshot = stream_queue_snapshot(
|
||||
stream_controller.as_deref(),
|
||||
plan_stream_controller.as_deref(),
|
||||
now,
|
||||
);
|
||||
let snapshot = stream_queue_snapshot(plan_stream_controller.as_deref(), now);
|
||||
let decision = resolve_chunking_plan(policy, snapshot, now);
|
||||
if scope == CommitTickScope::CatchUpOnly && decision.mode != ChunkingMode::CatchUp {
|
||||
return CommitTickOutput::default();
|
||||
}
|
||||
|
||||
apply_commit_tick_plan(
|
||||
decision.drain_plan,
|
||||
stream_controller,
|
||||
plan_stream_controller,
|
||||
)
|
||||
apply_commit_tick_plan(decision.drain_plan, plan_stream_controller)
|
||||
}
|
||||
|
||||
/// Builds the combined queue-pressure snapshot consumed by chunking policy.
|
||||
///
|
||||
/// The snapshot sums queue depth across controllers and keeps the maximum oldest age
|
||||
/// so policy decisions reflect the most delayed queued line currently visible.
|
||||
/// Policy decisions reflect the most delayed queued plan line currently visible.
|
||||
fn stream_queue_snapshot(
|
||||
stream_controller: Option<&StreamController>,
|
||||
plan_stream_controller: Option<&PlanStreamController>,
|
||||
now: Instant,
|
||||
) -> QueueSnapshot {
|
||||
let mut queued_lines = 0usize;
|
||||
let mut oldest_age: Option<Duration> = None;
|
||||
|
||||
if let Some(controller) = stream_controller {
|
||||
queued_lines += controller.queued_lines();
|
||||
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
|
||||
}
|
||||
if let Some(controller) = plan_stream_controller {
|
||||
queued_lines += controller.queued_lines();
|
||||
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
|
||||
@@ -141,25 +125,16 @@ fn resolve_chunking_plan(
|
||||
decision
|
||||
}
|
||||
|
||||
/// Applies a [`DrainPlan`] to all available stream controllers.
|
||||
/// Applies a [`DrainPlan`] to the available plan stream controller.
|
||||
///
|
||||
/// The returned [`CommitTickOutput`] reports emitted cells and whether all
|
||||
/// present controllers are idle after draining.
|
||||
/// The returned [`CommitTickOutput`] reports emitted cells and whether the
|
||||
/// present controller is idle after draining.
|
||||
fn apply_commit_tick_plan(
|
||||
drain_plan: DrainPlan,
|
||||
stream_controller: Option<&mut StreamController>,
|
||||
plan_stream_controller: Option<&mut PlanStreamController>,
|
||||
) -> CommitTickOutput {
|
||||
let mut output = CommitTickOutput::default();
|
||||
|
||||
if let Some(controller) = stream_controller {
|
||||
output.has_controller = true;
|
||||
let (cell, is_idle) = drain_stream_controller(controller, drain_plan);
|
||||
if let Some(cell) = cell {
|
||||
output.cells.push(cell);
|
||||
}
|
||||
output.all_idle &= is_idle;
|
||||
}
|
||||
if let Some(controller) = plan_stream_controller {
|
||||
output.has_controller = true;
|
||||
let (cell, is_idle) = drain_plan_stream_controller(controller, drain_plan);
|
||||
@@ -172,25 +147,10 @@ fn apply_commit_tick_plan(
|
||||
output
|
||||
}
|
||||
|
||||
/// Applies one drain step to the main stream controller.
|
||||
///
|
||||
/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to
|
||||
/// multi-line drain (including instant catch-up when policy requests the full
|
||||
/// queued backlog).
|
||||
fn drain_stream_controller(
|
||||
controller: &mut StreamController,
|
||||
drain_plan: DrainPlan,
|
||||
) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
match drain_plan {
|
||||
DrainPlan::Single => controller.on_commit_tick(),
|
||||
DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines),
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies one drain step to the plan stream controller.
|
||||
///
|
||||
/// This mirrors [`drain_stream_controller`] so both controller types follow the
|
||||
/// same chunking policy decisions.
|
||||
/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to multi-line drain
|
||||
/// including instant catch-up when policy requests the full queued backlog.
|
||||
fn drain_plan_stream_controller(
|
||||
controller: &mut PlanStreamController,
|
||||
drain_plan: DrainPlan,
|
||||
|
||||
@@ -210,23 +210,24 @@ impl StreamCore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls newline-gated streaming for assistant messages.
|
||||
/// Test-only newline-gated streaming controller for transient assistant cells.
|
||||
///
|
||||
/// The controller emits transient `AgentMessageCell`s for live display and returns raw markdown
|
||||
/// source on `finalize` so the app can replace those transient cells with a source-backed
|
||||
/// `AgentMarkdownCell`. Callers should use `set_width` on terminal resize; rebuilding the queue
|
||||
/// from already emitted cells would duplicate output instead of preserving the stream position.
|
||||
/// Production assistant streaming is source-backed. This controller remains as coverage for the
|
||||
/// lower-level line streamer behavior that the plan stream still shares.
|
||||
#[cfg(test)]
|
||||
pub(crate) struct StreamController {
|
||||
core: StreamCore,
|
||||
header_emitted: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl StreamController {
|
||||
/// Create a stream controller that renders markdown relative to the given width and cwd.
|
||||
///
|
||||
/// `width` is the content width available to markdown rendering, not necessarily the full
|
||||
/// terminal width. Passing a stale width after resize will keep queued live output wrapped for
|
||||
/// the old viewport until app-level reflow repairs the finalized transcript.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
|
||||
Self {
|
||||
core: StreamCore::new(width, cwd),
|
||||
@@ -238,6 +239,7 @@ impl StreamController {
|
||||
///
|
||||
/// Deltas are committed only through newline boundaries. A `false` return can still mean source
|
||||
/// was buffered; it only means no newly renderable complete line is ready for live emission.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn push(&mut self, delta: &str) -> bool {
|
||||
self.core.push_delta(delta)
|
||||
}
|
||||
@@ -247,6 +249,7 @@ impl StreamController {
|
||||
/// The source is `None` only when the stream never accumulated content. Callers that discard the
|
||||
/// returned source cannot later consolidate the transcript into a width-sensitive finalized
|
||||
/// cell.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn finalize(&mut self) -> (Option<Box<dyn HistoryCell>>, Option<String>) {
|
||||
let remaining = self.core.finalize_remaining();
|
||||
if self.core.raw_source.is_empty() {
|
||||
@@ -277,14 +280,7 @@ impl StreamController {
|
||||
self.core.queued_lines()
|
||||
}
|
||||
|
||||
pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
|
||||
self.core.oldest_queued_age(now)
|
||||
}
|
||||
|
||||
pub(crate) fn clear_queue(&mut self) {
|
||||
self.core.clear_queue();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn set_width(&mut self, width: Option<usize>) {
|
||||
self.core.set_width(width);
|
||||
}
|
||||
@@ -301,11 +297,53 @@ impl StreamController {
|
||||
}
|
||||
}
|
||||
|
||||
/// Keeps assistant output source-backed while the message is still streaming.
|
||||
///
|
||||
/// This controller does not emit transient history cells. The chat widget renders the current
|
||||
/// source through an active `AgentMarkdownCell`, then commits the same source-backed cell once the
|
||||
/// stream finishes.
|
||||
pub(crate) struct SourceBackedStreamController {
|
||||
markdown_source: String,
|
||||
cwd: PathBuf,
|
||||
}
|
||||
|
||||
impl SourceBackedStreamController {
|
||||
pub(crate) fn new(cwd: &Path) -> Self {
|
||||
Self {
|
||||
markdown_source: String::new(),
|
||||
cwd: cwd.to_path_buf(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push(&mut self, delta: &str) {
|
||||
self.markdown_source.push_str(delta);
|
||||
}
|
||||
|
||||
pub(crate) fn set_markdown(&mut self, markdown_source: String) {
|
||||
self.markdown_source = markdown_source;
|
||||
}
|
||||
|
||||
pub(crate) fn active_cell(&self) -> history_cell::AgentMarkdownCell {
|
||||
history_cell::AgentMarkdownCell::new(self.markdown_source.clone(), self.cwd.as_path())
|
||||
}
|
||||
|
||||
pub(crate) fn finalize(self) -> Option<Box<dyn HistoryCell>> {
|
||||
if self.markdown_source.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(Box::new(history_cell::AgentMarkdownCell::new(
|
||||
self.markdown_source,
|
||||
self.cwd.as_path(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls newline-gated streaming for proposed plan markdown.
|
||||
///
|
||||
/// This follows the same source-retention contract as `StreamController`, but wraps emitted lines
|
||||
/// in the proposed-plan header, padding, and style. Finalization must return source for
|
||||
/// `ProposedPlanCell`; otherwise a resized finalized plan would keep the transient stream shape.
|
||||
/// This follows the same source-retention contract as the lower-level stream core, but wraps
|
||||
/// emitted lines in the proposed-plan header, padding, and style. Finalization must return source
|
||||
/// for `ProposedPlanCell`; otherwise a resized finalized plan would keep the transient stream
|
||||
/// shape.
|
||||
pub(crate) struct PlanStreamController {
|
||||
core: StreamCore,
|
||||
header_emitted: bool,
|
||||
@@ -315,8 +353,8 @@ pub(crate) struct PlanStreamController {
|
||||
impl PlanStreamController {
|
||||
/// Create a proposed-plan stream controller that renders markdown relative to the given cwd.
|
||||
///
|
||||
/// The width has the same meaning as in `StreamController`: it is the markdown body width, and
|
||||
/// callers must update it when the terminal width changes.
|
||||
/// The width is the markdown body width, and callers must update it when the terminal width
|
||||
/// changes.
|
||||
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
|
||||
Self {
|
||||
core: StreamCore::new(width, cwd),
|
||||
|
||||
Reference in New Issue
Block a user