Stream apply_patch changes (#17862)

Adds new events for streaming apply_patch changes from responses api.
This is to enable clients to show progress during file writes.

Caveat: This does not work with apply_patch in function call mode, since
that required adding streaming json parsing.
This commit is contained in:
Akshay Nathan
2026-04-16 18:12:19 -07:00
committed by GitHub
parent 9effa0509f
commit 7995c66032
20 changed files with 729 additions and 29 deletions

View File

@@ -935,6 +935,128 @@ async fn apply_patch_cli_can_use_shell_command_output_as_patch_input() -> Result
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_patch_custom_tool_streaming_emits_updated_changes() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = apply_patch_harness_with(|builder| {
builder.with_config(|config| {
config
.features
.enable(Feature::ApplyPatchStreamingEvents)
.expect("enable apply_patch streaming events");
})
})
.await?;
let test = harness.test();
let codex = test.codex.clone();
let call_id = "apply-patch-streaming";
let patch = "*** Begin Patch\n*** Add File: streamed.txt\n+hello\n+world\n*** End Patch";
mount_sse_sequence(
harness.server(),
vec![
sse(vec![
ev_response_created("resp-1"),
json!({
"type": "response.output_item.added",
"item": {
"type": "custom_tool_call",
"call_id": call_id,
"name": "apply_patch",
"input": "",
}
}),
json!({
"type": "response.custom_tool_call_input.delta",
"call_id": call_id,
"delta": "*** Begin Patch\n",
}),
json!({
"type": "response.custom_tool_call_input.delta",
"call_id": call_id,
"delta": "*** Add File: streamed.txt\n+hello",
}),
json!({
"type": "response.custom_tool_call_input.delta",
"call_id": call_id,
"delta": "\n+world\n*** End Patch",
}),
ev_apply_patch_custom_tool_call(call_id, patch),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
],
)
.await;
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "create streamed file".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: harness.cwd().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: test.session_configured.model.clone(),
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let mut updates = Vec::new();
wait_for_event(&codex, |event| match event {
EventMsg::PatchApplyUpdated(update) => {
updates.push(update.clone());
false
}
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
assert_eq!(
updates
.iter()
.map(|update| update.call_id.as_str())
.collect::<Vec<_>>(),
vec![call_id, call_id]
);
assert_eq!(
updates
.first()
.expect("first update")
.changes
.get(&std::path::PathBuf::from("streamed.txt")),
Some(&codex_protocol::protocol::FileChange::Add {
content: "hello\n".to_string(),
})
);
assert_eq!(
updates
.last()
.expect("last update")
.changes
.get(&std::path::PathBuf::from("streamed.txt")),
Some(&codex_protocol::protocol::FileChange::Add {
content: "hello\nworld\n".to_string(),
})
);
assert_eq!(
harness.read_file_text("streamed.txt").await?,
"hello\nworld\n"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_patch_shell_command_heredoc_with_cd_emits_turn_diff() -> Result<()> {
skip_if_no_network!(Ok(()));