[codex] Make command exec delta tests chunk tolerant (#17999)

## Summary
- Make command/exec output-delta tests accumulate streamed chunks
instead of assuming complete logical output in a single notification.
- Collect stdout and stderr independently so stream interleaving does
not fail the pipe streaming test.

## Why
The command/exec protocol exposes output as deltas, so tests should not
rely on chunk boundaries being stable. A line like `out-start\n` may
arrive split across multiple notifications, and stdout/stderr
notifications may interleave.

## Validation
- `just fmt`
- `git diff --check`
- `cargo test -p codex-app-server suite::v2::command_exec`
This commit is contained in:
Ruslan Nigmatullin
2026-04-15 17:57:02 -07:00
committed by GitHub
parent e2dbe7dfc3
commit f948690fc8

View File

@@ -411,11 +411,14 @@ async fn command_exec_streaming_does_not_buffer_output() -> Result<()> {
})
.await?;
let delta = read_command_exec_delta(&mut mcp).await?;
assert_eq!(delta.process_id, process_id.as_str());
assert_eq!(delta.stream, CommandExecOutputStream::Stdout);
assert_eq!(STANDARD.decode(&delta.delta_base64)?, b"abcde");
assert!(delta.cap_reached);
let output = collect_command_exec_output_until(
CommandExecDeltaReader::Mcp(&mut mcp),
process_id.as_str(),
"capped stdout",
|_output, delta| delta.stream == CommandExecOutputStream::Stdout && delta.cap_reached,
)
.await?;
assert_eq!(output.stdout, "abcde");
let terminate_request_id = mcp
.send_command_exec_terminate_request(CommandExecTerminateParams {
process_id: process_id.clone(),
@@ -471,21 +474,13 @@ async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
})
.await?;
let first_stdout = read_command_exec_delta(&mut mcp).await?;
let first_stderr = read_command_exec_delta(&mut mcp).await?;
let seen = [first_stdout, first_stderr];
assert!(
seen.iter()
.all(|delta| delta.process_id == process_id.as_str())
);
assert!(seen.iter().any(|delta| {
delta.stream == CommandExecOutputStream::Stdout
&& delta.delta_base64 == STANDARD.encode("out-start\n")
}));
assert!(seen.iter().any(|delta| {
delta.stream == CommandExecOutputStream::Stderr
&& delta.delta_base64 == STANDARD.encode("err-start\n")
}));
wait_for_command_exec_outputs_contains(
&mut mcp,
process_id.as_str(),
"out-start\n",
"err-start\n",
)
.await?;
let write_request_id = mcp
.send_command_exec_write_request(CommandExecWriteParams {
@@ -499,21 +494,13 @@ async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
.await?;
assert_eq!(write_response.result, serde_json::json!({}));
let next_delta = read_command_exec_delta(&mut mcp).await?;
let final_delta = read_command_exec_delta(&mut mcp).await?;
let seen = [next_delta, final_delta];
assert!(
seen.iter()
.all(|delta| delta.process_id == process_id.as_str())
);
assert!(seen.iter().any(|delta| {
delta.stream == CommandExecOutputStream::Stdout
&& delta.delta_base64 == STANDARD.encode("out:hello\n")
}));
assert!(seen.iter().any(|delta| {
delta.stream == CommandExecOutputStream::Stderr
&& delta.delta_base64 == STANDARD.encode("err:hello\n")
}));
wait_for_command_exec_outputs_contains(
&mut mcp,
process_id.as_str(),
"out:hello\n",
"err:hello\n",
)
.await?;
let response = mcp
.read_stream_until_response_message(RequestId::Integer(command_request_id))
@@ -562,17 +549,13 @@ async fn command_exec_tty_implies_streaming_and_reports_pty_output() -> Result<(
})
.await?;
let started_text = read_command_exec_output_until_contains(
wait_for_command_exec_output_contains(
&mut mcp,
process_id.as_str(),
CommandExecOutputStream::Stdout,
"tty\n",
)
.await?;
assert!(
started_text.contains("tty\n"),
"expected TTY startup output, got {started_text:?}"
);
let write_request_id = mcp
.send_command_exec_write_request(CommandExecWriteParams {
@@ -586,17 +569,13 @@ async fn command_exec_tty_implies_streaming_and_reports_pty_output() -> Result<(
.await?;
assert_eq!(write_response.result, serde_json::json!({}));
let echoed_text = read_command_exec_output_until_contains(
wait_for_command_exec_output_contains(
&mut mcp,
process_id.as_str(),
CommandExecOutputStream::Stdout,
"echo:world\n",
)
.await?;
assert!(
echoed_text.contains("echo:world\n"),
"expected TTY echo output, got {echoed_text:?}"
);
let response = mcp
.read_stream_until_response_message(RequestId::Integer(command_request_id))
@@ -643,17 +622,13 @@ async fn command_exec_tty_supports_initial_size_and_resize() -> Result<()> {
})
.await?;
let started_text = read_command_exec_output_until_contains(
wait_for_command_exec_output_contains(
&mut mcp,
process_id.as_str(),
CommandExecOutputStream::Stdout,
"start:31 101\n",
)
.await?;
assert!(
started_text.contains("start:31 101\n"),
"unexpected initial size output: {started_text:?}"
);
let resize_request_id = mcp
.send_command_exec_resize_request(CommandExecResizeParams {
@@ -681,17 +656,13 @@ async fn command_exec_tty_supports_initial_size_and_resize() -> Result<()> {
.await?;
assert_eq!(write_response.result, serde_json::json!({}));
let resized_text = read_command_exec_output_until_contains(
wait_for_command_exec_output_contains(
&mut mcp,
process_id.as_str(),
CommandExecOutputStream::Stdout,
"after:45 132\n",
)
.await?;
assert!(
resized_text.contains("after:45 132\n"),
"unexpected resized output: {resized_text:?}"
);
let response = mcp
.read_stream_until_response_message(RequestId::Integer(command_request_id))
@@ -744,11 +715,13 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
)
.await?;
let delta = read_command_exec_delta_ws(&mut ws1).await?;
assert_eq!(delta.process_id, "shared-process");
assert_eq!(delta.stream, CommandExecOutputStream::Stdout);
let delta_text = String::from_utf8(STANDARD.decode(&delta.delta_base64)?)?;
assert!(delta_text.contains("ready"));
collect_command_exec_output_until(
CommandExecDeltaReader::Websocket(&mut ws1),
"shared-process",
"websocket ready output",
|output, _delta| output.stdout.contains("ready\n"),
)
.await?;
wait_for_process_marker(&marker, /*should_exist*/ true).await?;
send_request(
@@ -796,31 +769,98 @@ async fn read_command_exec_delta(
decode_delta_notification(notification)
}
async fn read_command_exec_output_until_contains(
async fn wait_for_command_exec_output_contains(
mcp: &mut McpProcess,
process_id: &str,
stream: CommandExecOutputStream,
expected: &str,
) -> Result<String> {
) -> Result<()> {
let stream_name = match stream {
CommandExecOutputStream::Stdout => "stdout",
CommandExecOutputStream::Stderr => "stderr",
};
collect_command_exec_output_until(
CommandExecDeltaReader::Mcp(mcp),
process_id,
format!("{stream_name} containing {expected:?}"),
|output, _delta| match stream {
CommandExecOutputStream::Stdout => output.stdout.contains(expected),
CommandExecOutputStream::Stderr => output.stderr.contains(expected),
},
)
.await?;
Ok(())
}
async fn wait_for_command_exec_outputs_contains(
mcp: &mut McpProcess,
process_id: &str,
stdout_expected: &str,
stderr_expected: &str,
) -> Result<()> {
collect_command_exec_output_until(
CommandExecDeltaReader::Mcp(mcp),
process_id,
format!("stdout containing {stdout_expected:?} and stderr containing {stderr_expected:?}"),
|output, _delta| {
output.stdout.contains(stdout_expected) && output.stderr.contains(stderr_expected)
},
)
.await?;
Ok(())
}
enum CommandExecDeltaReader<'a> {
Mcp(&'a mut McpProcess),
Websocket(&'a mut super::connection_handling_websocket::WsClient),
}
#[derive(Default)]
struct CollectedCommandExecOutput {
stdout: String,
stderr: String,
}
async fn collect_command_exec_output_until(
mut reader: CommandExecDeltaReader<'_>,
process_id: &str,
waiting_for: impl Into<String>,
mut should_stop: impl FnMut(
&CollectedCommandExecOutput,
&CommandExecOutputDeltaNotification,
) -> bool,
) -> Result<CollectedCommandExecOutput> {
let waiting_for = waiting_for.into();
let deadline = Instant::now() + DEFAULT_READ_TIMEOUT;
let mut collected = String::new();
let mut output = CollectedCommandExecOutput::default();
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
let delta = timeout(remaining, read_command_exec_delta(mcp))
.await
.with_context(|| {
format!(
"timed out waiting for {expected:?} in command/exec output for {process_id}; collected {collected:?}"
)
})??;
let delta = timeout(remaining, async {
match &mut reader {
CommandExecDeltaReader::Mcp(mcp) => read_command_exec_delta(mcp).await,
CommandExecDeltaReader::Websocket(stream) => {
read_command_exec_delta_ws(stream).await
}
}
})
.await
.with_context(|| {
format!(
"timed out waiting for {waiting_for} in command/exec output for {process_id}; collected stdout={:?}, stderr={:?}",
output.stdout, output.stderr
)
})??;
assert_eq!(delta.process_id, process_id);
assert_eq!(delta.stream, stream);
let delta_text = String::from_utf8(STANDARD.decode(&delta.delta_base64)?)?;
collected.push_str(&delta_text.replace('\r', ""));
if collected.contains(expected) {
return Ok(collected);
let delta_text = delta_text.replace('\r', "");
match delta.stream {
CommandExecOutputStream::Stdout => output.stdout.push_str(&delta_text),
CommandExecOutputStream::Stderr => output.stderr.push_str(&delta_text),
}
if should_stop(&output, &delta) {
return Ok(output);
}
}
}