mirror of https://github.com/openai/codex.git
synced 2026-02-08 01:43:46 +00:00
Compare commits
12 Commits
patch-squa
...
codex/add-
| Author | SHA1 | Date |
|---|---|---|
|  | c58b5c0910 |  |
|  | 0bd53ac078 |  |
|  | 9aa1331c92 |  |
|  | 555a172722 |  |
|  | bd0f423d36 |  |
|  | b0757a1c23 |  |
|  | e98b35ac78 |  |
|  | e6939025f5 |  |
|  | f123cc6541 |  |
|  | 5f16fe8dda |  |
|  | 98b31b0390 |  |
|  | 9487ae4ce7 |  |
@@ -426,6 +426,12 @@ where
                 // will never appear in a Chat Completions stream.
                 continue;
             }
+            Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
+            | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
+                // Deltas are ignored here since aggregation waits for the
+                // final OutputItemDone.
+                continue;
+            }
         }
     }
 }
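
Annotation (not part of the patch): the `continue` arms above exist because the aggregation adapter only materialises complete items. A minimal, simplified sketch of that filtering idea, using a stand-in `ResponseEvent` enum rather than the crate's real `Poll`-based stream:

```rust
// Simplified stand-in types for illustration only.
#[derive(Debug, PartialEq)]
enum ResponseEvent {
    OutputTextDelta(String),
    ReasoningSummaryDelta(String),
    OutputItemDone(String),
}

/// Drop per-token deltas and keep only completed items, mirroring the
/// skip in the hunk above.
fn aggregate(events: Vec<ResponseEvent>) -> Vec<ResponseEvent> {
    events
        .into_iter()
        .filter(|e| {
            !matches!(
                e,
                ResponseEvent::OutputTextDelta(_) | ResponseEvent::ReasoningSummaryDelta(_)
            )
        })
        .collect()
}

fn main() {
    let out = aggregate(vec![
        ResponseEvent::OutputTextDelta("he".into()),
        ResponseEvent::OutputTextDelta("llo".into()),
        ResponseEvent::OutputItemDone("hello".into()),
    ]);
    assert_eq!(out, vec![ResponseEvent::OutputItemDone("hello".into())]);
}
```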
@@ -197,6 +197,10 @@ impl ModelClient {
             }
         }
     }
+
+    pub fn streaming_enabled(&self) -> bool {
+        self.config.streaming_enabled
+    }
 }

 #[derive(Debug, Deserialize, Serialize)]
@@ -205,6 +209,7 @@ struct SseEvent {
     kind: String,
     response: Option<Value>,
    item: Option<Value>,
+    delta: Option<String>,
 }

 #[derive(Debug, Deserialize)]
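
For context, a minimal sketch (not part of the patch) of how a `response.output_text.delta` payload maps onto the new `delta` field. The `type` -> `kind` serde rename is an assumption here; the attribute sits outside this hunk's context lines:

```rust
use serde::Deserialize;
use serde_json::Value;

// Simplified mirror of the struct in the hunk above; only Deserialize is
// derived, and the rename attribute is assumed.
#[derive(Debug, Deserialize)]
struct SseEvent {
    #[serde(rename = "type")]
    kind: String,
    response: Option<Value>,
    item: Option<Value>,
    delta: Option<String>,
}

fn main() {
    // A typical `response.output_text.delta` SSE data payload.
    let data = r#"{"type":"response.output_text.delta","delta":"hel","item_id":"msg1"}"#;
    let event: SseEvent = serde_json::from_str(data).expect("valid event JSON");
    assert_eq!(event.kind, "response.output_text.delta");
    assert_eq!(event.delta.as_deref(), Some("hel"));
    // Unknown fields such as `item_id` are ignored by serde's default behaviour.
}
```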
@@ -315,7 +320,7 @@ where
     // duplicated `output` array embedded in the `response.completed`
     // payload. That produced two concrete issues:
     //   1. No real‑time streaming – the user only saw output after the
-    //      entire turn had finished, which broke the “typing” UX and
+    //      entire turn had finished, which broke the "typing" UX and
     //      made long‑running turns look stalled.
     //   2. Duplicate `function_call_output` items – both the
     //      individual *and* the completed array were forwarded, which
@@ -337,6 +342,22 @@ where
                     return;
                 }
             }
+            "response.output_text.delta" => {
+                if let Some(delta) = event.delta {
+                    let event = ResponseEvent::OutputTextDelta(delta);
+                    if tx_event.send(Ok(event)).await.is_err() {
+                        return;
+                    }
+                }
+            }
+            "response.reasoning_summary_text.delta" => {
+                if let Some(delta) = event.delta {
+                    let event = ResponseEvent::ReasoningSummaryDelta(delta);
+                    if tx_event.send(Ok(event)).await.is_err() {
+                        return;
+                    }
+                }
+            }
             "response.created" => {
                 if event.response.is_some() {
                     let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
@@ -360,10 +381,8 @@ where
             | "response.function_call_arguments.delta"
             | "response.in_progress"
             | "response.output_item.added"
-            | "response.output_text.delta"
             | "response.output_text.done"
             | "response.reasoning_summary_part.added"
-            | "response.reasoning_summary_text.delta"
             | "response.reasoning_summary_text.done" => {
                 // Currently, we ignore these events, but we handle them
                 // separately to skip the logging message in the `other` case.
@@ -53,6 +53,10 @@ impl Prompt {
 pub enum ResponseEvent {
     Created,
     OutputItemDone(ResponseItem),
+    /// Streaming text from an assistant message.
+    OutputTextDelta(String),
+    /// Streaming text from a reasoning summary.
+    ReasoningSummaryDelta(String),
     Completed {
         response_id: String,
         token_usage: Option<TokenUsage>,
@@ -1121,19 +1121,15 @@ async fn try_run_turn(

     let mut stream = sess.client.clone().stream(&prompt).await?;

-    // Buffer all the incoming messages from the stream first, then execute them.
-    // If we execute a function call in the middle of handling the stream, it can time out.
-    let mut input = Vec::new();
-    while let Some(event) = stream.next().await {
-        input.push(event?);
-    }
-
     let mut output = Vec::new();
-    for event in input {
+    // Patch: buffer for non-streaming mode
+    let mut assistant_message_buf = String::new();
+    let streaming_enabled = sess.client.streaming_enabled();
+    while let Some(event) = stream.next().await {
+        let event = event?;
         match event {
             ResponseEvent::Created => {
                 let mut state = sess.state.lock().unwrap();
                 // We successfully created a new response and ensured that all pending calls were included so we can clear the pending call ids.
                 state.pending_call_ids.clear();
             }
             ResponseEvent::OutputItemDone(item) => {
@@ -1146,18 +1142,59 @@ async fn try_run_turn(
                     _ => None,
                 };
                 if let Some(call_id) = call_id {
                     // We just got a new call id so we need to make sure to respond to it in the next turn.
                     let mut state = sess.state.lock().unwrap();
                     state.pending_call_ids.insert(call_id.clone());
                 }
-                let response = handle_response_item(sess, sub_id, item.clone()).await?;

+                // Patch: buffer assistant message text if streaming is disabled
+                if !streaming_enabled {
+                    if let ResponseItem::Message { role, content } = &item {
+                        if role == "assistant" {
+                            for c in content {
+                                if let ContentItem::OutputText { text } = c {
+                                    assistant_message_buf.push_str(text);
+                                }
+                            }
+                        }
+                    }
+                }
+                let response = match &item {
+                    ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } => None,
+                    _ => handle_response_item(sess, sub_id, item.clone()).await?,
+                };
                 output.push(ProcessedResponseItem { item, response });
             }
+            ResponseEvent::OutputTextDelta(text) => {
+                if streaming_enabled {
+                    let event = Event {
+                        id: sub_id.to_string(),
+                        msg: EventMsg::AgentMessageDelta(AgentMessageEvent { message: text }),
+                    };
+                    sess.tx_event.send(event).await.ok();
+                } else {
+                    assistant_message_buf.push_str(&text);
+                }
+            }
+            ResponseEvent::ReasoningSummaryDelta(text) => {
+                let event = Event {
+                    id: sub_id.to_string(),
+                    msg: EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }),
+                };
+                sess.tx_event.send(event).await.ok();
+            }
             ResponseEvent::Completed {
                 response_id,
                 token_usage,
             } => {
+                // Patch: emit full message if we buffered deltas
+                if !streaming_enabled && !assistant_message_buf.is_empty() {
+                    let event = Event {
+                        id: sub_id.to_string(),
+                        msg: EventMsg::AgentMessage(AgentMessageEvent {
+                            message: assistant_message_buf.clone(),
+                        }),
+                    };
+                    sess.tx_event.send(event).await.ok();
+                }
                 if let Some(token_usage) = token_usage {
                     sess.tx_event
                         .send(Event {
@@ -131,6 +131,13 @@ pub struct Config {
     /// request using the Responses API.
     pub model_reasoning_summary: ReasoningSummary,

+    /// Whether to surface live streaming delta events in front-ends. When `true`
+    /// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta`
+    /// events and UIs may show a typing indicator. When `false` Codex UIs should
+    /// ignore delta events and rely solely on the final aggregated
+    /// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
+    pub streaming_enabled: bool,
+
     /// When set to `true`, overrides the default heuristic and forces
     /// `model_supports_reasoning_summaries()` to return `true`.
     pub model_supports_reasoning_summaries: bool,
@@ -321,6 +328,13 @@ pub struct ConfigToml {

     /// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
     pub chatgpt_base_url: Option<String>,

+    /// Whether to surface live streaming delta events in front-ends. When `true`
+    /// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta`
+    /// events and UIs may show a typing indicator. When `false` Codex UIs should
+    /// ignore delta events and rely solely on the final aggregated
+    /// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
+    pub streaming: Option<bool>,
 }

 impl ConfigToml {
@@ -486,6 +500,7 @@ impl Config {
                 .or(cfg.model_reasoning_summary)
                 .unwrap_or_default(),

+            streaming_enabled: cfg.streaming.unwrap_or(true),
             model_supports_reasoning_summaries: cfg
                 .model_supports_reasoning_summaries
                 .unwrap_or(false),
@@ -798,6 +813,7 @@ disable_response_storage = true
                 hide_agent_reasoning: false,
                 model_reasoning_effort: ReasoningEffort::High,
                 model_reasoning_summary: ReasoningSummary::Detailed,
+                streaming_enabled: true,
                 model_supports_reasoning_summaries: false,
                 chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
             },
@@ -844,6 +860,7 @@ disable_response_storage = true
                 hide_agent_reasoning: false,
                 model_reasoning_effort: ReasoningEffort::default(),
                 model_reasoning_summary: ReasoningSummary::default(),
+                streaming_enabled: true,
                 model_supports_reasoning_summaries: false,
                 chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
             };
@@ -905,6 +922,7 @@ disable_response_storage = true
                 hide_agent_reasoning: false,
                 model_reasoning_effort: ReasoningEffort::default(),
                 model_reasoning_summary: ReasoningSummary::default(),
+                streaming_enabled: true,
                 model_supports_reasoning_summaries: false,
                 chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
             };
@@ -150,7 +150,7 @@ pub type EnvironmentVariablePattern = WildMatchPattern<'*', '?'>;
 /// Deriving the `env` based on this policy works as follows:
 /// 1. Create an initial map based on the `inherit` policy.
 /// 2. If `ignore_default_excludes` is false, filter the map using the default
-///    exclude pattern(s), which are: `"*KEY*"` and `"*TOKEN*"`.
+///    exclude pattern(s), which are: "*KEY*" and "*TOKEN*".
 /// 3. If `exclude` is not empty, filter the map using the provided patterns.
 /// 4. Insert any entries from `r#set` into the map.
 /// 5. If non-empty, filter the map using the `include_only` patterns.
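
A hypothetical sketch (not part of the patch) of the five derivation steps listed in the doc comment above. The function name `derive_env`, the `matches` parameter, and the flattened policy arguments are stand-ins for illustration, not the crate's actual API:

```rust
use std::collections::HashMap;

fn derive_env(
    inherited: HashMap<String, String>, // step 1: initial map from the `inherit` policy
    ignore_default_excludes: bool,
    exclude: &[String],
    set: &HashMap<String, String>,
    include_only: &[String],
    matches: impl Fn(&str, &str) -> bool, // stand-in wildmatch-style pattern test
) -> HashMap<String, String> {
    let mut env = inherited;
    if !ignore_default_excludes {
        // step 2: apply the default exclude patterns.
        env.retain(|k, _| !matches("*KEY*", k.as_str()) && !matches("*TOKEN*", k.as_str()));
    }
    if !exclude.is_empty() {
        // step 3: apply user-provided exclude patterns.
        env.retain(|k, _| !exclude.iter().any(|p| matches(p.as_str(), k.as_str())));
    }
    // step 4: force-set explicit entries from `r#set`.
    env.extend(set.iter().map(|(k, v)| (k.clone(), v.clone())));
    if !include_only.is_empty() {
        // step 5: keep only keys matching an `include_only` pattern.
        env.retain(|k, _| include_only.iter().any(|p| matches(p.as_str(), k.as_str())));
    }
    env
}
```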
@@ -228,3 +228,10 @@ pub enum ReasoningSummary {
     /// Option to disable reasoning summaries.
     None,
 }
+
+// ---------------------------------------------------------------------------
+// NOTE: The canonical ConfigToml definition lives in `crate::config`.
+// Historically this file accidentally re-declared that struct, which caused
+// drift and confusion. The duplicate has been removed; please use
+// `codex_core::config::ConfigToml` instead.
+// ---------------------------------------------------------------------------
@@ -282,9 +282,15 @@ pub enum EventMsg {
     /// Agent text output message
     AgentMessage(AgentMessageEvent),

+    /// Incremental assistant text delta
+    AgentMessageDelta(AgentMessageEvent),
+
     /// Reasoning event from agent.
     AgentReasoning(AgentReasoningEvent),

+    /// Incremental reasoning text delta.
+    AgentReasoningDelta(AgentReasoningEvent),
+
     /// Ack the client's configure message.
     SessionConfigured(SessionConfiguredEvent),
@@ -58,6 +58,8 @@ async fn chat_mode_stream_cli() {
         .arg(&provider_override)
         .arg("-c")
         .arg("model_provider=\"mock\"")
+        .arg("-c")
+        .arg("streaming=false")
         .arg("-C")
         .arg(env!("CARGO_MANIFEST_DIR"))
         .arg("hello?");
@@ -71,8 +73,8 @@ async fn chat_mode_stream_cli() {
     println!("Stderr:\n{}", String::from_utf8_lossy(&output.stderr));
     assert!(output.status.success());
     let stdout = String::from_utf8_lossy(&output.stdout);
-    assert!(stdout.contains("hi"));
-    assert_eq!(stdout.matches("hi").count(), 1);
+    let hi_lines = stdout.lines().filter(|line| line.trim() == "hi").count();
+    assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'");

     server.verify().await;
 }
@@ -104,6 +106,8 @@ async fn responses_api_stream_cli() {
         .arg("--")
         .arg("exec")
         .arg("--skip-git-repo-check")
+        .arg("-c")
+        .arg("streaming=false")
         .arg("-C")
         .arg(env!("CARGO_MANIFEST_DIR"))
         .arg("hello?");
@@ -117,3 +121,188 @@ async fn responses_api_stream_cli() {
     let stdout = String::from_utf8_lossy(&output.stdout);
     assert!(stdout.contains("fixture hello"));
 }
+
+/// Tests chat completions with streaming enabled (streaming=true) through the CLI using a mock server.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn chat_mode_streaming_enabled_cli() {
+    if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
+        println!(
+            "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
+        );
+        return;
+    }
+
+    let server = MockServer::start().await;
+    // Simulate streaming deltas: 'h' and 'i' as separate chunks
+    let sse = concat!(
+        "data: {\"choices\":[{\"delta\":{\"content\":\"h\"}}]}\n\n",
+        "data: {\"choices\":[{\"delta\":{\"content\":\"i\"}}]}\n\n",
+        "data: {\"choices\":[{\"delta\":{}}]}\n\n",
+        "data: [DONE]\n\n"
+    );
+    Mock::given(method("POST"))
+        .and(path("/v1/chat/completions"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .insert_header("content-type", "text/event-stream")
+                .set_body_raw(sse, "text/event-stream"),
+        )
+        .expect(1)
+        .mount(&server)
+        .await;
+
+    let home = TempDir::new().unwrap();
+    let provider_override = format!(
+        "model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"chat\" }}",
+        server.uri()
+    );
+    let mut cmd = AssertCommand::new("cargo");
+    cmd.arg("run")
+        .arg("-p")
+        .arg("codex-cli")
+        .arg("--quiet")
+        .arg("--")
+        .arg("exec")
+        .arg("--skip-git-repo-check")
+        .arg("-c")
+        .arg(&provider_override)
+        .arg("-c")
+        .arg("model_provider=\"mock\"")
+        .arg("-c")
+        .arg("streaming=true")
+        .arg("-C")
+        .arg(env!("CARGO_MANIFEST_DIR"))
+        .arg("hello?");
+    cmd.env("CODEX_HOME", home.path())
+        .env("OPENAI_API_KEY", "dummy")
+        .env("OPENAI_BASE_URL", format!("{}/v1", server.uri()));
+
+    let output = cmd.output().unwrap();
+    assert!(output.status.success());
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    // Assert that 'h' and 'i' are output as two separate chunks from stdout, not as a single chunk
+    // We split the output on 'h' and 'i' and check their order and separation
+    let mut chunks = Vec::new();
+    let mut last = 0;
+    for (idx, c) in stdout.char_indices() {
+        if c == 'h' || c == 'i' {
+            if last != idx {
+                let chunk = &stdout[last..idx];
+                if !chunk.trim().is_empty() {
+                    chunks.push(chunk);
+                }
+            }
+            chunks.push(&stdout[idx..idx + c.len_utf8()]);
+            last = idx + c.len_utf8();
+        }
+    }
+    if last < stdout.len() {
+        let chunk = &stdout[last..];
+        if !chunk.trim().is_empty() {
+            chunks.push(chunk);
+        }
+    }
+    // Only keep the 'h' and 'i' chunks
+    let delta_chunks: Vec<&str> = chunks
+        .iter()
+        .cloned()
+        .filter(|s| *s == "h" || *s == "i")
+        .collect();
+    assert_eq!(
+        delta_chunks,
+        vec!["h", "i"],
+        "Expected two separate delta chunks 'h' and 'i' from stdout"
+    );
+
+    server.verify().await;
+}
+
+/// Tests responses API with streaming enabled (streaming=true) through the CLI using a local SSE fixture file.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn responses_api_streaming_enabled_cli() {
+    if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
+        println!(
+            "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
+        );
+        return;
+    }
+
+    // Create a fixture with two deltas: 'fixture ' and 'hello'
+    use std::fs;
+    use std::io::Write;
+    let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("tests/cli_responses_fixture_streaming.sse");
+    let mut fixture_file = fs::File::create(&fixture_path).unwrap();
+    writeln!(fixture_file, "event: response.created").unwrap();
+    writeln!(
+        fixture_file,
+        "data: {{\"type\":\"response.created\",\"response\":{{\"id\":\"resp1\"}}}}\n"
+    )
+    .unwrap();
+    writeln!(fixture_file, "event: response.output_text.delta").unwrap();
+    writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"fixture \",\"item_id\":\"msg1\"}}\n").unwrap();
+    writeln!(fixture_file, "event: response.output_text.delta").unwrap();
+    writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"hello\",\"item_id\":\"msg1\"}}\n").unwrap();
+    writeln!(fixture_file, "event: response.output_text.done").unwrap();
+    writeln!(fixture_file, "data: {{\"type\":\"response.output_text.done\",\"text\":\"fixture hello\",\"item_id\":\"msg1\"}}\n").unwrap();
+    writeln!(fixture_file, "event: response.output_item.done").unwrap();
+    writeln!(fixture_file, "data: {{\"type\":\"response.output_item.done\",\"item\":{{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{{\"type\":\"output_text\",\"text\":\"fixture hello\"}}]}}}}\n").unwrap();
+    writeln!(fixture_file, "event: response.completed").unwrap();
+    writeln!(fixture_file, "data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"resp1\",\"output\":[]}}}}\n").unwrap();
+
+    let home = TempDir::new().unwrap();
+    let mut cmd = AssertCommand::new("cargo");
+    cmd.arg("run")
+        .arg("-p")
+        .arg("codex-cli")
+        .arg("--quiet")
+        .arg("--")
+        .arg("exec")
+        .arg("--skip-git-repo-check")
+        .arg("-c")
+        .arg("streaming=true")
+        .arg("-C")
+        .arg(env!("CARGO_MANIFEST_DIR"))
+        .arg("hello?");
+    cmd.env("CODEX_HOME", home.path())
+        .env("OPENAI_API_KEY", "dummy")
+        .env("CODEX_RS_SSE_FIXTURE", &fixture_path)
+        .env("OPENAI_BASE_URL", "http://unused.local");
+
+    let output = cmd.output().unwrap();
+    assert!(output.status.success());
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    // Assert that 'fixture ' and 'hello' are output as two separate chunks from stdout, not as a single chunk
+    // We split the output on the known delta substrings and check their order and separation
+    let mut chunks = Vec::new();
+    let mut last = 0;
+    for pat in ["fixture ", "hello"] {
+        if let Some(idx) = stdout[last..].find(pat) {
+            if last != last + idx {
+                let chunk = &stdout[last..last + idx];
+                if !chunk.trim().is_empty() {
+                    chunks.push(chunk);
+                }
+            }
+            chunks.push(&stdout[last + idx..last + idx + pat.len()]);
+            last = last + idx + pat.len();
+        }
+    }
+    if last < stdout.len() {
+        let chunk = &stdout[last..];
+        if !chunk.trim().is_empty() {
+            chunks.push(chunk);
+        }
+    }
+    // Only keep the delta chunks
+    let delta_chunks: Vec<&str> = chunks
+        .iter()
+        .cloned()
+        .filter(|s| *s == "fixture " || *s == "hello")
+        .collect();
+    assert_eq!(
+        delta_chunks,
+        vec!["fixture ", "hello"],
+        "Expected two separate delta chunks 'fixture ' and 'hello' from stdout"
+    );
+}
@@ -21,6 +21,8 @@ use owo_colors::OwoColorize;
 use owo_colors::Style;
 use shlex::try_join;
 use std::collections::HashMap;
+use std::io::Write;
+use std::io::{self};
 use std::time::Instant;

 /// This should be configurable. When used in CI, users may not want to impose
@@ -50,10 +52,20 @@ pub(crate) struct EventProcessor {

     /// Whether to include `AgentReasoning` events in the output.
     show_agent_reasoning: bool,
+    /// Whether to surface streaming deltas (true = print deltas + suppress final message).
+    streaming_enabled: bool,
+    /// Internal: have we already printed the `codex` header for the current streaming turn?
+    printed_agent_header: bool,
+    /// Internal: have we already printed the `thinking` header for current streaming turn?
+    printed_reasoning_header: bool,
 }

 impl EventProcessor {
-    pub(crate) fn create_with_ansi(with_ansi: bool, show_agent_reasoning: bool) -> Self {
+    pub(crate) fn create_with_ansi(
+        with_ansi: bool,
+        show_agent_reasoning: bool,
+        streaming_enabled: bool,
+    ) -> Self {
         let call_id_to_command = HashMap::new();
         let call_id_to_patch = HashMap::new();
         let call_id_to_tool_call = HashMap::new();
@@ -71,6 +83,9 @@ impl EventProcessor {
                 cyan: Style::new().cyan(),
                 call_id_to_tool_call,
                 show_agent_reasoning,
+                streaming_enabled,
+                printed_agent_header: false,
+                printed_reasoning_header: false,
             }
         } else {
             Self {
@@ -85,6 +100,9 @@ impl EventProcessor {
                 cyan: Style::new(),
                 call_id_to_tool_call,
                 show_agent_reasoning,
+                streaming_enabled,
+                printed_agent_header: false,
+                printed_reasoning_header: false,
             }
         }
     }
@@ -179,17 +197,46 @@ impl EventProcessor {
                 ts_println!(self, "{}", message.style(self.dimmed));
             }
             EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
+                // Reset streaming headers at start/end boundaries.
+                if matches!(msg, EventMsg::TaskStarted) {
+                    self.printed_agent_header = false;
+                    self.printed_reasoning_header = false;
+                }
                 // Ignore.
             }
             EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
                 ts_println!(self, "tokens used: {total_tokens}");
             }
             EventMsg::AgentMessage(AgentMessageEvent { message }) => {
-                ts_println!(
-                    self,
-                    "{}\n{message}",
-                    "codex".style(self.bold).style(self.magenta)
-                );
+                if self.streaming_enabled {
+                    // Suppress full message when streaming; final markdown not printed in CLI.
+                    // If no deltas were seen, fall back to printing now.
+                    if !self.printed_agent_header {
+                        ts_println!(
+                            self,
+                            "{}\n{message}",
+                            "codex".style(self.bold).style(self.magenta)
+                        );
+                    }
+                } else {
+                    ts_println!(
+                        self,
+                        "{}\n{message}",
+                        "codex".style(self.bold).style(self.magenta)
+                    );
+                }
             }
+            EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
+                if !self.streaming_enabled {
+                    // streaming disabled, ignore
+                } else {
+                    if !self.printed_agent_header {
+                        ts_println!(self, "{}", "codex".style(self.bold).style(self.magenta));
+                        self.printed_agent_header = true;
+                    }
+                    print!("{message}");
+                    let _ = io::stdout().flush();
+                }
+            }
             EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
                 call_id,
@@ -343,7 +390,7 @@ impl EventProcessor {
                 );

                 // Pretty-print the patch summary with colored diff markers so
-                // it’s easy to scan in the terminal output.
+                // it's easy to scan in the terminal output.
                 for (path, change) in changes.iter() {
                     match change {
                         FileChange::Add { content } => {
@@ -441,12 +488,37 @@ impl EventProcessor {
             }
             EventMsg::AgentReasoning(agent_reasoning_event) => {
                 if self.show_agent_reasoning {
-                    ts_println!(
-                        self,
-                        "{}\n{}",
-                        "thinking".style(self.italic).style(self.magenta),
-                        agent_reasoning_event.text
-                    );
+                    if self.streaming_enabled {
+                        if !self.printed_reasoning_header {
+                            ts_println!(
+                                self,
+                                "{}\n{}",
+                                "thinking".style(self.italic).style(self.magenta),
+                                agent_reasoning_event.text
+                            );
+                        }
+                    } else {
+                        ts_println!(
+                            self,
+                            "{}\n{}",
+                            "thinking".style(self.italic).style(self.magenta),
+                            agent_reasoning_event.text
+                        );
+                    }
                 }
             }
+            EventMsg::AgentReasoningDelta(agent_reasoning_event) => {
+                if self.show_agent_reasoning && self.streaming_enabled {
+                    if !self.printed_reasoning_header {
+                        ts_println!(
+                            self,
+                            "{}",
+                            "thinking".style(self.italic).style(self.magenta)
+                        );
+                        self.printed_reasoning_header = true;
+                    }
+                    print!("{}", agent_reasoning_event.text);
+                    let _ = io::stdout().flush();
+                }
+            }
             EventMsg::SessionConfigured(session_configured_event) => {
@@ -115,8 +115,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
     };

     let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
-    let mut event_processor =
-        EventProcessor::create_with_ansi(stdout_with_ansi, !config.hide_agent_reasoning);
+    println!("[DEBUG] streaming_enabled: {}", config.streaming_enabled);
+    let mut event_processor = EventProcessor::create_with_ansi(
+        stdout_with_ansi,
+        !config.hide_agent_reasoning,
+        config.streaming_enabled,
+    );
     // Print the effective configuration and prompt so users can see what Codex
     // is using.
     event_processor.print_config_summary(&config, &prompt);
@@ -175,6 +175,8 @@ pub async fn run_codex_tool_session(
             | EventMsg::TaskStarted
             | EventMsg::TokenCount(_)
             | EventMsg::AgentReasoning(_)
+            | EventMsg::AgentMessageDelta(_)
+            | EventMsg::AgentReasoningDelta(_)
             | EventMsg::McpToolCallBegin(_)
             | EventMsg::McpToolCallEnd(_)
             | EventMsg::ExecCommandBegin(_)
@@ -40,6 +40,15 @@ use crate::history_cell::PatchEventType;
 use crate::user_approval_widget::ApprovalRequest;
 use codex_file_search::FileMatch;

+/// Bookkeeping for a live streaming cell. We track the `sub_id` to know when
+/// a new turn has started (and thus when to start a new cell) and accumulate
+/// the full text so we can re-render markdown cleanly when the turn ends.
+#[derive(Default)]
+struct StreamingBuf {
+    sub_id: Option<String>,
+    text: String,
+}
+
 pub(crate) struct ChatWidget<'a> {
     app_event_tx: AppEventSender,
     codex_op_tx: UnboundedSender<Op>,
@@ -49,6 +58,10 @@ pub(crate) struct ChatWidget<'a> {
     config: Config,
     initial_user_message: Option<UserMessage>,
     token_usage: TokenUsage,
+    /// Accumulates assistant streaming text for the *current* turn.
+    streaming_agent: StreamingBuf,
+    /// Accumulates reasoning streaming text for the *current* turn.
+    streaming_reasoning: StreamingBuf,
 }

 #[derive(Clone, Copy, Eq, PartialEq)]
@@ -135,6 +148,8 @@ impl ChatWidget<'_> {
                 initial_images,
             ),
             token_usage: TokenUsage::default(),
+            streaming_agent: StreamingBuf::default(),
+            streaming_reasoning: StreamingBuf::default(),
         }
     }
@@ -220,6 +235,8 @@ impl ChatWidget<'_> {

     pub(crate) fn handle_codex_event(&mut self, event: Event) {
         let Event { id, msg } = event;
+        // We need a copy of `id` for streaming bookkeeping because it is moved into some match arms.
+        let event_id = id.clone();
         match msg {
             EventMsg::SessionConfigured(event) => {
                 // Record session information at the top of the conversation.
@@ -240,14 +257,111 @@ impl ChatWidget<'_> {
                 self.request_redraw();
             }
             EventMsg::AgentMessage(AgentMessageEvent { message }) => {
+                if self.config.streaming_enabled {
+                    // Final full assistant message. If we have an in-flight streaming cell for this id, replace it.
+                    let same_turn = self
+                        .streaming_agent
+                        .sub_id
+                        .as_ref()
+                        .map(|s| s == &event_id)
+                        .unwrap_or(false);
+                    if same_turn {
+                        self.conversation_history
+                            .replace_last_agent_message(&self.config, message.clone());
+                        self.streaming_agent.sub_id = None;
+                        self.streaming_agent.text.clear();
+                    } else {
+                        // Streaming enabled but we never saw deltas – just render normally.
+                        self.finalize_streams_if_new_turn(&event_id);
+                        self.conversation_history
+                            .add_agent_message(&self.config, message.clone());
+                    }
+                } else {
+                    // Streaming disabled -> always render final message, ignore any deltas.
+                    self.conversation_history
+                        .add_agent_message(&self.config, message.clone());
+                }
                 self.request_redraw();
             }
+            EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => {
+                // Streaming Assistant text.
+                if !self.config.streaming_enabled {
+                    // Ignore when streaming disabled.
+                    return;
+                }
+                // Start a new cell if this delta belongs to a new turn.
+                let is_new_stream = self
+                    .streaming_agent
+                    .sub_id
+                    .as_ref()
+                    .map(|s| s != &event_id)
+                    .unwrap_or(true);
+                if is_new_stream {
+                    // Finalise any in-flight stream from the prior turn.
+                    self.finalize_streams_if_new_turn(&event_id);
+                    // Start a header-only streaming cell so we don't parse partial markdown.
+                    self.conversation_history
+                        .add_agent_message(&self.config, String::new());
+                    self.streaming_agent.sub_id = Some(event_id.clone());
+                    self.streaming_agent.text.clear();
+                }
+                // Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget.
+                self.streaming_agent.text.push_str(&message);
+                self.conversation_history
-                    .add_agent_message(&self.config, message);
+                    .append_agent_message_delta(&self.config, message);
+                self.request_redraw();
+            }
             EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
                 if !self.config.hide_agent_reasoning {
+                    if self.config.streaming_enabled {
+                        // Final full reasoning summary. Replace streaming cell if same turn.
+                        let same_turn = self
+                            .streaming_reasoning
+                            .sub_id
+                            .as_ref()
+                            .map(|s| s == &event_id)
+                            .unwrap_or(false);
+                        if same_turn {
+                            self.conversation_history
+                                .replace_last_agent_reasoning(&self.config, text.clone());
+                            self.streaming_reasoning.sub_id = None;
+                            self.streaming_reasoning.text.clear();
+                        } else {
+                            self.finalize_streams_if_new_turn(&event_id);
+                            self.conversation_history
+                                .add_agent_reasoning(&self.config, text.clone());
+                        }
+                    } else {
+                        self.conversation_history
+                            .add_agent_reasoning(&self.config, text.clone());
+                    }
                     self.request_redraw();
                 }
             }
+            EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }) => {
+                if !self.config.hide_agent_reasoning {
+                    if !self.config.streaming_enabled {
+                        // Ignore when streaming disabled.
+                        return;
+                    }
+                    let is_new_stream = self
+                        .streaming_reasoning
+                        .sub_id
+                        .as_ref()
+                        .map(|s| s != &event_id)
+                        .unwrap_or(true);
+                    if is_new_stream {
+                        self.finalize_streams_if_new_turn(&event_id);
+                        // Start header-only streaming cell.
+                        self.conversation_history
+                            .add_agent_reasoning(&self.config, String::new());
+                        self.streaming_reasoning.sub_id = Some(event_id.clone());
+                        self.streaming_reasoning.text.clear();
+                    }
+                    // Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget.
+                    self.streaming_reasoning.text.push_str(&text);
+                    self.conversation_history
-                        .add_agent_reasoning(&self.config, text);
+                        .append_agent_reasoning_delta(&self.config, text);
+                    self.request_redraw();
+                }
+            }
@@ -259,6 +373,8 @@ impl ChatWidget<'_> {
             EventMsg::TaskComplete(TaskCompleteEvent {
                 last_agent_message: _,
             }) => {
+                // Turn has ended – ensure no lingering streaming cells remain un-finalised.
+                self.finalize_streams();
                 self.bottom_pane.set_task_running(false);
                 self.request_redraw();
             }
@@ -438,6 +554,42 @@ impl ChatWidget<'_> {
             tracing::error!("failed to submit op: {e}");
         }
     }
+
+    /// Finalise (render) streaming buffers when we detect a new turn id.
+    fn finalize_streams_if_new_turn(&mut self, new_id: &str) {
+        // If the incoming id differs from the current stream id(s) we must flush.
+        let agent_changed = self
+            .streaming_agent
+            .sub_id
+            .as_ref()
+            .map(|s| s != new_id)
+            .unwrap_or(false);
+        let reasoning_changed = self
+            .streaming_reasoning
+            .sub_id
+            .as_ref()
+            .map(|s| s != new_id)
+            .unwrap_or(false);
+        if agent_changed || reasoning_changed {
+            self.finalize_streams();
+        }
+    }
+
+    /// Re-render any in-flight streaming cells with full markdown and clear buffers.
+    fn finalize_streams(&mut self) {
+        let had_agent = self.streaming_agent.sub_id.take().is_some();
+        if had_agent {
+            let text = std::mem::take(&mut self.streaming_agent.text);
+            self.conversation_history
+                .replace_last_agent_message(&self.config, text);
+        }
+        let had_reasoning = self.streaming_reasoning.sub_id.take().is_some();
+        if had_reasoning {
+            let text = std::mem::take(&mut self.streaming_reasoning.text);
+            self.conversation_history
+                .replace_last_agent_reasoning(&self.config, text);
+        }
+    }
 }

 impl WidgetRef for &ChatWidget<'_> {
@@ -9,9 +9,9 @@ use crossterm::event::KeyCode;
 use crossterm::event::KeyEvent;
 use ratatui::prelude::*;
 use ratatui::style::Style;
 use ratatui::text::Span;
 use ratatui::widgets::*;
 use serde_json::Value as JsonValue;
-use std::cell::Cell as StdCell;
+use std::cell::Cell;
 use std::collections::HashMap;
 use std::path::PathBuf;
@@ -26,24 +26,30 @@ pub struct ConversationHistoryWidget {
     entries: Vec<Entry>,
     /// The width (in terminal cells/columns) that [`Entry::line_count`] was
     /// computed for. When the available width changes we recompute counts.
-    cached_width: StdCell<u16>,
+    cached_width: Cell<u16>,
     scroll_position: usize,
     /// Number of lines the last time render_ref() was called
-    num_rendered_lines: StdCell<usize>,
+    num_rendered_lines: Cell<usize>,
     /// The height of the viewport last time render_ref() was called
-    last_viewport_height: StdCell<usize>,
+    last_viewport_height: Cell<usize>,
     has_input_focus: bool,
+    /// Scratch buffer used while incrementally streaming an agent message so we can re-render markdown at newline boundaries.
+    streaming_agent_message_buf: String,
+    /// Scratch buffer used while incrementally streaming agent reasoning so we can re-render markdown at newline boundaries.
+    streaming_agent_reasoning_buf: String,
 }

 impl ConversationHistoryWidget {
     pub fn new() -> Self {
         Self {
             entries: Vec::new(),
-            cached_width: StdCell::new(0),
+            cached_width: Cell::new(0),
             scroll_position: usize::MAX,
-            num_rendered_lines: StdCell::new(0),
-            last_viewport_height: StdCell::new(0),
+            num_rendered_lines: Cell::new(0),
+            last_viewport_height: Cell::new(0),
             has_input_focus: false,
+            streaming_agent_message_buf: String::new(),
+            streaming_agent_reasoning_buf: String::new(),
         }
     }
@@ -84,38 +90,26 @@ impl ConversationHistoryWidget {
     }

     fn scroll_up(&mut self, num_lines: u32) {
-        // If a user is scrolling up from the "stick to bottom" mode, we need to
-        // map this to a specific scroll position so we can calculate the delta.
-        // This requires us to care about how tall the screen is.
+        // Convert sticky-to-bottom sentinel into a concrete offset anchored at the bottom.
         if self.scroll_position == usize::MAX {
-            self.scroll_position = self
-                .num_rendered_lines
-                .get()
-                .saturating_sub(self.last_viewport_height.get());
+            self.scroll_position = sticky_offset(
+                self.num_rendered_lines.get(),
+                self.last_viewport_height.get(),
+            );
         }

         self.scroll_position = self.scroll_position.saturating_sub(num_lines as usize);
     }

     fn scroll_down(&mut self, num_lines: u32) {
-        // If we're already pinned to the bottom there's nothing to do.
+        // Nothing to do if we're already pinned to the bottom.
         if self.scroll_position == usize::MAX {
             return;
         }

         let viewport_height = self.last_viewport_height.get().max(1);
-        let num_rendered_lines = self.num_rendered_lines.get();
-
-        // Compute the maximum explicit scroll offset that still shows a full
-        // viewport. This mirrors the calculation in `scroll_page_down()` and
-        // in the render path.
-        let max_scroll = num_rendered_lines.saturating_sub(viewport_height);
-
+        let max_scroll = sticky_offset(self.num_rendered_lines.get(), viewport_height);
         let new_pos = self.scroll_position.saturating_add(num_lines as usize);

         if new_pos >= max_scroll {
-            // Reached (or passed) the bottom – switch to stick‑to‑bottom mode
-            // so that additional output keeps the view pinned automatically.
+            // Switch to sticky-bottom mode so subsequent output pins view.
             self.scroll_position = usize::MAX;
         } else {
             self.scroll_position = new_pos;
@@ -125,44 +119,21 @@ impl ConversationHistoryWidget {
     /// Scroll up by one full viewport height (Page Up).
     fn scroll_page_up(&mut self) {
         let viewport_height = self.last_viewport_height.get().max(1);

-        // If we are currently in the "stick to bottom" mode, first convert the
-        // implicit scroll position (`usize::MAX`) into an explicit offset that
-        // represents the very bottom of the scroll region. This mirrors the
-        // logic from `scroll_up()`.
         if self.scroll_position == usize::MAX {
-            self.scroll_position = self
-                .num_rendered_lines
-                .get()
-                .saturating_sub(viewport_height);
+            self.scroll_position = sticky_offset(self.num_rendered_lines.get(), viewport_height);
         }

         // Move up by a full page.
         self.scroll_position = self.scroll_position.saturating_sub(viewport_height);
     }

     /// Scroll down by one full viewport height (Page Down).
     fn scroll_page_down(&mut self) {
         // Nothing to do if we're already stuck to the bottom.
         if self.scroll_position == usize::MAX {
             return;
         }

         let viewport_height = self.last_viewport_height.get().max(1);
-        let num_lines = self.num_rendered_lines.get();
-
-        // Calculate the maximum explicit scroll offset that is still within
-        // range. This matches the logic in `scroll_down()` and the render
-        // method.
-        let max_scroll = num_lines.saturating_sub(viewport_height);
-
-        // Attempt to move down by a full page.
+        let max_scroll = sticky_offset(self.num_rendered_lines.get(), viewport_height);
         let new_pos = self.scroll_position.saturating_add(viewport_height);

         if new_pos >= max_scroll {
-            // We have reached (or passed) the bottom – switch back to
-            // automatic stick‑to‑bottom mode so that subsequent output keeps
-            // the viewport pinned.
+            // Switch to sticky-bottom mode so subsequent output pins view.
             self.scroll_position = usize::MAX;
         } else {
             self.scroll_position = new_pos;
@@ -195,13 +166,107 @@ impl ConversationHistoryWidget {
     }

     pub fn add_agent_message(&mut self, config: &Config, message: String) {
+        // Reset streaming scratch because we are starting a fresh agent message.
+        self.streaming_agent_message_buf.clear();
+        self.streaming_agent_message_buf.push_str(&message);
         self.add_to_history(HistoryCell::new_agent_message(config, message));
     }

     pub fn add_agent_reasoning(&mut self, config: &Config, text: String) {
+        self.streaming_agent_reasoning_buf.clear();
+        self.streaming_agent_reasoning_buf.push_str(&text);
         self.add_to_history(HistoryCell::new_agent_reasoning(config, text));
     }

+    /// Append incremental assistant text.
+    ///
+    /// Previous heuristic: fast‑append chunks until we saw a newline, then re‑render.
+    /// This caused visible "one‑word" lines (e.g., "The" -> "The user") when models
+    /// streamed small token fragments and also delayed Markdown styling (headings, code fences)
+    /// until the first newline arrived. To improve perceived quality we now *always* re‑render
+    /// the accumulated markdown buffer on every incoming delta chunk. We still apply the
+    /// soft‑break collapsing heuristic (outside fenced code blocks) so interim layout more closely
+    /// matches the final message and reduces layout thrash.
+    pub fn append_agent_message_delta(&mut self, _config: &Config, text: String) {
+        if text.is_empty() {
+            return;
+        }
+        // Accumulate full buffer.
+        self.streaming_agent_message_buf.push_str(&text);
+
+        let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_message_buf);
+        if let Some(idx) = last_agent_message_idx(&self.entries) {
+            let width = self.cached_width.get();
+            let entry = &mut self.entries[idx];
+            entry.cell = HistoryCell::new_agent_message(_config, collapsed);
+            // Drop trailing blank so we can continue streaming additional tokens cleanly.
+            if let HistoryCell::AgentMessage { view } = &mut entry.cell {
+                drop_trailing_blank_line(&mut view.lines);
+            }
+            if width > 0 {
+                update_entry_height(entry, width);
+            }
+        } else {
+            // No existing cell? Start a new one.
+            self.add_agent_message(_config, self.streaming_agent_message_buf.clone());
+        }
+    }
+
+    /// Append incremental reasoning text (mirrors `append_agent_message_delta`).
+    pub fn append_agent_reasoning_delta(&mut self, _config: &Config, text: String) {
+        if text.is_empty() {
+            return;
+        }
+        self.streaming_agent_reasoning_buf.push_str(&text);
+
+        let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_reasoning_buf);
+        if let Some(idx) = last_agent_reasoning_idx(&self.entries) {
+            let width = self.cached_width.get();
+            let entry = &mut self.entries[idx];
+            entry.cell = HistoryCell::new_agent_reasoning(_config, collapsed);
+            if let HistoryCell::AgentReasoning { view } = &mut entry.cell {
+                drop_trailing_blank_line(&mut view.lines);
+            }
+            if width > 0 {
+                update_entry_height(entry, width);
+            }
+        } else {
+            self.add_agent_reasoning(_config, self.streaming_agent_reasoning_buf.clone());
+        }
+    }
+
+    /// Replace the most recent AgentMessage cell with the fully accumulated `text`.
+    /// This should be called once the turn is complete so we can render proper markdown.
+    pub fn replace_last_agent_message(&mut self, config: &Config, text: String) {
+        self.streaming_agent_message_buf.clear();
+        if let Some(idx) = last_agent_message_idx(&self.entries) {
+            let width = self.cached_width.get();
+            let entry = &mut self.entries[idx];
+            entry.cell = HistoryCell::new_agent_message(config, text);
+            if width > 0 {
+                update_entry_height(entry, width);
+            }
+        } else {
+            // No existing AgentMessage (shouldn't happen) – append new.
+            self.add_agent_message(config, text);
+        }
+    }
+
+    /// Replace the most recent AgentReasoning cell with the fully accumulated `text`.
+    pub fn replace_last_agent_reasoning(&mut self, config: &Config, text: String) {
+        self.streaming_agent_reasoning_buf.clear();
+        if let Some(idx) = last_agent_reasoning_idx(&self.entries) {
+            let width = self.cached_width.get();
+            let entry = &mut self.entries[idx];
+            entry.cell = HistoryCell::new_agent_reasoning(config, text);
+            if width > 0 {
+                update_entry_height(entry, width);
+            }
+        } else {
+            self.add_agent_reasoning(config, text);
+        }
+    }
+
     pub fn add_background_event(&mut self, message: String) {
         self.add_to_history(HistoryCell::new_background_event(message));
     }
@@ -279,7 +344,7 @@ impl ConversationHistoryWidget {

                 // Update cached line count.
                 if width > 0 {
-                    entry.line_count.set(cell.height(width));
+                    update_entry_height(entry, width);
                 }
                 break;
             }
@@ -313,7 +378,7 @@ impl ConversationHistoryWidget {
                 entry.cell = completed;

                 if width > 0 {
-                    entry.line_count.set(entry.cell.height(width));
+                    update_entry_height(entry, width);
                 }

                 break;
@@ -370,14 +435,12 @@ impl WidgetRef for ConversationHistoryWidget {
             self.entries.iter().map(|e| e.line_count.get()).sum()
         };

-        // Determine the scroll position. Note the existing value of
-        // `self.scroll_position` could exceed the maximum scroll offset if the
-        // user made the window wider since the last render.
-        let max_scroll = num_lines.saturating_sub(viewport_height);
+        // Determine the scroll position (respect sticky-to-bottom sentinel and clamp).
+        let max_scroll = sticky_offset(num_lines, viewport_height);
         let scroll_pos = if self.scroll_position == usize::MAX {
             max_scroll
         } else {
-            self.scroll_position.min(max_scroll)
+            clamp_scroll_pos(self.scroll_position, max_scroll)
         };

         // ------------------------------------------------------------------
@@ -454,7 +517,7 @@ impl WidgetRef for ConversationHistoryWidget {

         {
             // Choose a thumb color that stands out only when this pane has focus so that the
-            // user’s attention is naturally drawn to the active viewport. When unfocused we show
+            // user's attention is naturally drawn to the active viewport. When unfocused we show
             // a low-contrast thumb so the scrollbar fades into the background without becoming
             // invisible.
             let thumb_style = if self.has_input_focus {
@@ -497,3 +560,118 @@ impl WidgetRef for ConversationHistoryWidget {
 pub(crate) const fn wrap_cfg() -> ratatui::widgets::Wrap {
     ratatui::widgets::Wrap { trim: false }
 }
+
+// ---------------------------------------------------------------------------
+// Scrolling helpers (private)
+// ---------------------------------------------------------------------------
+#[inline]
+fn sticky_offset(num_lines: usize, viewport_height: usize) -> usize {
+    num_lines.saturating_sub(viewport_height.max(1))
+}
+
+#[inline]
+fn clamp_scroll_pos(pos: usize, max_scroll: usize) -> usize {
+    pos.min(max_scroll)
+}
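
A quick spot-check (hypothetical test, not in the patch) of the math these two helpers encode; the values are made up for illustration:

```rust
#[cfg(test)]
mod sticky_offset_demo {
    use super::*;

    #[test]
    fn bottom_anchor_math() {
        // 100 rendered lines in a 20-line viewport: bottom anchor sits at 80.
        assert_eq!(sticky_offset(100, 20), 80);
        // A zero-height viewport is clamped to 1 before subtracting.
        assert_eq!(sticky_offset(5, 0), 4);
        // Content shorter than the viewport saturates at offset 0.
        assert_eq!(sticky_offset(3, 10), 0);
        // Explicit positions never exceed the maximum scroll offset.
        assert_eq!(clamp_scroll_pos(120, 80), 80);
    }
}
```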
+
+// ---------------------------------------------------------------------------
+// Streaming helpers (private)
+// ---------------------------------------------------------------------------
+
+/// Locate the most recent `HistoryCell::AgentMessage` entry.
+fn last_agent_message_idx(entries: &[Entry]) -> Option<usize> {
+    entries
+        .iter()
+        .rposition(|e| matches!(e.cell, HistoryCell::AgentMessage { .. }))
+}
+
+/// Locate the most recent `HistoryCell::AgentReasoning` entry.
+fn last_agent_reasoning_idx(entries: &[Entry]) -> Option<usize> {
+    entries
+        .iter()
+        .rposition(|e| matches!(e.cell, HistoryCell::AgentReasoning { .. }))
+}
+
+/// True if the line is an empty spacer (single empty span).
+fn is_blank_line(line: &Line<'_>) -> bool {
+    line.spans.len() == 1 && line.spans[0].content.is_empty()
+}
+
+/// Ensure that the vector has *at least* one body line after the header.
+/// A freshly-created AgentMessage/Reasoning cell always has a header + blank line,
+/// but streaming cells may be created empty; this makes sure we have a target line.
+#[allow(dead_code)]
+fn ensure_body_line(lines: &mut Vec<Line<'static>>) {
+    if lines.len() < 2 {
+        lines.push(Line::from(""));
+    }
+}
+
+/// Trim a single trailing blank spacer line (but preserve intentional paragraph breaks).
+fn drop_trailing_blank_line(lines: &mut Vec<Line<'static>>) {
+    if let Some(last) = lines.last() {
+        if is_blank_line(last) {
+            lines.pop();
+        }
+    }
+}
+
+/// Append streaming text, honouring embedded newlines.
+#[allow(dead_code)]
+fn append_streaming_text_chunks(lines: &mut Vec<Line<'static>>, text: &str) {
+    // NOTE: This helper is now a fallback path only (we eagerly re-render accumulated markdown).
+    // Still, keep behaviour sane: drop trailing spacer, ensure a writable body line, then append.
+    drop_trailing_blank_line(lines);
+    ensure_body_line(lines);
+    if let Some(last_line) = lines.last_mut() {
+        last_line.spans.push(Span::raw(text.to_string()));
+    } else {
+        lines.push(Line::from(text.to_string()));
+    }
+}
+
+/// Re-measure a mutated entry at `width` columns and update its cached height.
+fn update_entry_height(entry: &Entry, width: u16) {
+    entry.line_count.set(entry.cell.height(width));
+}
+
+/// Collapse *single* newlines in a streaming buffer into spaces so that interim streaming
+/// renders more closely match final Markdown layout — *except* when we detect fenced code blocks.
+/// If the accumulated text contains a Markdown code fence (``` or ~~~), we preserve **all**
+/// newlines verbatim so multi-line code renders correctly while streaming.
+fn collapse_single_newlines_for_streaming(src: &str) -> String {
+    // Quick fence detection. If we see a code fence marker anywhere in the accumulated text,
+    // skip collapsing entirely so we do not mangle code formatting.
+    if src.contains("```") || src.contains("~~~") {
+        return src.to_string();
+    }
+
+    let mut out = String::with_capacity(src.len());
+    let mut pending_newlines = 0usize;
+    for ch in src.chars() {
+        if ch == '\n' {
+            pending_newlines += 1;
+            continue;
+        }
+        if pending_newlines == 1 {
+            // soft break -> space
+            out.push(' ');
+        } else if pending_newlines > 1 {
+            // preserve paragraph breaks exactly
+            for _ in 0..pending_newlines {
+                out.push('\n');
+            }
+        }
+        pending_newlines = 0;
+        out.push(ch);
+    }
+    // flush tail
+    if pending_newlines == 1 {
+        out.push(' ');
+    } else if pending_newlines > 1 {
+        for _ in 0..pending_newlines {
+            out.push('\n');
+        }
+    }
+    out
+}