fix: memory leak issue (#9543)

Co-authored-by: Josh McKinney <joshka@openai.com>
This commit is contained in:
jif-oai
2026-01-20 20:14:14 +00:00
committed by GitHub
parent 714151eb4e
commit 0b3c802a54

View File

@@ -47,6 +47,12 @@ const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; // conventional timeout exit code
const READ_CHUNK_SIZE: usize = 8192; // bytes per read
const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB
/// Hard cap on bytes retained from exec stdout/stderr/aggregated output.
///
/// This mirrors unified exec's output cap so a single runaway command cannot
/// OOM the process by dumping huge amounts of data to stdout/stderr.
const EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
/// Limit the number of ExecCommandOutputDelta events emitted per exec call.
/// Aggregation still collects full output; only the live event stream is capped.
pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000;
@@ -290,18 +296,32 @@ async fn exec_windows_sandbox(
};
let exit_status = synthetic_exit_status(capture.exit_code);
let mut stdout_text = capture.stdout;
if stdout_text.len() > EXEC_OUTPUT_MAX_BYTES {
stdout_text.truncate(EXEC_OUTPUT_MAX_BYTES);
}
let mut stderr_text = capture.stderr;
if stderr_text.len() > EXEC_OUTPUT_MAX_BYTES {
stderr_text.truncate(EXEC_OUTPUT_MAX_BYTES);
}
let stdout = StreamOutput {
text: capture.stdout,
text: stdout_text,
truncated_after_lines: None,
};
let stderr = StreamOutput {
text: capture.stderr,
text: stderr_text,
truncated_after_lines: None,
};
// Best-effort aggregate: stdout then stderr
let mut aggregated = Vec::with_capacity(stdout.text.len() + stderr.text.len());
append_all(&mut aggregated, &stdout.text);
append_all(&mut aggregated, &stderr.text);
// Best-effort aggregate: stdout then stderr (capped).
let mut aggregated = Vec::with_capacity(
stdout
.text
.len()
.saturating_add(stderr.text.len())
.min(EXEC_OUTPUT_MAX_BYTES),
);
append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES);
append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES);
let aggregated_output = StreamOutput {
text: aggregated,
truncated_after_lines: None,
@@ -490,8 +510,13 @@ impl StreamOutput<Vec<u8>> {
}
#[inline]
fn append_all(dst: &mut Vec<u8>, src: &[u8]) {
dst.extend_from_slice(src);
fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
if dst.len() >= max_bytes {
return;
}
let remaining = max_bytes.saturating_sub(dst.len());
let take = remaining.min(src.len());
dst.extend_from_slice(&src[..take]);
}
#[derive(Clone, Debug)]
@@ -584,19 +609,15 @@ async fn consume_truncated_output(
))
})?;
let (agg_tx, agg_rx) = async_channel::unbounded::<Vec<u8>>();
let stdout_handle = tokio::spawn(read_capped(
BufReader::new(stdout_reader),
stdout_stream.clone(),
false,
Some(agg_tx.clone()),
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
stdout_stream.clone(),
true,
Some(agg_tx.clone()),
));
let (exit_status, timed_out) = tokio::select! {
@@ -662,15 +683,18 @@ async fn consume_truncated_output(
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
)
.await?;
drop(agg_tx);
let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
while let Ok(chunk) = agg_rx.recv().await {
append_all(&mut combined_buf, &chunk);
}
// Best-effort aggregate: stdout then stderr (capped).
let mut aggregated = Vec::with_capacity(
stdout
.text
.len()
.saturating_add(stderr.text.len())
.min(EXEC_OUTPUT_MAX_BYTES),
);
append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES);
append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES * 2);
let aggregated_output = StreamOutput {
text: combined_buf,
text: aggregated,
truncated_after_lines: None,
};
@@ -687,14 +711,11 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
stream: Option<StdoutStream>,
is_stderr: bool,
aggregate_tx: Option<Sender<Vec<u8>>>,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY.min(EXEC_OUTPUT_MAX_BYTES));
let mut tmp = [0u8; READ_CHUNK_SIZE];
let mut emitted_deltas: usize = 0;
// No caps: append all bytes
loop {
let n = reader.read(&mut tmp).await?;
if n == 0 {
@@ -723,11 +744,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
emitted_deltas += 1;
}
if let Some(tx) = &aggregate_tx {
let _ = tx.send(tmp[..n].to_vec()).await;
}
append_all(&mut buf, &tmp[..n]);
append_capped(&mut buf, &tmp[..n], EXEC_OUTPUT_MAX_BYTES);
// Continue reading to EOF to avoid back-pressure
}
@@ -755,6 +772,7 @@ fn synthetic_exit_status(code: i32) -> ExitStatus {
mod tests {
use super::*;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
fn make_exec_output(
exit_code: i32,
@@ -816,6 +834,18 @@ mod tests {
));
}
#[tokio::test]
async fn read_capped_limits_retained_bytes() {
let (mut writer, reader) = tokio::io::duplex(1024);
let bytes = vec![b'a'; EXEC_OUTPUT_MAX_BYTES.saturating_add(128 * 1024)];
tokio::spawn(async move {
writer.write_all(&bytes).await.expect("write");
});
let out = read_capped(reader, None, false).await.expect("read");
assert_eq!(out.text.len(), EXEC_OUTPUT_MAX_BYTES);
}
#[cfg(unix)]
#[test]
fn sandbox_detection_flags_sigsys_exit_code() {