mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix: limit output size for exec command in unified exec (#8460)
### Issue [Investigation thread](https://openai.slack.com/archives/C095U48JNL9/p1766426234975789) Github issue: [issue one](https://github.com/openai/codex/issues/8197), [issue two](https://github.com/openai/codex/issues/8358), [issue three](https://github.com/openai/codex/issues/7585) Commonality: working in monorepo and large projects. Multiple threads going and showing sluggishness ending with a crash and grey background. Potential high usage of context. How to reproduce: * Open the whole monorepo with cursor or VSC on the latest extension. * Run ls -R in current CWD. ### Change In unified exec, we do not have max output check to the delta we ouput for shell commands. This causes issue with our VSCE UI which take the delta and construct the shell output.
This commit is contained in:
@@ -10,6 +10,7 @@ use tokio::time::Sleep;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::MAX_EXEC_OUTPUT_DELTAS_PER_CALL;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
@@ -25,6 +26,14 @@ use super::session::UnifiedExecSession;
|
||||
|
||||
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
|
||||
|
||||
/// Upper bound for a single ExecCommandOutputDelta chunk emitted by unified exec.
|
||||
///
|
||||
/// The unified exec output buffer already caps *retained* output (see
|
||||
/// `UNIFIED_EXEC_OUTPUT_MAX_BYTES`), but we also cap per-event payload size so
|
||||
/// downstream event consumers (especially app-server JSON-RPC) don't have to
|
||||
/// process arbitrarily large delta payloads.
|
||||
const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
|
||||
|
||||
/// Spawn a background task that continuously reads from the PTY, appends to the
|
||||
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
||||
/// boundaries.
|
||||
@@ -45,6 +54,7 @@ pub(crate) fn start_streaming_output(
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
let mut pending = Vec::<u8>::new();
|
||||
let mut emitted_deltas: usize = 0;
|
||||
|
||||
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
|
||||
|
||||
@@ -82,6 +92,7 @@ pub(crate) fn start_streaming_output(
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
}
|
||||
@@ -135,6 +146,7 @@ async fn process_chunk(
|
||||
call_id: &str,
|
||||
session_ref: &Arc<Session>,
|
||||
turn_ref: &Arc<TurnContext>,
|
||||
emitted_deltas: &mut usize,
|
||||
chunk: Vec<u8>,
|
||||
) {
|
||||
pending.extend_from_slice(&chunk);
|
||||
@@ -144,6 +156,10 @@ async fn process_chunk(
|
||||
guard.append(&prefix);
|
||||
}
|
||||
|
||||
if *emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
|
||||
continue;
|
||||
}
|
||||
|
||||
let event = ExecCommandOutputDeltaEvent {
|
||||
call_id: call_id.to_string(),
|
||||
stream: ExecOutputStream::Stdout,
|
||||
@@ -152,6 +168,7 @@ async fn process_chunk(
|
||||
session_ref
|
||||
.send_event(turn_ref.as_ref(), EventMsg::ExecCommandOutputDelta(event))
|
||||
.await;
|
||||
*emitted_deltas += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,12 +210,16 @@ pub(crate) async fn emit_exec_end_for_unified_exec(
|
||||
}
|
||||
|
||||
fn split_valid_utf8_prefix(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
|
||||
split_valid_utf8_prefix_with_max(buffer, UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES)
|
||||
}
|
||||
|
||||
fn split_valid_utf8_prefix_with_max(buffer: &mut Vec<u8>, max_bytes: usize) -> Option<Vec<u8>> {
|
||||
if buffer.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let len = buffer.len();
|
||||
let mut split = len;
|
||||
let max_len = buffer.len().min(max_bytes);
|
||||
let mut split = max_len;
|
||||
while split > 0 {
|
||||
if std::str::from_utf8(&buffer[..split]).is_ok() {
|
||||
let prefix = buffer[..split].to_vec();
|
||||
@@ -206,7 +227,7 @@ fn split_valid_utf8_prefix(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
|
||||
return Some(prefix);
|
||||
}
|
||||
|
||||
if len - split > 4 {
|
||||
if max_len - split > 4 {
|
||||
break;
|
||||
}
|
||||
split -= 1;
|
||||
@@ -229,3 +250,42 @@ async fn resolve_aggregated_output(
|
||||
|
||||
String::from_utf8_lossy(&guard.data).to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::split_valid_utf8_prefix_with_max;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn split_valid_utf8_prefix_respects_max_bytes_for_ascii() {
|
||||
let mut buf = b"hello word!".to_vec();
|
||||
|
||||
let first = split_valid_utf8_prefix_with_max(&mut buf, 5).expect("expected prefix");
|
||||
assert_eq!(first, b"hello".to_vec());
|
||||
assert_eq!(buf, b" word!".to_vec());
|
||||
|
||||
let second = split_valid_utf8_prefix_with_max(&mut buf, 5).expect("expected prefix");
|
||||
assert_eq!(second, b" word".to_vec());
|
||||
assert_eq!(buf, b"!".to_vec());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_valid_utf8_prefix_avoids_splitting_utf8_codepoints() {
|
||||
// "é" is 2 bytes in UTF-8. With a max of 3 bytes, we should only emit 1 char (2 bytes).
|
||||
let mut buf = "ééé".as_bytes().to_vec();
|
||||
|
||||
let first = split_valid_utf8_prefix_with_max(&mut buf, 3).expect("expected prefix");
|
||||
assert_eq!(std::str::from_utf8(&first).unwrap(), "é");
|
||||
assert_eq!(buf, "éé".as_bytes().to_vec());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_valid_utf8_prefix_makes_progress_on_invalid_utf8() {
|
||||
let mut buf = vec![0xff, b'a', b'b'];
|
||||
|
||||
let first = split_valid_utf8_prefix_with_max(&mut buf, 2).expect("expected prefix");
|
||||
assert_eq!(first, vec![0xff]);
|
||||
assert_eq!(buf, b"ab".to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user