Mirror of https://github.com/openai/codex.git (synced 2026-02-02 06:57:03 +00:00)

Compare commits: pr10349...interrupt- (28 commits)
| SHA1 |
|---|
| 9780b6d550 |
| db54493c92 |
| aabce31e84 |
| 78c6f0eb70 |
| 098462494e |
| 761ea58759 |
| 4a7b8aaace |
| 2e30a84c68 |
| 52d6655de9 |
| 3ef1f26ecc |
| aad6dc1e4c |
| aa4f9dff7a |
| 3baa5a73ae |
| fb8622ac6a |
| 0b30945eef |
| 790c5ace10 |
| 7bcc77bb3c |
| 80bc428b37 |
| 9b3e1a8b56 |
| 666a546adc |
| f90d91b1c3 |
| b73b211ee5 |
| 2bb8d37b12 |
| 79825c08f1 |
| 4758897e6f |
| 6655653d77 |
| df04fddbc4 |
| 47725f9fa8 |
@@ -92,6 +92,32 @@ http_headers = { "X-Example-Header" = "example-value" }
env_http_headers = { "X-Example-Features" = "EXAMPLE_FEATURES" }
```

### Per-provider network tuning

The following optional settings control retry behaviour and streaming idle timeouts **per model provider**. They must be specified inside the corresponding `[model_providers.<id>]` block in `config.toml`. (Older releases accepted top‑level keys; those are now ignored.)

Example:

```toml
[model_providers.openai]
name = "OpenAI"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
# network tuning overrides (all optional; falls back to built‑in defaults)
request_max_retries = 4 # retry failed HTTP requests
stream_max_retries = 10 # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout
```

#### request_max_retries

How many times Codex will retry a failed HTTP request to the model provider. Defaults to `4`.

#### stream_max_retries

Number of times Codex will attempt to reconnect when a streaming response is interrupted. Defaults to `10`.

#### stream_idle_timeout_ms

How long Codex will wait for activity on a streaming response before treating the connection as lost. Defaults to `300_000` (5 minutes).
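For illustration only, a sketch of overriding just the idle timeout for a slower self-hosted provider; the `ollama` id, URL, and timeout value here are assumptions, and the two retry settings simply fall back to their built-in defaults:

```toml
# Hypothetical override for a slow local provider: only the idle timeout is
# customised; request_max_retries and stream_max_retries keep their defaults.
[model_providers.ollama]
name = "Ollama"
base_url = "http://localhost:11434/v1"
wire_api = "chat"
stream_idle_timeout_ms = 600000 # 10 minutes
```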

## model_provider

Identifies which provider to use from the `model_providers` map. Defaults to `"openai"`. You can override the `base_url` for the built-in `openai` provider via the `OPENAI_BASE_URL` environment variable.
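As an illustrative sketch (the `ollama` provider id and the `model` value are assumptions, not part of this change), selecting a provider from the map in `config.toml` could look like:

```toml
# Route requests to the provider defined under [model_providers.ollama];
# omit this key to keep the default "openai" provider.
model_provider = "ollama"
model = "llama3"
```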

@@ -444,7 +470,7 @@ Currently, `"vscode"` is the default, though Codex does not verify VS Code is in

## hide_agent_reasoning

Codex intermittently emits "reasoning" events that show the model’s internal "thinking" before it produces a final answer. Some users may find these events distracting, especially in CI logs or minimal terminal output.
Codex intermittently emits "reasoning" events that show the model's internal "thinking" before it produces a final answer. Some users may find these events distracting, especially in CI logs or minimal terminal output.

Setting `hide_agent_reasoning` to `true` suppresses these events in **both** the TUI as well as the headless `exec` sub-command:
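A minimal `config.toml` sketch of the flag described above:

```toml
# Suppress the model's interim reasoning events in the TUI and `codex exec` output.
hide_agent_reasoning = true
```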

@@ -21,8 +21,6 @@ use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::models::ContentItem;
use crate::models::ResponseItem;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
@@ -121,6 +119,7 @@ pub(crate) async fn stream_chat_completions(
);

let mut attempt = 0;
let max_retries = provider.request_max_retries();
loop {
attempt += 1;

@@ -136,7 +135,11 @@ pub(crate) async fn stream_chat_completions(
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_chat_sse(stream, tx_event));
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
@@ -146,7 +149,7 @@ pub(crate) async fn stream_chat_completions(
return Err(CodexErr::UnexpectedStatus(status, body));
}

if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}

@@ -162,7 +165,7 @@ pub(crate) async fn stream_chat_completions(
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -175,14 +178,15 @@ pub(crate) async fn stream_chat_completions(
/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
async fn process_chat_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();

let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;

// State to accumulate a function call across streaming chunks.
// OpenAI may split the `arguments` string over multiple `delta` events
// until the chunk whose `finish_reason` is `tool_calls` is emitted. We

@@ -29,8 +29,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::models::ResponseItem;
@@ -109,7 +107,7 @@ impl ModelClient {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path).await;
return stream_from_fixture(path, self.provider.clone()).await;
}

let full_instructions = prompt.get_full_instructions(&self.config.model);
@@ -136,6 +134,7 @@ impl ModelClient {
);

let mut attempt = 0;
let max_retries = self.provider.request_max_retries();
loop {
attempt += 1;

@@ -153,7 +152,11 @@ impl ModelClient {

// spawn task to process SSE
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
self.provider.stream_idle_timeout(),
));

return Ok(ResponseStream { rx_event });
}
@@ -172,7 +175,7 @@ impl ModelClient {
return Err(CodexErr::UnexpectedStatus(status, body));
}

if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}

@@ -189,7 +192,7 @@ impl ModelClient {
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -198,6 +201,10 @@ impl ModelClient {
}
}
}

pub fn get_provider(&self) -> ModelProviderInfo {
self.provider.clone()
}
}

#[derive(Debug, Deserialize, Serialize)]
@@ -249,14 +256,16 @@ struct ResponseCompletedOutputTokensDetails {
reasoning_tokens: u64,
}

async fn process_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();

// If the stream stays completely silent for an extended period treat it as disconnected.
let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;

@@ -317,7 +326,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No real‑time streaming – the user only saw output after the
// entire turn had finished, which broke the “typing” UX and
// entire turn had finished, which broke the "typing" UX and
// made long‑running turns look stalled.
// 2. Duplicate `function_call_output` items – both the
// individual *and* the completed array were forwarded, which
@@ -390,7 +399,10 @@ where
}

/// used in tests to stream from a text SSE file
async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
let lines = std::io::BufReader::new(f).lines();
@@ -404,7 +416,11 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {

let rdr = std::io::Cursor::new(content);
let stream = ReaderStream::new(rdr).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
Ok(ResponseStream { rx_event })
}

@@ -424,7 +440,10 @@ mod tests {

/// Runs the SSE parser on pre-chunked byte slices and returns every event
/// (including any final `Err` from a stream-closure check).
async fn collect_events(chunks: &[&[u8]]) -> Vec<Result<ResponseEvent>> {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
builder.read(chunk);
@@ -433,7 +452,7 @@ mod tests {
let reader = builder.build();
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));

let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -444,7 +463,10 @@ mod tests {

/// Builds an in-memory SSE stream from JSON fixtures and returns only the
/// successfully parsed events (panics on internal channel errors).
async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
let kind = e
@@ -460,7 +482,7 @@ mod tests {

let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));

let mut out = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -505,7 +527,25 @@ mod tests {
let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
let sse3 = format!("event: response.completed\ndata: {completed}\n\n");

let events = collect_events(&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()]).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};

let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
)
.await;

assert_eq!(events.len(), 3);

@@ -546,8 +586,21 @@ mod tests {
.to_string();

let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};

let events = collect_events(&[sse1.as_bytes()]).await;
let events = collect_events(&[sse1.as_bytes()], provider).await;

assert_eq!(events.len(), 2);

@@ -635,7 +688,21 @@ mod tests {
let mut evs = vec![case.event];
evs.push(completed.clone());

let out = run_sse(evs).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};

let out = run_sse(evs, provider).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),

@@ -49,7 +49,6 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::process_exec_tool_call;
use crate::exec_env::create_env;
use crate::flags::OPENAI_STREAM_MAX_RETRIES;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::models::ContentItem;
@@ -991,6 +990,52 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
sess.tx_event.send(event).await.ok();
}

// ---
// Helpers --------------------------------------------------------------------
//
// When a turn is interrupted before Codex can deliver tool output(s) back to
// the model, the next request can fail with a 400 from the OpenAI API:
// {"error": {"message": "No tool output found for function call call_XXXXX", ...}}
// Historically this manifested as a confusing retry loop ("stream error: 400 …")
// because we never learned about the missing `call_id` (the stream was aborted
// before we observed the `ResponseEvent::OutputItemDone` that would have let us
// record it in `pending_call_ids`).
//
// To make interruption robust we parse the error body for the offending call id
// and add it to `pending_call_ids` so the very next retry can inject a synthetic
// `FunctionCallOutput { content: "aborted" }` and satisfy the API contract.
// -----------------------------------------------------------------------------
fn extract_missing_tool_call_id(body: &str) -> Option<String> {
// Try to parse the canonical JSON error shape first.
if let Ok(v) = serde_json::from_str::<serde_json::Value>(body) {
if let Some(msg) = v
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
{
if let Some(id) = extract_missing_tool_call_id_from_msg(msg) {
return Some(id);
}
}
}
// Fallback: scan the raw body.
extract_missing_tool_call_id_from_msg(body)
}

fn extract_missing_tool_call_id_from_msg(msg: &str) -> Option<String> {
const NEEDLE: &str = "No tool output found for function call";
let idx = msg.find(NEEDLE)?;
let rest = &msg[idx + NEEDLE.len()..];
// Find the beginning of the call id (typically starts with "call_").
let start = rest.find("call_")?;
let rest = &rest[start..];
// Capture valid id chars [A-Za-z0-9_-/]. Hyphen shows up in some IDs; be permissive.
let end = rest
.find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '/'))
.unwrap_or(rest.len());
Some(rest[..end].to_string())
}

async fn run_turn(
sess: &Session,
sub_id: String,
@@ -1025,13 +1070,58 @@ async fn run_turn(
Ok(output) => return Ok(output),
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e) => {
if retries < *OPENAI_STREAM_MAX_RETRIES {
Err(CodexErr::UnexpectedStatus(status, body)) => {
// Detect the specific 400 "No tool output found for function call ..." error that
// occurs when a user interrupted before Codex could answer a tool call.
if status == reqwest::StatusCode::BAD_REQUEST {
if let Some(call_id) = extract_missing_tool_call_id(&body) {
{
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
// Surface a friendlier background event so users understand the recovery.
sess
.notify_background_event(
&sub_id,
format!(
"previous turn interrupted before responding to tool {call_id}; sending aborted output and retrying…",
),
)
.await;
// Immediately retry the turn without consuming a provider stream retry budget.
continue;
}
}
// Fall through to generic retry path if we could not auto‑recover.
let e = CodexErr::UnexpectedStatus(status, body);
// Use the configured provider-specific stream retry budget.
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
warn!(
"stream disconnected - retrying turn ({retries}/{} in {delay:?})...",
*OPENAI_STREAM_MAX_RETRIES
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…",
),
)
.await;
tokio::time::sleep(delay).await;
} else {
return Err(e);
}
}
Err(e) => {
// Use the configured provider-specific stream retry budget.
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
warn!(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);

// Surface retry information to any UI/front‑end so the
@@ -1040,8 +1130,7 @@ async fn run_turn(
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{} in {:?}…",
*OPENAI_STREAM_MAX_RETRIES, delay
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…",
),
)
.await;
@@ -1123,7 +1212,28 @@ async fn try_run_turn(
let mut stream = sess.client.clone().stream(&prompt).await?;

let mut output = Vec::new();
while let Some(Ok(event)) = stream.next().await {
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
));
};

let event = match event {
Ok(ev) => ev,
Err(e) => {
// Propagate the underlying stream error to the caller (run_turn), which
// will apply the configured `stream_max_retries` policy.
return Err(e);
}
};

match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
@@ -1164,7 +1274,7 @@ async fn try_run_turn(

let mut state = sess.state.lock().unwrap();
state.previous_response_id = Some(response_id);
break;
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
let event = Event {
@@ -1182,7 +1292,6 @@ async fn try_run_turn(
}
}
}
Ok(output)
}

async fn handle_response_item(

@@ -682,6 +682,9 @@ name = "OpenAI using Chat Completions"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "chat"
request_max_retries = 4 # retry failed HTTP requests
stream_max_retries = 10 # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout

[profiles.o3]
model = "o3"
@@ -722,6 +725,9 @@ disable_response_storage = true
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(4),
stream_max_retries: Some(10),
stream_idle_timeout_ms: Some(300_000),
};
let model_provider_map = {
let mut model_provider_map = built_in_model_providers();

@@ -11,14 +11,6 @@ env_flags! {
pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};
pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;

// We generally don't want to disconnect; this updates the timeout to be five minutes
// which matches the upstream typescript codex impl.
pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};

/// Fixture path for offline tests (see client.rs).
pub CODEX_RS_SSE_FIXTURE: Option<&str> = None;

@@ -9,6 +9,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::env::VarError;
use std::time::Duration;

use crate::error::EnvVarError;
use crate::openai_api_key::get_openai_api_key;
@@ -16,6 +17,9 @@ use crate::openai_api_key::get_openai_api_key;
/// Value for the `OpenAI-Originator` header that is sent with requests to
/// OpenAI.
const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 10;
const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4;

/// Wire protocol that the provider speaks. Most third-party services only
/// implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI
@@ -26,7 +30,7 @@ const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WireApi {
/// The experimental “Responses” API exposed by OpenAI at `/v1/responses`.
/// The experimental "Responses" API exposed by OpenAI at `/v1/responses`.
Responses,

/// Regular Chat Completions compatible with `/v1/chat/completions`.
@@ -64,6 +68,16 @@ pub struct ModelProviderInfo {
/// value should be used. If the environment variable is not set, or the
/// value is empty, the header will not be included in the request.
pub env_http_headers: Option<HashMap<String, String>>,

/// Maximum number of times to retry a failed HTTP request to this provider.
pub request_max_retries: Option<u64>,

/// Number of times to retry reconnecting a dropped streaming response before failing.
pub stream_max_retries: Option<u64>,

/// Idle timeout (in milliseconds) to wait for activity on a streaming response before treating
/// the connection as lost.
pub stream_idle_timeout_ms: Option<u64>,
}

impl ModelProviderInfo {
@@ -161,6 +175,25 @@ impl ModelProviderInfo {
None => Ok(None),
}
}

/// Effective maximum number of request retries for this provider.
pub fn request_max_retries(&self) -> u64 {
self.request_max_retries
.unwrap_or(DEFAULT_REQUEST_MAX_RETRIES)
}

/// Effective maximum number of stream reconnection attempts for this provider.
pub fn stream_max_retries(&self) -> u64 {
self.stream_max_retries
.unwrap_or(DEFAULT_STREAM_MAX_RETRIES)
}

/// Effective idle timeout for streaming responses.
pub fn stream_idle_timeout(&self) -> Duration {
self.stream_idle_timeout_ms
.map(Duration::from_millis)
.unwrap_or(Duration::from_millis(DEFAULT_STREAM_IDLE_TIMEOUT_MS))
}
}

/// Built-in default provider list.
@@ -205,6 +238,10 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
.into_iter()
.collect(),
),
// Use global defaults for retry/timeout unless overridden in config.toml.
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
},
),
]
@@ -234,6 +271,9 @@ base_url = "http://localhost:11434/v1"
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};

let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -259,6 +299,9 @@ query_params = { api-version = "2025-04-01-preview" }
}),
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};

let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -287,6 +330,9 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
env_http_headers: Some(maplit::hashmap! {
"X-Example-Env-Header".to_string() => "EXAMPLE_ENV_VAR".to_string(),
}),
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};

let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();

@@ -173,7 +173,7 @@ async fn integration_creates_and_checks_session_file() {
// 5. Sessions are written asynchronously; wait briefly for the directory to appear.
let sessions_dir = home.path().join("sessions");
let start = Instant::now();
while !sessions_dir.exists() && start.elapsed() < Duration::from_secs(2) {
while !sessions_dir.exists() && start.elapsed() < Duration::from_secs(3) {
std::thread::sleep(Duration::from_millis(50));
}

@@ -45,22 +45,10 @@ async fn spawn_codex() -> Result<Codex, CodexErr> {
"OPENAI_API_KEY must be set for live tests"
);

// Environment tweaks to keep the tests snappy and inexpensive while still
// exercising retry/robustness logic.
//
// NOTE: Starting with the 2024 edition `std::env::set_var` is `unsafe`
// because changing the process environment races with any other threads
// that might be performing environment look-ups at the same time.
// Restrict the unsafety to this tiny block that happens at the very
// beginning of the test, before we spawn any background tasks that could
// observe the environment.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "2");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2");
}

let codex_home = TempDir::new().unwrap();
let config = load_default_config_for_test(&codex_home);
let mut config = load_default_config_for_test(&codex_home);
config.model_provider.request_max_retries = Some(2);
config.model_provider.stream_max_retries = Some(2);
let (agent, _init_id) = Codex::spawn(config, std::sync::Arc::new(Notify::new())).await?;

Ok(agent)
@@ -79,7 +67,7 @@ async fn live_streaming_and_prev_id_reset() {

let codex = spawn_codex().await.unwrap();

// ---------- Task 1 ----------
// ---------- Task 1 ----------
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
@@ -113,7 +101,7 @@ async fn live_streaming_and_prev_id_reset() {
"Agent did not stream any AgentMessage before TaskComplete"
);

// ---------- Task 2 (same session) ----------
// ---------- Task 2 (same session) ----------
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {

@@ -88,13 +88,8 @@ async fn keeps_previous_response_id_between_tasks() {
.mount(&server)
.await;

// Environment
// Update environment – `set_var` is `unsafe` starting with the 2024
// edition so we group the calls into a single `unsafe { … }` block.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0");
}
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
let model_provider = ModelProviderInfo {
name: "openai".into(),
base_url: format!("{}/v1", server.uri()),
@@ -107,6 +102,10 @@ async fn keeps_previous_response_id_between_tasks() {
query_params: None,
http_headers: None,
env_http_headers: None,
// disable retries so we don't get duplicate calls in this test
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: None,
};

// Init session

@@ -32,8 +32,6 @@ fn sse_completed(id: &str) -> String {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// this test is flaky (has race conditions), so we ignore it for now
#[ignore]
async fn retries_on_early_close() {
#![allow(clippy::unwrap_used)]

@@ -72,19 +70,8 @@ async fn retries_on_early_close() {
.mount(&server)
.await;

// Environment
//
// As of Rust 2024 `std::env::set_var` has been made `unsafe` because
// mutating the process environment is inherently racy when other threads
// are running. We therefore have to wrap every call in an explicit
// `unsafe` block. These are limited to the test-setup section so the
// scope is very small and clearly delineated.

unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1");
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
}
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.

let model_provider = ModelProviderInfo {
name: "openai".into(),
@@ -98,6 +85,10 @@ async fn retries_on_early_close() {
query_params: None,
http_headers: None,
env_http_headers: None,
// exercise retry path: first attempt yields incomplete stream, so allow 1 retry
request_max_retries: Some(0),
stream_max_retries: Some(1),
stream_idle_timeout_ms: Some(2000),
};

let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());

@@ -53,6 +53,7 @@ pub(crate) struct ChatWidget<'a> {
token_usage: TokenUsage,
reasoning_buffer: String,
answer_buffer: String,
active_task_id: Option<String>,
}

#[derive(Clone, Copy, Eq, PartialEq)]
@@ -141,6 +142,7 @@ impl ChatWidget<'_> {
token_usage: TokenUsage::default(),
reasoning_buffer: String::new(),
answer_buffer: String::new(),
active_task_id: None,
}
}

@@ -222,10 +224,30 @@ impl ChatWidget<'_> {
self.conversation_history.add_user_message(text);
}
self.conversation_history.scroll_to_bottom();

// IMPORTANT: Starting a *new* user turn. Clear any partially streamed
// answer from a previous turn (e.g., one that was interrupted) so that
// the next AgentMessageDelta spawns a fresh agent message cell instead
// of overwriting the last one.
self.answer_buffer.clear();
self.reasoning_buffer.clear();
}

pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
// Retain the event ID so we can refer to it after destructuring.
let event_id = event.id.clone();
let Event { id: _, msg } = event;

// When we are in the middle of a task (active_task_id is Some) we drop
// streaming text/reasoning events for *other* task IDs. This prevents
// late tokens from an interrupted run from bleeding into the current
// answer.
let should_drop_streaming = self
.active_task_id
.as_ref()
.map(|active| active != &event_id)
.unwrap_or(false);

match msg {
EventMsg::SessionConfigured(event) => {
// Record session information at the top of the conversation.
@@ -246,6 +268,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
if should_drop_streaming {
return;
}
// if the answer buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if self.answer_buffer.is_empty() {
@@ -259,6 +284,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if should_drop_streaming {
return;
}
if self.answer_buffer.is_empty() {
self.conversation_history
.add_agent_message(&self.config, "".to_string());
@@ -269,6 +297,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if should_drop_streaming {
return;
}
if self.reasoning_buffer.is_empty() {
self.conversation_history
.add_agent_reasoning(&self.config, "".to_string());
@@ -279,6 +310,9 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
if should_drop_streaming {
return;
}
// if the reasoning buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new reasoning.
if self.reasoning_buffer.is_empty() {
@@ -293,6 +327,10 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::TaskStarted => {
// New task has begun – update state and clear any stale buffers.
self.active_task_id = Some(event_id);
self.answer_buffer.clear();
self.reasoning_buffer.clear();
self.bottom_pane.clear_ctrl_c_quit_hint();
self.bottom_pane.set_task_running(true);
self.request_redraw();
@@ -300,6 +338,10 @@ impl ChatWidget<'_> {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
// Task finished; clear active_task_id so that subsequent events are processed.
if self.active_task_id.as_ref() == Some(&event_id) {
self.active_task_id = None;
}
self.bottom_pane.set_task_running(false);
self.request_redraw();
}
@@ -309,16 +351,25 @@ impl ChatWidget<'_> {
.set_token_usage(self.token_usage.clone(), self.config.model_context_window);
}
EventMsg::Error(ErrorEvent { message }) => {
// Error events always get surfaced (even for stale task IDs) so that the user sees
// why a run stopped. However, only clear the running indicator if this is the
// active task.
if self.active_task_id.as_ref() == Some(&event_id) {
self.bottom_pane.set_task_running(false);
self.active_task_id = None;
}
self.conversation_history.add_error(message);
self.bottom_pane.set_task_running(false);
}
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
reason,
}) => {
if should_drop_streaming {
return;
}
let request = ApprovalRequest::Exec {
id,
id: event_id,
command,
cwd,
reason,
@@ -330,6 +381,9 @@ impl ChatWidget<'_> {
reason,
grant_root,
}) => {
if should_drop_streaming {
return;
}
// ------------------------------------------------------------------
// Before we even prompt the user for approval we surface the patch
// summary in the main conversation so that the dialog appears in a
@@ -348,7 +402,7 @@ impl ChatWidget<'_> {

// Now surface the approval request in the BottomPane as before.
let request = ApprovalRequest::ApplyPatch {
id,
id: event_id,
reason,
grant_root,
};
@@ -360,6 +414,9 @@ impl ChatWidget<'_> {
command,
cwd: _,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.add_active_exec_command(call_id, command);
self.request_redraw();
@@ -369,6 +426,9 @@ impl ChatWidget<'_> {
auto_approved,
changes,
}) => {
if should_drop_streaming {
return;
}
// Even when a patch is auto‑approved we still display the
// summary so the user can follow along.
self.conversation_history
@@ -384,6 +444,9 @@ impl ChatWidget<'_> {
stdout,
stderr,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.record_completed_exec_command(call_id, stdout, stderr, exit_code);
self.request_redraw();
@@ -394,11 +457,17 @@ impl ChatWidget<'_> {
tool,
arguments,
}) => {
if should_drop_streaming {
return;
}
self.conversation_history
.add_active_mcp_tool_call(call_id, server, tool, arguments);
self.request_redraw();
}
EventMsg::McpToolCallEnd(mcp_tool_call_end_event) => {
if should_drop_streaming {
return;
}
let success = mcp_tool_call_end_event.is_success();
let McpToolCallEndEvent { call_id, result } = mcp_tool_call_end_event;
self.conversation_history