Cap remote shell output deltas

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-20 17:31:43 -07:00
parent 275be5a38d
commit f5ffc0373d

View File

@@ -37,6 +37,7 @@ use codex_exec_server::Environment as ExecutorEnvironment;
use codex_exec_server::ExecOutputStream as ExecutorOutputStream;
use codex_exec_server::ExecParams as ExecutorExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessOutputChunk as ExecutorProcessOutputChunk;
use codex_exec_server::ReadParams as ExecutorReadParams;
use codex_network_proxy::NetworkProxy;
#[cfg(any(target_os = "windows", test))]
@@ -1126,6 +1127,7 @@ async fn consume_exec_server_output(
let mut after_seq = None;
let mut exit_status = None;
let mut timed_out = false;
let mut emitted_deltas = 0usize;
let expiration_wait = async {
if capture_policy.uses_expiration() {
@@ -1160,19 +1162,15 @@ async fn consume_exec_server_output(
};
after_seq = Some(read_response.next_seq.saturating_sub(1));
for chunk in read_response.chunks {
let bytes = chunk.chunk.into_inner();
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await;
match chunk.stream {
ExecutorOutputStream::Stderr => {
append_with_cap(&mut stderr, &bytes, retained_bytes_cap)
}
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
append_with_cap(&mut stdout, &bytes, retained_bytes_cap)
}
}
}
append_exec_server_chunks(
read_response.chunks,
&mut stdout,
&mut stderr,
retained_bytes_cap,
stdout_stream.as_ref(),
&mut emitted_deltas,
)
.await;
if read_response.exited {
exit_status = Some(synthetic_exit_status(read_response.exit_code.unwrap_or(-1)));
@@ -1190,19 +1188,15 @@ async fn consume_exec_server_output(
break;
}
after_seq = Some(drain_response.next_seq.saturating_sub(1));
for chunk in drain_response.chunks {
let bytes = chunk.chunk.into_inner();
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await;
match chunk.stream {
ExecutorOutputStream::Stderr => {
append_with_cap(&mut stderr, &bytes, retained_bytes_cap)
}
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
append_with_cap(&mut stdout, &bytes, retained_bytes_cap)
}
}
}
append_exec_server_chunks(
drain_response.chunks,
&mut stdout,
&mut stderr,
retained_bytes_cap,
stdout_stream.as_ref(),
&mut emitted_deltas,
)
.await;
}
break;
}
@@ -1228,6 +1222,31 @@ async fn consume_exec_server_output(
})
}
async fn append_exec_server_chunks(
chunks: Vec<ExecutorProcessOutputChunk>,
stdout: &mut Vec<u8>,
stderr: &mut Vec<u8>,
retained_bytes_cap: Option<usize>,
stdout_stream: Option<&StdoutStream>,
emitted_deltas: &mut usize,
) {
for chunk in chunks {
let bytes = chunk.chunk.into_inner();
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
if *emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
emit_output_delta(stdout_stream, is_stderr, bytes.clone()).await;
*emitted_deltas += 1;
}
match chunk.stream {
ExecutorOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap),
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
append_with_cap(stdout, &bytes, retained_bytes_cap)
}
}
}
}
async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
stream: Option<StdoutStream>,