diff --git a/codex-rs/apply-patch/src/lib.rs b/codex-rs/apply-patch/src/lib.rs index 09d777ef6f..7a47b1ea48 100644 --- a/codex-rs/apply-patch/src/lib.rs +++ b/codex-rs/apply-patch/src/lib.rs @@ -2,6 +2,7 @@ mod invocation; mod parser; mod seek_sequence; mod standalone_executable; +mod streaming_parser; use std::collections::HashMap; use std::io; @@ -20,8 +21,8 @@ pub use parser::ParseError; use parser::ParseError::*; pub use parser::UpdateFileChunk; pub use parser::parse_patch; -pub use parser::parse_patch_streaming; use similar::TextDiff; +pub use streaming_parser::StreamingPatchParser; use thiserror::Error; pub use invocation::maybe_parse_apply_patch_verified; diff --git a/codex-rs/apply-patch/src/parser.rs b/codex-rs/apply-patch/src/parser.rs index 64d3685d59..5403055272 100644 --- a/codex-rs/apply-patch/src/parser.rs +++ b/codex-rs/apply-patch/src/parser.rs @@ -31,15 +31,15 @@ use std::path::PathBuf; use thiserror::Error; -const BEGIN_PATCH_MARKER: &str = "*** Begin Patch"; -const END_PATCH_MARKER: &str = "*** End Patch"; -const ADD_FILE_MARKER: &str = "*** Add File: "; -const DELETE_FILE_MARKER: &str = "*** Delete File: "; -const UPDATE_FILE_MARKER: &str = "*** Update File: "; -const MOVE_TO_MARKER: &str = "*** Move to: "; -const EOF_MARKER: &str = "*** End of File"; -const CHANGE_CONTEXT_MARKER: &str = "@@ "; -const EMPTY_CHANGE_CONTEXT_MARKER: &str = "@@"; +pub(crate) const BEGIN_PATCH_MARKER: &str = "*** Begin Patch"; +pub(crate) const END_PATCH_MARKER: &str = "*** End Patch"; +pub(crate) const ADD_FILE_MARKER: &str = "*** Add File: "; +pub(crate) const DELETE_FILE_MARKER: &str = "*** Delete File: "; +pub(crate) const UPDATE_FILE_MARKER: &str = "*** Update File: "; +pub(crate) const MOVE_TO_MARKER: &str = "*** Move to: "; +pub(crate) const EOF_MARKER: &str = "*** End of File"; +pub(crate) const CHANGE_CONTEXT_MARKER: &str = "@@ "; +pub(crate) const EMPTY_CHANGE_CONTEXT_MARKER: &str = "@@"; /// Currently, the only OpenAI model that knowingly requires lenient 
parsing is /// gpt-4.1. While we could try to require everyone to pass in a strictness @@ -132,14 +132,6 @@ pub fn parse_patch(patch: &str) -> Result { parse_patch_text(patch, mode) } -/// Parses streamed patch text that may not have reached `*** End Patch` yet. -/// -/// This entry point is for progress reporting only; callers must not use its -/// output to apply a patch. -pub fn parse_patch_streaming(patch: &str) -> Result { - parse_patch_text(patch, ParseMode::Streaming) -} - enum ParseMode { /// Parse the patch text argument as is. Strict, @@ -177,12 +169,6 @@ enum ParseMode { /// `<<'EOF'` and ends with `EOF\n`. If so, we strip off these markers, /// trim() the result, and treat what is left as the patch text. Lenient, - - /// Parse partial patch text for progress reporting while the model is - /// still streaming tool input. This mode requires a begin marker but does - /// not require an end marker, and its output must not be used to apply a - /// patch. - Streaming, } fn parse_patch_text(patch: &str, mode: ParseMode) -> Result { @@ -190,15 +176,13 @@ fn parse_patch_text(patch: &str, mode: ParseMode) -> Result check_patch_boundaries_strict(&lines)?, ParseMode::Lenient => check_patch_boundaries_lenient(&lines)?, - ParseMode::Streaming => check_patch_boundaries_streaming(&lines)?, }; let mut hunks: Vec = Vec::new(); let mut remaining_lines = hunk_lines; let mut line_number = 2; - let allow_incomplete = matches!(mode, ParseMode::Streaming); while !remaining_lines.is_empty() { - let (hunk, hunk_lines) = parse_one_hunk(remaining_lines, line_number, allow_incomplete)?; + let (hunk, hunk_lines) = parse_one_hunk(remaining_lines, line_number)?; hunks.push(hunk); line_number += hunk_lines; remaining_lines = &remaining_lines[hunk_lines..] @@ -211,25 +195,6 @@ fn parse_patch_text(patch: &str, mode: ParseMode) -> Result( - original_lines: &'a [&'a str], -) -> Result<(&'a [&'a str], &'a [&'a str]), ParseError> { - match original_lines { - [first, ..] 
if first.trim() == BEGIN_PATCH_MARKER => { - let body_lines = if original_lines - .last() - .is_some_and(|line| line.trim() == END_PATCH_MARKER) - { - &original_lines[1..original_lines.len() - 1] - } else { - &original_lines[1..] - }; - Ok((original_lines, body_lines)) - } - _ => check_patch_boundaries_strict(original_lines), - } -} - /// Checks the start and end lines of the patch text for `apply_patch`, /// returning an error if they do not match the expected markers. fn check_patch_boundaries_strict<'a>( @@ -297,15 +262,9 @@ fn check_start_and_end_lines_strict( /// Attempts to parse a single hunk from the start of lines. /// Returns the parsed hunk and the number of lines parsed (or a ParseError). -fn parse_one_hunk( - lines: &[&str], - line_number: usize, - allow_incomplete: bool, -) -> Result<(Hunk, usize), ParseError> { - // Be tolerant of case mismatches and extra padding around marker strings. +fn parse_one_hunk(lines: &[&str], line_number: usize) -> Result<(Hunk, usize), ParseError> { let first_line = lines[0].trim(); if let Some(path) = first_line.strip_prefix(ADD_FILE_MARKER) { - // Add File let mut contents = String::new(); let mut parsed_lines = 1; for add_line in &lines[1..] { @@ -325,7 +284,6 @@ fn parse_one_hunk( parsed_lines, )); } else if let Some(path) = first_line.strip_prefix(DELETE_FILE_MARKER) { - // Delete File return Ok(( DeleteFile { path: PathBuf::from(path), @@ -333,11 +291,8 @@ fn parse_one_hunk( 1, )); } else if let Some(path) = first_line.strip_prefix(UPDATE_FILE_MARKER) { - // Update File let mut remaining_lines = &lines[1..]; let mut parsed_lines = 1; - - // Optional: move file line let move_path = remaining_lines .first() .and_then(|x| x.strip_prefix(MOVE_TO_MARKER)); @@ -348,9 +303,7 @@ fn parse_one_hunk( } let mut chunks = Vec::new(); - // NOTE: we need to know to stop once we reach the next special marker header. while !remaining_lines.is_empty() { - // Skip over any completely blank lines that may separate chunks. 
if remaining_lines[0].trim().is_empty() { parsed_lines += 1; remaining_lines = &remaining_lines[1..]; @@ -361,22 +314,11 @@ fn parse_one_hunk( break; } - if allow_incomplete && remaining_lines[0] == "@" { - break; - } - - let parsed_chunk = parse_update_file_chunk( + let (chunk, chunk_lines) = parse_update_file_chunk( remaining_lines, line_number + parsed_lines, chunks.is_empty(), - ); - let (chunk, chunk_lines) = match parsed_chunk { - Ok(parsed) => parsed, - Err(InvalidHunkError { .. }) if allow_incomplete && !chunks.is_empty() => { - break; - } - Err(err) => return Err(err), - }; + )?; chunks.push(chunk); parsed_lines += chunk_lines; remaining_lines = &remaining_lines[chunk_lines..] @@ -384,7 +326,10 @@ fn parse_one_hunk( if chunks.is_empty() { return Err(InvalidHunkError { - message: format!("Update file hunk for path '{path}' is empty"), + message: format!( + "Update file hunk for path '{}' is empty", + Path::new(path).display() + ), line_number, }); } @@ -418,8 +363,6 @@ fn parse_update_file_chunk( line_number, }); } - // If we see an explicit context marker @@ or @@ , consume it; otherwise, optionally - // allow treating the chunk as starting directly with diff lines. 
let (change_context, start_index) = if lines[0] == EMPTY_CHANGE_CONTEXT_MARKER { (None, 1) } else if let Some(context) = lines[0].strip_prefix(CHANGE_CONTEXT_MARKER) { @@ -501,162 +444,113 @@ fn parse_update_file_chunk( } #[test] -fn test_parse_patch_streaming() { +fn test_parse_one_hunk() { assert_eq!( - parse_patch_streaming("*** Begin Patch\n*** Add File: src/hello.txt\n+hello\n+wor"), - Ok(ApplyPatchArgs { - hunks: vec![AddFile { - path: PathBuf::from("src/hello.txt"), - contents: "hello\nwor\n".to_string(), - }], - patch: "*** Begin Patch\n*** Add File: src/hello.txt\n+hello\n+wor".to_string(), - workdir: None, - }) - ); - - assert_eq!( - parse_patch_streaming( - "*** Begin Patch\n*** Update File: src/old.rs\n*** Move to: src/new.rs\n@@\n-old\n+new", - ), - Ok(ApplyPatchArgs { - hunks: vec![UpdateFile { - path: PathBuf::from("src/old.rs"), - move_path: Some(PathBuf::from("src/new.rs")), - chunks: vec![UpdateFileChunk { - change_context: None, - old_lines: vec!["old".to_string()], - new_lines: vec!["new".to_string()], - is_end_of_file: false, - }], - }], - patch: "*** Begin Patch\n*** Update File: src/old.rs\n*** Move to: src/new.rs\n@@\n-old\n+new".to_string(), - workdir: None, - }) - ); - - assert!( - parse_patch_text( - "*** Begin Patch\n*** Delete File: gone.txt", - ParseMode::Streaming - ) - .is_ok() - ); - assert!( - parse_patch_text( - "*** Begin Patch\n*** Delete File: gone.txt", - ParseMode::Strict - ) - .is_err() - ); - - assert_eq!( - parse_patch_streaming( - "*** Begin Patch\n*** Add File: src/one.txt\n+one\n*** Delete File: src/two.txt\n", - ), - Ok(ApplyPatchArgs { - hunks: vec![ - AddFile { - path: PathBuf::from("src/one.txt"), - contents: "one\n".to_string(), - }, - DeleteFile { - path: PathBuf::from("src/two.txt"), - }, - ], - patch: "*** Begin Patch\n*** Add File: src/one.txt\n+one\n*** Delete File: src/two.txt" - .to_string(), - workdir: None, + parse_one_hunk(&["bad"], /*line_number*/ 234), + Err(InvalidHunkError { + message: "'bad' is not a 
valid hunk header. \ + Valid hunk headers: '*** Add File: {path}', '*** Delete File: {path}', '*** Update File: {path}'".to_string(), + line_number: 234 }) ); } #[test] -fn test_parse_patch_streaming_large_patch_by_character() { - let patch = "\ -*** Begin Patch -*** Add File: docs/release-notes.md -+# Release notes -+ -+## CLI -+- Surface apply_patch progress while arguments stream. -+- Keep final patch application gated on the completed tool call. -+- Include file summaries in the progress event payload. -*** Update File: src/config.rs -@@ impl Config -- pub apply_patch_progress: bool, -+ pub stream_apply_patch_progress: bool, - pub include_diagnostics: bool, -@@ fn default_progress_interval() -- Duration::from_millis(500) -+ Duration::from_millis(250) -*** Delete File: src/legacy_patch_progress.rs -*** Update File: crates/cli/src/main.rs -*** Move to: crates/cli/src/bin/codex.rs -@@ fn run() -- let args = Args::parse(); -- dispatch(args) -+ let cli = Cli::parse(); -+ dispatch(cli) -*** Add File: tests/fixtures/apply_patch_progress.json -+{ -+ \"type\": \"apply_patch_progress\", -+ \"hunks\": [ -+ { \"operation\": \"add\", \"path\": \"docs/release-notes.md\" }, -+ { \"operation\": \"update\", \"path\": \"src/config.rs\" } -+ ] -+} -*** Update File: README.md -@@ Development workflow - Build the Rust workspace before opening a pull request. -+When touching streamed tool calls, include parser coverage for partial input. -+Prefer tests that exercise the exact event payload shape. 
-*** Delete File: docs/old-apply-patch-progress.md -*** End Patch"; - - let mut max_hunk_count = 0; - let mut saw_hunk_counts = Vec::new(); - for i in 1..=patch.len() { - let partial = &patch[..i]; - if let Ok(parsed) = parse_patch_streaming(partial) { - let hunk_count = parsed.hunks.len(); - assert!( - hunk_count >= max_hunk_count, - "hunk count should never decrease while streaming: {hunk_count} < {max_hunk_count} for {partial:?}", - ); - if hunk_count > max_hunk_count { - saw_hunk_counts.push(hunk_count); - max_hunk_count = hunk_count; - } - } - } - - assert_eq!(saw_hunk_counts, vec![1, 2, 3, 4, 5, 6, 7]); - let parsed = parse_patch_streaming(patch).unwrap(); - assert_eq!(parsed.hunks.len(), 7); +fn test_update_file_chunk() { assert_eq!( - parsed - .hunks - .iter() - .map(|hunk| match hunk { - AddFile { .. } => "add", - DeleteFile { .. } => "delete", - UpdateFile { - move_path: Some(_), .. - } => "move-update", - UpdateFile { - move_path: None, .. - } => "update", - }) - .collect::>(), - vec![ - "add", - "update", - "delete", - "move-update", - "add", - "update", - "delete" - ] + parse_update_file_chunk( + &["bad"], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Err(InvalidHunkError { + message: "Expected update hunk to start with a @@ context marker, got: 'bad'" + .to_string(), + line_number: 123 + }) + ); + assert_eq!( + parse_update_file_chunk( + &["@@"], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 124 + }) + ); + assert_eq!( + parse_update_file_chunk( + &["@@", "bad"], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Err(InvalidHunkError { + message: "Unexpected line found in update hunk: 'bad'. 
Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)".to_string(), + line_number: 124 + }) + ); + assert_eq!( + parse_update_file_chunk( + &["@@", "*** End of File"], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 124 + }) + ); + assert_eq!( + parse_update_file_chunk( + &[ + "@@ change_context", + "", + " context", + "-remove", + "+add", + " context2", + "*** End Patch", + ], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Ok(( + UpdateFileChunk { + change_context: Some("change_context".to_string()), + old_lines: vec![ + String::new(), + "context".to_string(), + "remove".to_string(), + "context2".to_string(), + ], + new_lines: vec![ + String::new(), + "context".to_string(), + "add".to_string(), + "context2".to_string(), + ], + is_end_of_file: false, + }, + 6, + )) + ); + assert_eq!( + parse_update_file_chunk( + &["@@", "+line", "*** End of File"], + /*line_number*/ 123, + /*allow_missing_context*/ false, + ), + Ok(( + UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: vec!["line".to_string()], + is_end_of_file: true, + }, + 3, + )) ); } @@ -997,112 +891,3 @@ fn test_parse_patch_lenient() { )) ); } - -#[test] -fn test_parse_one_hunk() { - assert_eq!( - parse_one_hunk(&["bad"], /*line_number*/ 234, /*allow_incomplete*/ false), - Err(InvalidHunkError { - message: "'bad' is not a valid hunk header. \ - Valid hunk headers: '*** Add File: {path}', '*** Delete File: {path}', '*** Update File: {path}'".to_string(), - line_number: 234 - }) - ); - // Other edge cases are already covered by tests above/below. 
-} - -#[test] -fn test_update_file_chunk() { - assert_eq!( - parse_update_file_chunk( - &["bad"], - /*line_number*/ 123, - /*allow_missing_context*/ false - ), - Err(InvalidHunkError { - message: "Expected update hunk to start with a @@ context marker, got: 'bad'" - .to_string(), - line_number: 123 - }) - ); - assert_eq!( - parse_update_file_chunk( - &["@@"], - /*line_number*/ 123, - /*allow_missing_context*/ false - ), - Err(InvalidHunkError { - message: "Update hunk does not contain any lines".to_string(), - line_number: 124 - }) - ); - assert_eq!( - parse_update_file_chunk(&["@@", "bad"], /*line_number*/ 123, /*allow_missing_context*/ false), - Err(InvalidHunkError { - message: "Unexpected line found in update hunk: 'bad'. \ - Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)".to_string(), - line_number: 124 - }) - ); - assert_eq!( - parse_update_file_chunk( - &["@@", "*** End of File"], - /*line_number*/ 123, - /*allow_missing_context*/ false - ), - Err(InvalidHunkError { - message: "Update hunk does not contain any lines".to_string(), - line_number: 124 - }) - ); - assert_eq!( - parse_update_file_chunk( - &[ - "@@ change_context", - "", - " context", - "-remove", - "+add", - " context2", - "*** End Patch", - ], - /*line_number*/ 123, - /*allow_missing_context*/ false - ), - Ok(( - (UpdateFileChunk { - change_context: Some("change_context".to_string()), - old_lines: vec![ - "".to_string(), - "context".to_string(), - "remove".to_string(), - "context2".to_string() - ], - new_lines: vec![ - "".to_string(), - "context".to_string(), - "add".to_string(), - "context2".to_string() - ], - is_end_of_file: false - }), - 6 - )) - ); - assert_eq!( - parse_update_file_chunk( - &["@@", "+line", "*** End of File"], - /*line_number*/ 123, - /*allow_missing_context*/ false - ), - Ok(( - (UpdateFileChunk { - change_context: None, - old_lines: vec![], - new_lines: vec!["line".to_string()], - is_end_of_file: true - }), - 3 - )) - ); -} diff 
--git a/codex-rs/apply-patch/src/streaming_parser.rs b/codex-rs/apply-patch/src/streaming_parser.rs new file mode 100644 index 0000000000..4acfad6720 --- /dev/null +++ b/codex-rs/apply-patch/src/streaming_parser.rs @@ -0,0 +1,813 @@ +use std::path::PathBuf; + +use crate::parser::ADD_FILE_MARKER; +use crate::parser::BEGIN_PATCH_MARKER; +use crate::parser::CHANGE_CONTEXT_MARKER; +use crate::parser::DELETE_FILE_MARKER; +use crate::parser::EMPTY_CHANGE_CONTEXT_MARKER; +use crate::parser::END_PATCH_MARKER; +use crate::parser::EOF_MARKER; +use crate::parser::Hunk; +use crate::parser::MOVE_TO_MARKER; +use crate::parser::ParseError; +use crate::parser::UPDATE_FILE_MARKER; +use crate::parser::UpdateFileChunk; + +use Hunk::*; +use ParseError::*; + +#[derive(Debug, Default, Clone)] +pub struct StreamingPatchParser { + line_buffer: String, + state: StreamingParserState, + line_number: usize, +} + +#[derive(Debug, Default, Clone)] +struct StreamingParserState { + mode: StreamingParserMode, + hunks: Vec, +} + +#[derive(Debug, Default, Clone)] +enum StreamingParserMode { + #[default] + NotStarted, + StartedPatch, + AddFile, + DeleteFile, + UpdateFile { + hunk_line_number: usize, + }, + EndedPatch, +} + +impl StreamingPatchParser { + fn ensure_update_hunk_is_not_empty(&self, line: &str) -> Result<(), ParseError> { + if let Some(UpdateFile { path, chunks, .. 
}) = self.state.hunks.last() { + if chunks.is_empty() + && let StreamingParserMode::UpdateFile { hunk_line_number } = self.state.mode + { + return Err(InvalidHunkError { + message: format!("Update file hunk for path '{}' is empty", path.display()), + line_number: hunk_line_number, + }); + } + if chunks + .last() + .is_some_and(|chunk| chunk.old_lines.is_empty() && chunk.new_lines.is_empty()) + { + if line == END_PATCH_MARKER { + return Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: self.line_number, + }); + } + return Err(InvalidHunkError { + message: format!( + "Unexpected line found in update hunk: '{line}'. Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)" + ), + line_number: self.line_number, + }); + } + } + Ok(()) + } + + fn handle_hunk_headers_and_end_patch(&mut self, trimmed: &str) -> Result { + if trimmed == END_PATCH_MARKER { + self.ensure_update_hunk_is_not_empty(trimmed)?; + self.state.mode = StreamingParserMode::EndedPatch; + return Ok(true); + } + if let Some(path) = trimmed.strip_prefix(ADD_FILE_MARKER) { + self.ensure_update_hunk_is_not_empty(trimmed)?; + self.state.hunks.push(AddFile { + path: PathBuf::from(path), + contents: String::new(), + }); + self.state.mode = StreamingParserMode::AddFile; + return Ok(true); + } + if let Some(path) = trimmed.strip_prefix(DELETE_FILE_MARKER) { + self.ensure_update_hunk_is_not_empty(trimmed)?; + self.state.hunks.push(DeleteFile { + path: PathBuf::from(path), + }); + self.state.mode = StreamingParserMode::DeleteFile; + return Ok(true); + } + if let Some(path) = trimmed.strip_prefix(UPDATE_FILE_MARKER) { + self.ensure_update_hunk_is_not_empty(trimmed)?; + self.state.hunks.push(UpdateFile { + path: PathBuf::from(path), + move_path: None, + chunks: Vec::new(), + }); + self.state.mode = StreamingParserMode::UpdateFile { + hunk_line_number: self.line_number, + }; + return Ok(true); + } + Ok(false) + } + + pub fn 
push_delta(&mut self, delta: &str) -> Result, ParseError> { + for ch in delta.chars() { + if ch == '\n' { + let mut line = std::mem::take(&mut self.line_buffer); + line.truncate(line.strip_suffix('\r').map_or(line.len(), str::len)); + self.line_number += 1; + self.process_line(&line)?; + } else { + self.line_buffer.push(ch); + } + } + + Ok(self.state.hunks.clone()) + } + + pub fn finish(&mut self) -> Result, ParseError> { + if !self.line_buffer.is_empty() { + let line = std::mem::take(&mut self.line_buffer); + self.line_number += 1; + if line.trim() == END_PATCH_MARKER { + self.ensure_update_hunk_is_not_empty(line.trim())?; + self.state.mode = StreamingParserMode::EndedPatch; + } else { + self.process_line(&line)?; + } + } + + if !matches!(self.state.mode, StreamingParserMode::EndedPatch) { + return Err(InvalidPatchError( + "The last line of the patch must be '*** End Patch'".to_string(), + )); + } + + Ok(self.state.hunks.clone()) + } + + fn process_line(&mut self, line: &str) -> Result<(), ParseError> { + let trimmed = line.trim(); + match self.state.mode.clone() { + StreamingParserMode::NotStarted => { + if trimmed == BEGIN_PATCH_MARKER { + self.state.mode = StreamingParserMode::StartedPatch; + return Ok(()); + } + Err(InvalidPatchError( + "The first line of the patch must be '*** Begin Patch'".to_string(), + )) + } + StreamingParserMode::StartedPatch => { + if self.handle_hunk_headers_and_end_patch(trimmed)? { + return Ok(()); + } + Err(InvalidHunkError { + message: format!( + "'{trimmed}' is not a valid hunk header. Valid hunk headers: '*** Add File: {{path}}', '*** Delete File: {{path}}', '*** Update File: {{path}}'" + ), + line_number: self.line_number, + }) + } + StreamingParserMode::AddFile => { + if self.handle_hunk_headers_and_end_patch(trimmed)? { + return Ok(()); + } + if let Some(line_to_add) = line.strip_prefix('+') + && let Some(AddFile { contents, .. 
}) = self.state.hunks.last_mut() + { + contents.push_str(line_to_add); + contents.push('\n'); + return Ok(()); + } + Err(InvalidHunkError { + message: format!( + "'{trimmed}' is not a valid hunk header. Valid hunk headers: '*** Add File: {{path}}', '*** Delete File: {{path}}', '*** Update File: {{path}}'" + ), + line_number: self.line_number, + }) + } + StreamingParserMode::DeleteFile => { + if self.handle_hunk_headers_and_end_patch(trimmed)? { + return Ok(()); + } + Err(InvalidHunkError { + message: format!( + "'{trimmed}' is not a valid hunk header. Valid hunk headers: '*** Add File: {{path}}', '*** Delete File: {{path}}', '*** Update File: {{path}}'" + ), + line_number: self.line_number, + }) + } + StreamingParserMode::UpdateFile { hunk_line_number } => { + let update_line = line.trim_end(); + if self.handle_hunk_headers_and_end_patch(update_line)? { + return Ok(()); + } + + if let Some(UpdateFile { + move_path, chunks, .. + }) = self.state.hunks.last_mut() + { + if chunks.is_empty() + && move_path.is_none() + && let Some(move_to_path) = update_line.strip_prefix(MOVE_TO_MARKER) + { + *move_path = Some(PathBuf::from(move_to_path)); + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if (update_line == EMPTY_CHANGE_CONTEXT_MARKER + || update_line.starts_with(CHANGE_CONTEXT_MARKER)) + && chunks.last().is_some_and(|chunk| { + chunk.old_lines.is_empty() && chunk.new_lines.is_empty() + }) + { + return Err(InvalidHunkError { + message: format!( + "Unexpected line found in update hunk: '{line}'. 
Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)" + ), + line_number: self.line_number, + }); + } + + if update_line == EMPTY_CHANGE_CONTEXT_MARKER { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if let Some(change_context) = update_line.strip_prefix(CHANGE_CONTEXT_MARKER) { + chunks.push(UpdateFileChunk { + change_context: Some(change_context.to_string()), + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if update_line == EOF_MARKER { + if chunks.last().is_some_and(|chunk| { + chunk.old_lines.is_empty() && chunk.new_lines.is_empty() + }) { + return Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: self.line_number, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.is_end_of_file = true; + } + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if line.is_empty() { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(String::new()); + chunk.new_lines.push(String::new()); + } + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if let Some(line_to_add) = line.strip_prefix(' ') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(line_to_add.to_string()); + chunk.new_lines.push(line_to_add.to_string()); + } 
+ self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if let Some(line_to_add) = line.strip_prefix('+') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.new_lines.push(line_to_add.to_string()); + } + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if let Some(line_to_remove) = line.strip_prefix('-') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(line_to_remove.to_string()); + } + self.state.mode = StreamingParserMode::UpdateFile { hunk_line_number }; + return Ok(()); + } + + if chunks.last().is_some_and(|chunk| { + !chunk.old_lines.is_empty() || !chunk.new_lines.is_empty() + }) { + return Err(InvalidHunkError { + message: format!( + "Expected update hunk to start with a @@ context marker, got: '{line}'" + ), + line_number: self.line_number, + }); + } + } + Err(InvalidHunkError { + message: format!( + "Unexpected line found in update hunk: '{line}'. 
Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)" + ), + line_number: self.line_number, + }) + } + StreamingParserMode::EndedPatch => Ok(()), + } + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + use super::*; + + #[test] + fn test_streaming_patch_parser_streams_complete_lines_before_end_patch() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: src/hello.txt\n+hello\n+wor"), + Ok(vec![AddFile { + path: PathBuf::from("src/hello.txt"), + contents: "hello\n".to_string(), + }]) + ); + assert_eq!( + parser.push_delta("ld\n"), + Ok(vec![AddFile { + path: PathBuf::from("src/hello.txt"), + contents: "hello\nworld\n".to_string(), + }]) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Update File: src/old.rs\n*** Move to: src/new.rs\n@@\n-old\n+new\n", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("src/old.rs"), + move_path: Some(PathBuf::from("src/new.rs")), + chunks: vec![UpdateFileChunk { + change_context: None, + old_lines: vec!["old".to_string()], + new_lines: vec!["new".to_string()], + is_end_of_file: false, + }], + }]) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Delete File: gone.txt"), + Ok(Vec::new()) + ); + assert_eq!( + parser.push_delta("\n"), + Ok(vec![DeleteFile { + path: PathBuf::from("gone.txt"), + }]) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Add File: src/one.txt\n+one\n*** Delete File: src/two.txt\n", + ), + Ok(vec![ + AddFile { + path: PathBuf::from("src/one.txt"), + contents: "one\n".to_string(), + }, + DeleteFile { + path: PathBuf::from("src/two.txt"), + }, + ]) + ); + } + + #[test] + fn test_streaming_patch_parser_large_patch_split_by_character() { + let patch = "\ +*** Begin 
Patch +*** Add File: docs/release-notes.md ++# Release notes ++ ++## CLI ++- Surface apply_patch progress while arguments stream. ++- Keep final patch application gated on the completed tool call. ++- Include file summaries in the progress event payload. +*** Update File: src/config.rs +@@ impl Config +- pub apply_patch_progress: bool, ++ pub stream_apply_patch_progress: bool, + pub include_diagnostics: bool, +@@ fn default_progress_interval() +- Duration::from_millis(500) ++ Duration::from_millis(250) +*** Delete File: src/legacy_patch_progress.rs +*** Update File: crates/cli/src/main.rs +*** Move to: crates/cli/src/bin/codex.rs +@@ fn run() +- let args = Args::parse(); +- dispatch(args) ++ let cli = Cli::parse(); ++ dispatch(cli) +*** Add File: tests/fixtures/apply_patch_progress.json ++{ ++ \"type\": \"apply_patch_progress\", ++ \"hunks\": [ ++ { \"operation\": \"add\", \"path\": \"docs/release-notes.md\" }, ++ { \"operation\": \"update\", \"path\": \"src/config.rs\" } ++ ] ++} +*** Update File: README.md +@@ Development workflow + Build the Rust workspace before opening a pull request. ++When touching streamed tool calls, include parser coverage for partial input. ++Prefer tests that exercise the exact event payload shape. 
+*** Delete File: docs/old-apply-patch-progress.md +*** End Patch"; + + let mut parser = StreamingPatchParser::default(); + let mut max_hunk_count = 0; + let mut saw_hunk_counts = Vec::new(); + let mut hunks = Vec::new(); + for ch in patch.chars() { + let updated_hunks = parser.push_delta(&ch.to_string()).unwrap(); + if !updated_hunks.is_empty() { + let hunk_count = updated_hunks.len(); + assert!( + hunk_count >= max_hunk_count, + "hunk count should never decrease while streaming: {hunk_count} < {max_hunk_count}", + ); + if hunk_count > max_hunk_count { + saw_hunk_counts.push(hunk_count); + max_hunk_count = hunk_count; + } + hunks = updated_hunks; + } + } + + assert_eq!(saw_hunk_counts, vec![1, 2, 3, 4, 5, 6, 7]); + assert_eq!(hunks.len(), 7); + assert_eq!( + hunks + .iter() + .map(|hunk| match hunk { + AddFile { .. } => "add", + DeleteFile { .. } => "delete", + UpdateFile { + move_path: Some(_), .. + } => "move-update", + UpdateFile { + move_path: None, .. + } => "update", + }) + .collect::>(), + vec![ + "add", + "update", + "delete", + "move-update", + "add", + "update", + "delete" + ] + ); + } + + #[test] + fn test_streaming_patch_parser_keeps_indented_update_markers_as_context_lines() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "\ +*** Begin Patch +*** Update File: a.txt +@@ +-old a ++new a + *** Update File: b.txt +@@ +-old b ++new b +*** End Patch +", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("a.txt"), + move_path: None, + chunks: vec![ + UpdateFileChunk { + change_context: None, + old_lines: vec!["old a".to_string(), "*** Update File: b.txt".to_string()], + new_lines: vec!["new a".to_string(), "*** Update File: b.txt".to_string()], + is_end_of_file: false, + }, + UpdateFileChunk { + change_context: None, + old_lines: vec!["old b".to_string()], + new_lines: vec!["new b".to_string()], + is_end_of_file: false, + }, + ], + }]) + ); + } + + #[test] + fn 
test_streaming_patch_parser_preserves_bare_empty_update_lines() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "\ +*** Begin Patch +*** Update File: file.txt +@@ + context before + + context after +*** End Patch +", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + // The normal parser treats a bare empty line in an update hunk as an + // empty context line. Preserve that leniency in the streaming parser. + old_lines: vec![ + "context before".to_string(), + String::new(), + "context after".to_string(), + ], + new_lines: vec![ + "context before".to_string(), + String::new(), + "context after".to_string(), + ], + is_end_of_file: false, + }], + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_matches_line_ending_behavior() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\r\n*** Update File: file.txt\r\n@@\r\n-old\r\n+new\r\n*** End Patch\r\n"), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + old_lines: vec!["old".to_string()], + new_lines: vec!["new".to_string()], + is_end_of_file: false, + }], + }]) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\r\n*** Update File: file.txt\r\n@@\r\n-old\r\r\n+new\r\n*** End Patch\r\n"), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + old_lines: vec!["old\r".to_string()], + new_lines: vec!["new".to_string()], + is_end_of_file: false, + }], + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_finish_processes_final_line_without_newline() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: file.txt\n+hello\n*** End Patch"), + 
Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + assert_eq!( + parser.finish(), + Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Update File: file.txt\n@@\n-old\n+new\n *** End Patch", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + old_lines: vec!["old".to_string()], + new_lines: vec!["new".to_string()], + is_end_of_file: false, + }], + }]) + ); + assert_eq!( + parser.finish(), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + old_lines: vec!["old".to_string()], + new_lines: vec!["new".to_string()], + is_end_of_file: false, + }], + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_finish_requires_end_patch() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: file.txt\n+hello\n"), + Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + assert_eq!( + parser.finish(), + Err(InvalidPatchError( + "The last line of the patch must be '*** End Patch'".to_string(), + )) + ); + } + + #[test] + fn test_streaming_patch_parser_returns_errors() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("bad\n"), + Err(InvalidPatchError( + "The first line of the patch must be '*** Begin Patch'".to_string(), + )) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!(parser.push_delta("*** Begin Patch\n"), Ok(Vec::new())); + assert_eq!( + parser.push_delta("bad\n"), + Err(InvalidHunkError { + message: "'bad' is not a valid hunk header. 
Valid hunk headers: '*** Add File: {path}', '*** Delete File: {path}', '*** Update File: {path}'" + .to_string(), + line_number: 2, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: file.txt\nbad\n"), + Err(InvalidHunkError { + message: "'bad' is not a valid hunk header. Valid hunk headers: '*** Add File: {path}', '*** Delete File: {path}', '*** Update File: {path}'" + .to_string(), + line_number: 3, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Delete File: file.txt\nbad\n"), + Err(InvalidHunkError { + message: "'bad' is not a valid hunk header. Valid hunk headers: '*** Add File: {path}', '*** Delete File: {path}', '*** Update File: {path}'" + .to_string(), + line_number: 3, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n*** End Patch\n"), + Err(InvalidHunkError { + message: "Update file hunk for path 'file.txt' is empty".to_string(), + line_number: 2, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Update File: old.txt\n*** Move to: new.txt\n*** Delete File: other.txt\n", + ), + Err(InvalidHunkError { + message: "Update file hunk for path 'old.txt' is empty".to_string(), + line_number: 2, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n@@\n*** End Patch\n"), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 4, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n@@\n*** End of File\n"), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 4, + }) + ); + + 
let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n@@\n@@\n"), + Err(InvalidHunkError { + message: "Unexpected line found in update hunk: '@@'. Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)" + .to_string(), + line_number: 4, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n@@\n-old\nbad\n"), + Err(InvalidHunkError { + message: "Expected update hunk to start with a @@ context marker, got: 'bad'" + .to_string(), + line_number: 5, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Update File: file.txt\n@@\n*** Update File: other.txt\n", + ), + Err(InvalidHunkError { + message: "Unexpected line found in update hunk: '*** Update File: other.txt'. Every line should start with ' ' (context line), '+' (added line), or '-' (removed line)" + .to_string(), + line_number: 4, + }) + ); + } +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 49dc1eb107..286aa2d687 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1902,7 +1902,7 @@ async fn try_run_sampling_request( ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take() - && let Some(event) = consumer.flush_on_complete() + && let Ok(Some(event)) = consumer.finish() { sess.send_event(&turn_context, event).await; } diff --git a/codex-rs/core/src/tools/handlers/apply_patch.rs b/codex-rs/core/src/tools/handlers/apply_patch.rs index 7c1b7a92f2..d71eb7931a 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch.rs @@ -33,10 +33,9 @@ use crate::tools::runtimes::apply_patch::ApplyPatchRequest; use 
crate::tools::runtimes::apply_patch::ApplyPatchRuntime; use crate::tools::sandboxing::ToolCtx; use codex_apply_patch::ApplyPatchAction; -use codex_apply_patch::ApplyPatchArgs; use codex_apply_patch::ApplyPatchFileChange; use codex_apply_patch::Hunk; -use codex_apply_patch::parse_patch_streaming; +use codex_apply_patch::StreamingPatchParser; use codex_exec_server::ExecutorFileSystem; use codex_features::Feature; use codex_protocol::models::AdditionalPermissionProfile; @@ -56,8 +55,7 @@ pub struct ApplyPatchHandler; #[derive(Default)] struct ApplyPatchArgumentDiffConsumer { - input: String, - last_progress: Option<Vec<Hunk>>, + parser: StreamingPatchParser, last_sent_at: Option<Instant>, pending: Option<PatchApplyUpdatedEvent>, } @@ -77,26 +75,19 @@ impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer { .map(EventMsg::PatchApplyUpdated) } - fn flush_on_complete(&mut self) -> Option<EventMsg> { - self.flush_update_on_complete() - .map(EventMsg::PatchApplyUpdated) + fn finish(&mut self) -> Result<Option<EventMsg>, FunctionCallError> { + self.finish_update_on_complete() + .map(|event| event.map(EventMsg::PatchApplyUpdated)) } } impl ApplyPatchArgumentDiffConsumer { fn push_delta(&mut self, call_id: String, delta: &str) -> Option<PatchApplyUpdatedEvent> { - self.input.push_str(delta); - - let ApplyPatchArgs { hunks, ..
} = parse_patch_streaming(&self.input).ok()?; + let hunks = self.parser.push_delta(delta).ok()?; if hunks.is_empty() { return None; } - if self.last_progress.as_ref() == Some(&hunks) { - return None; - } - let changes = convert_apply_patch_hunks_to_protocol(&hunks); - self.last_progress = Some(hunks); let event = PatchApplyUpdatedEvent { call_id, changes }; let now = Instant::now(); match self.last_sent_at { @@ -114,12 +105,18 @@ } } - fn flush_update_on_complete(&mut self) -> Option<PatchApplyUpdatedEvent> { + fn finish_update_on_complete( + &mut self, + ) -> Result<Option<PatchApplyUpdatedEvent>, FunctionCallError> { + self.parser.finish().map_err(|err| { + FunctionCallError::RespondToModel(format!("failed to parse apply_patch: {err}")) + })?; + let event = self.pending.take(); if event.is_some() { self.last_sent_at = Some(Instant::now()); } - event + Ok(event) } } diff --git a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs index 230e656225..04472e4623 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs @@ -136,7 +136,7 @@ fn diff_consumer_streams_apply_patch_changes() { HashMap::from([( PathBuf::from("hello.txt"), FileChange::Add { - content: "hello\n".to_string(), + content: String::new(), }, )]), ) @@ -147,8 +147,16 @@ .push_delta("call-1".to_string(), "\n+world") .is_none() ); + assert!( + consumer + .push_delta("call-1".to_string(), "\n*** End Patch") + .is_none() + ); - let event = consumer.flush_update_on_complete().expect("progress event"); + let event = consumer + .finish_update_on_complete() + .expect("finish parser") + .expect("progress event"); assert_eq!( (event.call_id, event.changes), ( @@ -175,7 +183,7 @@ fn diff_consumer_sends_next_update_after_buffer_interval() { HashMap::from([( PathBuf::from("hello.txt"), FileChange::Add { - content: "hello\n".to_string(), + content: String::new(), }, )]) ); @@ -190,7 +198,7 @@ fn diff_consumer_sends_next_update_after_buffer_interval() { HashMap::from([( PathBuf::from("hello.txt"), FileChange::Add { - content: "hello\nworld\n".to_string(), + content: "hello\n".to_string(), }, )]) ); } diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 29abab69f3..e1027c9fa9 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -98,9 +98,9 @@ pub(crate) trait ToolArgumentDiffConsumer: Send { fn consume_diff(&mut self, turn: &TurnContext, call_id: String, diff: &str) -> Option<EventMsg>; - /// Flush any buffered event before the tool call completes. - fn flush_on_complete(&mut self) -> Option<EventMsg> { - None + /// Finish consuming argument diffs before the tool call completes. + fn finish(&mut self) -> Result<Option<EventMsg>, FunctionCallError> { + Ok(None) + } } diff --git a/codex-rs/core/tests/suite/apply_patch_cli.rs b/codex-rs/core/tests/suite/apply_patch_cli.rs index 44c3aff165..f08dfd5f0e 100644 --- a/codex-rs/core/tests/suite/apply_patch_cli.rs +++ b/codex-rs/core/tests/suite/apply_patch_cli.rs @@ -1027,7 +1027,7 @@ async fn apply_patch_custom_tool_streaming_emits_updated_changes() -> Result<()> .changes .get(&std::path::PathBuf::from("streamed.txt")), Some(&codex_protocol::protocol::FileChange::Add { - content: "hello\n".to_string(), + content: String::new(), }) ); assert_eq!(