chore: rework unified exec events (#7775)

This commit is contained in:
jif-oai
2025-12-10 10:30:38 +00:00
committed by GitHub
parent d1c5db5796
commit 0ad54982ae
20 changed files with 876 additions and 174 deletions

View File

@@ -648,16 +648,16 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
})
.await?;
let delta = wait_for_event_match(&codex, |msg| match msg {
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == call_id => Some(ev.clone()),
let event = wait_for_event_match(&codex, |msg| match msg {
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()),
_ => None,
})
.await;
let text = String::from_utf8_lossy(&delta.chunk).to_string();
let text = event.stdout;
assert!(
text.contains("HELLO-UEXEC"),
"delta chunk missing expected text: {text:?}"
"delta chunk missing expected text: {text:?}",
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
@@ -665,7 +665,116 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
async fn unified_exec_full_lifecycle_with_background_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let call_id = "uexec-full-lifecycle";
let args = json!({
"cmd": "printf 'HELLO-FULL-LIFECYCLE'",
"yield_time_ms": 50,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "finished"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "exercise full unified exec lifecycle".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;
let mut begin_event = None;
let mut end_event = None;
let mut saw_delta_with_marker = 0;
loop {
let msg = wait_for_event(&codex, |_| true).await;
match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == call_id => begin_event = Some(ev),
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == call_id => {
let text = String::from_utf8_lossy(&ev.chunk);
if text.contains("HELLO-FULL-LIFECYCLE") {
saw_delta_with_marker += 1;
}
}
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => {
assert!(
end_event.is_none(),
"expected a single ExecCommandEnd event for this call id"
);
end_event = Some(ev);
}
EventMsg::TaskComplete(_) => break,
_ => {}
}
}
let begin_event = begin_event.expect("expected ExecCommandBegin event");
assert_eq!(begin_event.call_id, call_id);
assert!(
begin_event.process_id.is_some(),
"begin event should include a process_id for a long-lived session"
);
assert_eq!(
saw_delta_with_marker, 0,
"no ExecCommandOutputDelta should be sent for early exit commands"
);
let end_event = end_event.expect("expected ExecCommandEnd event");
assert_eq!(end_event.call_id, call_id);
assert_eq!(end_event.exit_code, 0);
assert!(
end_event.process_id.is_some(),
"end event should include process_id emitted by background watcher"
);
assert!(
end_event.aggregated_output.contains("HELLO-FULL-LIFECYCLE"),
"aggregated_output should contain the full PTY transcript; got {:?}",
end_event.aggregated_output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_terminal_interaction_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
@@ -740,27 +849,210 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
})
.await?;
// Expect a delta event corresponding to the write_stdin call.
let delta = wait_for_event_match(&codex, |msg| match msg {
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == open_call_id => {
let text = String::from_utf8_lossy(&ev.chunk);
if text.contains("WSTDIN-MARK") {
Some(ev.clone())
} else {
None
}
}
_ => None,
})
.await;
let mut terminal_interaction = None;
let text = String::from_utf8_lossy(&delta.chunk).to_string();
loop {
let msg = wait_for_event(&codex, |_| true).await;
match msg {
EventMsg::TerminalInteraction(ev) if ev.call_id == open_call_id => {
terminal_interaction = Some(ev);
}
EventMsg::TaskComplete(_) => break,
_ => {}
}
}
let delta = terminal_interaction.expect("expected TerminalInteraction event");
assert_eq!(delta.process_id, "1000");
let expected_stdin = stdin_args
.get("chars")
.and_then(Value::as_str)
.expect("stdin chars");
assert_eq!(delta.stdin, expected_stdin);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let open_call_id = "uexec-delayed-open";
let open_args = json!({
"cmd": "sleep 5 && echo MARKER1 && sleep 5 && echo MARKER2",
"yield_time_ms": 10,
});
// Poll stdin three times: first for no output, second after the first marker,
// and a final long poll to capture the second marker.
let first_poll_call_id = "uexec-delayed-poll-1";
let first_poll_args = json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 10,
});
let second_poll_call_id = "uexec-delayed-poll-2";
let second_poll_args = json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 6000,
});
let third_poll_call_id = "uexec-delayed-poll-3";
let third_poll_args = json!({
"chars": "",
"session_id": 1000,
"yield_time_ms": 10000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
open_call_id,
"exec_command",
&serde_json::to_string(&open_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
first_poll_call_id,
"write_stdin",
&serde_json::to_string(&first_poll_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_function_call(
second_poll_call_id,
"write_stdin",
&serde_json::to_string(&second_poll_args)?,
),
ev_completed("resp-3"),
]),
sse(vec![
ev_response_created("resp-4"),
ev_function_call(
third_poll_call_id,
"write_stdin",
&serde_json::to_string(&third_poll_args)?,
),
ev_completed("resp-4"),
]),
sse(vec![
ev_response_created("resp-5"),
ev_assistant_message("msg-1", "complete"),
ev_completed("resp-5"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "delayed terminal interaction output".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;
let mut begin_event = None;
let mut end_event = None;
let mut terminal_events = Vec::new();
let mut delta_text = String::new();
// Consume all events for this turn so we can assert on each stage.
loop {
let msg = wait_for_event(&codex, |_| true).await;
match msg {
EventMsg::ExecCommandBegin(ev) if ev.call_id == open_call_id => {
begin_event = Some(ev);
}
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == open_call_id => {
delta_text.push_str(&String::from_utf8_lossy(&ev.chunk));
}
EventMsg::TerminalInteraction(ev) if ev.call_id == open_call_id => {
terminal_events.push(ev);
}
EventMsg::ExecCommandEnd(ev) if ev.call_id == open_call_id => {
end_event = Some(ev);
}
EventMsg::TaskComplete(_) => break,
_ => {}
}
}
let begin_event = begin_event.expect("expected ExecCommandBegin event");
assert!(
text.contains("WSTDIN-MARK"),
"stdin delta chunk missing expected text: {text:?}"
begin_event.process_id.is_some(),
"begin event should include process_id for a live session"
);
// We expect three terminal interactions matching the three write_stdin calls.
assert_eq!(
terminal_events.len(),
3,
"expected three terminal interactions; got {terminal_events:?}"
);
for event in &terminal_events {
assert_eq!(event.call_id, open_call_id);
assert_eq!(event.process_id, "1000");
}
assert_eq!(
terminal_events
.iter()
.map(|ev| ev.stdin.as_str())
.collect::<Vec<_>>(),
vec!["", "", ""],
"terminal interactions should reflect the three stdin polls"
);
assert!(
delta_text.contains("MARKER1") && delta_text.contains("MARKER2"),
"streamed deltas should contain both markers; got {delta_text:?}"
);
let end_event = end_event.expect("expected ExecCommandEnd event");
assert_eq!(end_event.call_id, open_call_id);
assert_eq!(end_event.exit_code, 0);
assert!(
end_event.process_id.is_some(),
"end event should include the process_id"
);
assert!(
end_event.aggregated_output.contains("MARKER1")
&& end_event.aggregated_output.contains("MARKER2"),
"aggregated output should include both markers in order; got {:?}",
end_event.aggregated_output
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
Ok(())
}