diff --git a/codex-rs/core/src/exec_remote.rs b/codex-rs/core/src/exec_remote.rs index 31a67b9d46..f381458afe 100644 --- a/codex-rs/core/src/exec_remote.rs +++ b/codex-rs/core/src/exec_remote.rs @@ -5,7 +5,6 @@ use codex_exec_server::Environment as ExecutorEnvironment; use codex_exec_server::ExecOutputStream as ExecutorOutputStream; use codex_exec_server::ExecParams as ExecutorExecParams; use codex_exec_server::ExecProcess; -use codex_exec_server::ProcessOutputChunk as ExecutorProcessOutputChunk; use codex_exec_server::ReadParams as ExecutorReadParams; use uuid::Uuid; @@ -90,7 +89,7 @@ pub(crate) async fn execute_exec_request_via_environment( finalize_exec_result(raw_output_result, sandbox, duration) } -async fn consume_exec_server_output( +pub(crate) async fn consume_exec_server_output( executor: std::sync::Arc, process_id: &str, expiration: ExecExpiration, @@ -142,15 +141,23 @@ async fn consume_exec_server_output( }; after_seq = Some(read_response.next_seq.saturating_sub(1)); - append_exec_server_chunks( - read_response.chunks, - &mut stdout, - &mut stderr, - retained_bytes_cap, - stdout_stream.as_ref(), - &mut emitted_deltas, - ) - .await; + for chunk in read_response.chunks { + let bytes = chunk.chunk.into_inner(); + let is_stderr = chunk.stream == ExecutorOutputStream::Stderr; + if emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { + emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await; + emitted_deltas += 1; + } + + match chunk.stream { + ExecutorOutputStream::Stderr => { + append_with_cap(&mut stderr, &bytes, retained_bytes_cap) + } + ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => { + append_with_cap(&mut stdout, &bytes, retained_bytes_cap) + } + } + } if read_response.exited { exit_status = Some(synthetic_exit_status(read_response.exit_code.unwrap_or(-1))); @@ -168,15 +175,23 @@ async fn consume_exec_server_output( break; } after_seq = Some(drain_response.next_seq.saturating_sub(1)); - append_exec_server_chunks( - drain_response.chunks, - &mut stdout, - &mut stderr, - retained_bytes_cap, - stdout_stream.as_ref(), - &mut emitted_deltas, - ) - .await; + for chunk in drain_response.chunks { + let bytes = chunk.chunk.into_inner(); + let is_stderr = chunk.stream == ExecutorOutputStream::Stderr; + if emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { + emit_output_delta(stdout_stream.as_ref(), is_stderr, bytes.clone()).await; + emitted_deltas += 1; + } + + match chunk.stream { + ExecutorOutputStream::Stderr => { + append_with_cap(&mut stderr, &bytes, retained_bytes_cap) + } + ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => { + append_with_cap(&mut stdout, &bytes, retained_bytes_cap) + } + } + } } break; } @@ -202,31 +217,6 @@ async fn consume_exec_server_output( }) } -async fn append_exec_server_chunks( - chunks: Vec, - stdout: &mut Vec, - stderr: &mut Vec, - retained_bytes_cap: Option, - stdout_stream: Option<&StdoutStream>, - emitted_deltas: &mut usize, -) { - for chunk in chunks { - let bytes = chunk.chunk.into_inner(); - let is_stderr = chunk.stream == ExecutorOutputStream::Stderr; - if *emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { - emit_output_delta(stdout_stream, is_stderr, bytes.clone()).await; - *emitted_deltas += 1; - } - - match chunk.stream { - ExecutorOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap), - ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => { - append_with_cap(stdout, &bytes, retained_bytes_cap) - } - } - } -} - fn append_with_cap(dst: &mut Vec, src: &[u8], max_bytes: Option) { if let Some(max_bytes) = max_bytes { append_capped(dst, src, max_bytes); diff --git a/codex-rs/core/src/exec_tests.rs b/codex-rs/core/src/exec_tests.rs index d63063bab0..b3bc358514 100644 --- a/codex-rs/core/src/exec_tests.rs +++ b/codex-rs/core/src/exec_tests.rs @@ -1,3 +1,4 @@ +use super::exec_remote::consume_exec_server_output; use super::*; use codex_exec_server::Environment as ExecutorEnvironment; use codex_protocol::config_types::WindowsSandboxLevel; diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 68ff9f6549..46480ebb8b 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -48,6 +48,7 @@ pub use protocol::ExecParams; pub use protocol::ExecResponse; pub use protocol::InitializeParams; pub use protocol::InitializeResponse; +pub use protocol::ProcessOutputChunk; pub use protocol::ReadParams; pub use protocol::ReadResponse; pub use protocol::TerminateParams;