From 6bd78a51b55ce8e84511eecf69bd89f822903d74 Mon Sep 17 00:00:00 2001 From: Akshay Nathan Date: Tue, 28 Apr 2026 17:14:58 -0700 Subject: [PATCH] Fix streaming apply_patch parser mismatches --- codex-rs/apply-patch/src/streaming_parser.rs | 365 ++++++++++++++---- .../apply-patch/tests/suite/corpus_compare.rs | 173 --------- codex-rs/apply-patch/tests/suite/mod.rs | 1 - codex-rs/core/src/session/turn.rs | 2 +- .../core/src/tools/handlers/apply_patch.rs | 21 +- .../src/tools/handlers/apply_patch_tests.rs | 24 +- codex-rs/core/src/tools/registry.rs | 6 +- 7 files changed, 326 insertions(+), 266 deletions(-) delete mode 100644 codex-rs/apply-patch/tests/suite/corpus_compare.rs diff --git a/codex-rs/apply-patch/src/streaming_parser.rs b/codex-rs/apply-patch/src/streaming_parser.rs index f532921670..9da2f6ff6f 100644 --- a/codex-rs/apply-patch/src/streaming_parser.rs +++ b/codex-rs/apply-patch/src/streaming_parser.rs @@ -43,36 +43,52 @@ enum StreamingParserMode { fn handle_hunk_headers_and_end_patch( trimmed: &str, hunks: &mut Vec, -) -> Option { +) -> Result, String> { if trimmed == END_PATCH_MARKER { - return Some(StreamingParserMode::EndedPatch); + ensure_update_hunk_is_not_empty(hunks)?; + return Ok(Some(StreamingParserMode::EndedPatch)); } if let Some(path) = trimmed.strip_prefix(ADD_FILE_MARKER) { + ensure_update_hunk_is_not_empty(hunks)?; hunks.push(AddFile { path: PathBuf::from(path), contents: String::new(), }); - return Some(StreamingParserMode::AddFile); + return Ok(Some(StreamingParserMode::AddFile)); } if let Some(path) = trimmed.strip_prefix(DELETE_FILE_MARKER) { + ensure_update_hunk_is_not_empty(hunks)?; hunks.push(DeleteFile { path: PathBuf::from(path), }); - return Some(StreamingParserMode::DeleteFile); + return Ok(Some(StreamingParserMode::DeleteFile)); } if let Some(path) = trimmed.strip_prefix(UPDATE_FILE_MARKER) { + ensure_update_hunk_is_not_empty(hunks)?; hunks.push(UpdateFile { path: PathBuf::from(path), move_path: None, chunks: Vec::new(), }); - return Some(StreamingParserMode::UpdateFile); + return Ok(Some(StreamingParserMode::UpdateFile)); } - None + Ok(None) +} + +fn ensure_update_hunk_is_not_empty(hunks: &[Hunk]) -> Result<(), String> { + if let Some(UpdateFile { chunks, .. }) = hunks.last() + && (chunks.is_empty() + || chunks + .last() + .is_some_and(|chunk| chunk.old_lines.is_empty() && chunk.new_lines.is_empty())) + { + return Err("Update hunk does not contain any lines".to_string()); + } + Ok(()) } impl StreamingPatchParser { - pub fn push_delta(&mut self, delta: &str) -> Result>, ParseError> { + pub fn push_delta(&mut self, delta: &str) -> Result, ParseError> { for ch in delta.chars() { if ch == '\n' { let line = std::mem::take(&mut self.line_buffer); @@ -85,8 +101,21 @@ impl StreamingPatchParser { } } - let hunks = self.state.hunks.clone(); - Ok(if hunks.is_empty() { None } else { Some(hunks) }) + Ok(self.state.hunks.clone()) + } + + pub fn finish(&mut self) -> Result, ParseError> { + if !self.line_buffer.is_empty() { + self.push_delta("\n")?; + } + + if !matches!(self.state.mode, StreamingParserMode::EndedPatch) { + return Err(InvalidPatchError( + "The last line of the patch must be '*** End Patch'".to_string(), + )); + } + + Ok(self.state.hunks.clone()) } fn process_line( @@ -112,7 +141,14 @@ impl StreamingPatchParser { )); } StreamingParserMode::StartedPatch => { - if let Some(mode) = handle_hunk_headers_and_end_patch(trimmed, &mut hunks) { + if let Some(mode) = + handle_hunk_headers_and_end_patch(trimmed, &mut hunks).map_err(|message| { + InvalidHunkError { + message, + line_number, + } + })? + { return Ok(StreamingParserState { mode, hunks }); } return Err(InvalidHunkError { @@ -123,7 +159,14 @@ impl StreamingPatchParser { }); } StreamingParserMode::AddFile => { - if let Some(mode) = handle_hunk_headers_and_end_patch(trimmed, &mut hunks) { + if let Some(mode) = + handle_hunk_headers_and_end_patch(trimmed, &mut hunks).map_err(|message| { + InvalidHunkError { + message, + line_number, + } + })? + { return Ok(StreamingParserState { mode, hunks }); } if let Some(line_to_add) = line.strip_prefix('+') @@ -144,7 +187,14 @@ impl StreamingPatchParser { }); } StreamingParserMode::DeleteFile => { - if let Some(mode) = handle_hunk_headers_and_end_patch(trimmed, &mut hunks) { + if let Some(mode) = + handle_hunk_headers_and_end_patch(trimmed, &mut hunks).map_err(|message| { + InvalidHunkError { + message, + line_number, + } + })? + { return Ok(StreamingParserState { mode, hunks }); } return Err(InvalidHunkError { @@ -155,7 +205,13 @@ impl StreamingPatchParser { }); } StreamingParserMode::UpdateFile => { - if let Some(mode) = handle_hunk_headers_and_end_patch(trimmed, &mut hunks) { + let update_line = line.trim_end(); + if let Some(mode) = handle_hunk_headers_and_end_patch(update_line, &mut hunks) + .map_err(|message| InvalidHunkError { + message, + line_number, + })? + { return Ok(StreamingParserState { mode, hunks }); } @@ -165,7 +221,7 @@ impl StreamingPatchParser { { if chunks.is_empty() && move_path.is_none() - && let Some(move_to_path) = line.trim().strip_prefix(MOVE_TO_MARKER) + && let Some(move_to_path) = update_line.strip_prefix(MOVE_TO_MARKER) { *move_path = Some(PathBuf::from(move_to_path)); return Ok(StreamingParserState { @@ -174,7 +230,7 @@ impl StreamingPatchParser { }); } - match line.trim() { + match update_line { EMPTY_CHANGE_CONTEXT_MARKER => { chunks.push(UpdateFileChunk { change_context: None, @@ -203,7 +259,7 @@ impl StreamingPatchParser { } } - if trimmed == EOF_MARKER { + if update_line == EOF_MARKER { if let Some(chunk) = chunks.last_mut() { chunk.is_end_of_file = true; } @@ -213,45 +269,78 @@ impl StreamingPatchParser { }); } - let chunk = if chunks.is_empty() { - chunks.push(UpdateFileChunk { - change_context: None, - old_lines: Vec::new(), - new_lines: Vec::new(), - is_end_of_file: false, - }); - chunks.last_mut() - } else { - chunks.last_mut() - }; - if let Some(chunk) = chunk { - let parsed_update_line = match line.chars().next() { - None => { - chunk.old_lines.push(String::new()); - chunk.new_lines.push(String::new()); - true - } - Some(' ') => { - chunk.old_lines.push(line[1..].to_string()); - chunk.new_lines.push(line[1..].to_string()); - true - } - Some('+') => { - chunk.new_lines.push(line[1..].to_string()); - true - } - Some('-') => { - chunk.old_lines.push(line[1..].to_string()); - true - } - Some(_) => false, - }; - if parsed_update_line { - return Ok(StreamingParserState { - mode: StreamingParserMode::UpdateFile, - hunks, + if line.is_empty() { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, }); } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(String::new()); + chunk.new_lines.push(String::new()); + } + return Ok(StreamingParserState { + mode: StreamingParserMode::UpdateFile, + hunks, + }); + } + + if let Some(line_to_add) = line.strip_prefix(' ') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(line_to_add.to_string()); + chunk.new_lines.push(line_to_add.to_string()); + } + return Ok(StreamingParserState { + mode: StreamingParserMode::UpdateFile, + hunks, + }); + } + + if let Some(line_to_add) = line.strip_prefix('+') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.new_lines.push(line_to_add.to_string()); + } + return Ok(StreamingParserState { + mode: StreamingParserMode::UpdateFile, + hunks, + }); + } + + if let Some(line_to_remove) = line.strip_prefix('-') { + if chunks.is_empty() { + chunks.push(UpdateFileChunk { + change_context: None, + old_lines: Vec::new(), + new_lines: Vec::new(), + is_end_of_file: false, + }); + } + if let Some(chunk) = chunks.last_mut() { + chunk.old_lines.push(line_to_remove.to_string()); + } + return Ok(StreamingParserState { + mode: StreamingParserMode::UpdateFile, + hunks, + }); } } return Err(InvalidHunkError { @@ -279,17 +368,17 @@ mod tests { let mut parser = StreamingPatchParser::default(); assert_eq!( parser.push_delta("*** Begin Patch\n*** Add File: src/hello.txt\n+hello\n+wor"), - Ok(Some(vec![AddFile { + Ok(vec![AddFile { path: PathBuf::from("src/hello.txt"), contents: "hello\n".to_string(), - }])) + }]) ); assert_eq!( parser.push_delta("ld\n"), - Ok(Some(vec![AddFile { + Ok(vec![AddFile { path: PathBuf::from("src/hello.txt"), contents: "hello\nworld\n".to_string(), - }])) + }]) ); let mut parser = StreamingPatchParser::default(); @@ -297,7 +386,7 @@ mod tests { parser.push_delta( "*** Begin Patch\n*** Update File: src/old.rs\n*** Move to: src/new.rs\n@@\n-old\n+new\n", ), - Ok(Some(vec![UpdateFile { + Ok(vec![UpdateFile { path: PathBuf::from("src/old.rs"), move_path: Some(PathBuf::from("src/new.rs")), chunks: vec![UpdateFileChunk { @@ -306,19 +395,19 @@ mod tests { new_lines: vec!["new".to_string()], is_end_of_file: false, }], - }])) + }]) ); let mut parser = StreamingPatchParser::default(); assert_eq!( parser.push_delta("*** Begin Patch\n*** Delete File: gone.txt"), - Ok(None) + Ok(Vec::new()) ); assert_eq!( parser.push_delta("\n"), - Ok(Some(vec![DeleteFile { + Ok(vec![DeleteFile { path: PathBuf::from("gone.txt"), - }])) + }]) ); let mut parser = StreamingPatchParser::default(); @@ -326,7 +415,7 @@ mod tests { parser.push_delta( "*** Begin Patch\n*** Add File: src/one.txt\n+one\n*** Delete File: src/two.txt\n", ), - Ok(Some(vec![ + Ok(vec![ AddFile { path: PathBuf::from("src/one.txt"), contents: "one\n".to_string(), @@ -334,7 +423,7 @@ mod tests { DeleteFile { path: PathBuf::from("src/two.txt"), }, - ])) + ]) ); } @@ -386,7 +475,8 @@ mod tests { let mut saw_hunk_counts = Vec::new(); let mut hunks = Vec::new(); for ch in patch.chars() { - if let Some(updated_hunks) = parser.push_delta(&ch.to_string()).unwrap() { + let updated_hunks = parser.push_delta(&ch.to_string()).unwrap(); + if !updated_hunks.is_empty() { let hunk_count = updated_hunks.len(); assert!( hunk_count >= max_hunk_count, @@ -428,6 +518,120 @@ mod tests { ); } + #[test] + fn test_streaming_patch_parser_keeps_indented_update_markers_as_context_lines() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "\ +*** Begin Patch +*** Update File: a.txt +@@ +-old a ++new a + *** Update File: b.txt +@@ +-old b ++new b +*** End Patch +", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("a.txt"), + move_path: None, + chunks: vec![ + UpdateFileChunk { + change_context: None, + old_lines: vec!["old a".to_string(), "*** Update File: b.txt".to_string()], + new_lines: vec!["new a".to_string(), "*** Update File: b.txt".to_string()], + is_end_of_file: false, + }, + UpdateFileChunk { + change_context: None, + old_lines: vec!["old b".to_string()], + new_lines: vec!["new b".to_string()], + is_end_of_file: false, + }, + ], + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_preserves_bare_empty_update_lines() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "\ +*** Begin Patch +*** Update File: file.txt +@@ + context before + + context after +*** End Patch +", + ), + Ok(vec![UpdateFile { + path: PathBuf::from("file.txt"), + move_path: None, + chunks: vec![UpdateFileChunk { + change_context: None, + // The normal parser treats a bare empty line in an update hunk as an + // empty context line. Preserve that leniency in the streaming parser. + old_lines: vec![ + "context before".to_string(), + String::new(), + "context after".to_string(), + ], + new_lines: vec![ + "context before".to_string(), + String::new(), + "context after".to_string(), + ], + is_end_of_file: false, + }], + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_finish_processes_final_line_without_newline() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: file.txt\n+hello\n*** End Patch"), + Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + assert_eq!( + parser.finish(), + Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + } + + #[test] + fn test_streaming_patch_parser_finish_requires_end_patch() { + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Add File: file.txt\n+hello\n"), + Ok(vec![AddFile { + path: PathBuf::from("file.txt"), + contents: "hello\n".to_string(), + }]) + ); + assert_eq!( + parser.finish(), + Err(InvalidPatchError( + "The last line of the patch must be '*** End Patch'".to_string(), + )) + ); + } + #[test] fn test_streaming_patch_parser_returns_errors() { let mut parser = StreamingPatchParser::default(); @@ -439,7 +643,7 @@ mod tests { ); let mut parser = StreamingPatchParser::default(); - assert_eq!(parser.push_delta("*** Begin Patch\n"), Ok(None)); + assert_eq!(parser.push_delta("*** Begin Patch\n"), Ok(Vec::new())); assert_eq!( parser.push_delta("bad\n"), Err(InvalidHunkError { @@ -469,5 +673,34 @@ mod tests { line_number: 3, }) ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n*** End Patch\n"), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 3, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta( + "*** Begin Patch\n*** Update File: old.txt\n*** Move to: new.txt\n*** Delete File: other.txt\n", + ), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 4, + }) + ); + + let mut parser = StreamingPatchParser::default(); + assert_eq!( + parser.push_delta("*** Begin Patch\n*** Update File: file.txt\n@@\n*** End Patch\n"), + Err(InvalidHunkError { + message: "Update hunk does not contain any lines".to_string(), + line_number: 4, + }) + ); } } diff --git a/codex-rs/apply-patch/tests/suite/corpus_compare.rs b/codex-rs/apply-patch/tests/suite/corpus_compare.rs deleted file mode 100644 index 7b6382bd70..0000000000 --- a/codex-rs/apply-patch/tests/suite/corpus_compare.rs +++ /dev/null @@ -1,173 +0,0 @@ -use codex_apply_patch::Hunk; -use codex_apply_patch::ParseError; -use codex_apply_patch::StreamingPatchParser; -use codex_apply_patch::parse_patch; - -#[derive(Debug, PartialEq)] -enum CompareResult { - Match, - Mismatch { - legacy: Result, ParseError>, - streaming: Result, ParseError>, - }, -} - -fn parse_with_streaming_parser(patch: &str) -> Result, ParseError> { - let mut parser = StreamingPatchParser::default(); - let mut last_hunks = None; - if let Some(hunks) = parser.push_delta(patch)? { - last_hunks = Some(hunks); - } - if !patch.ends_with('\n') - && let Some(hunks) = parser.push_delta("\n")? - { - last_hunks = Some(hunks); - } - Ok(last_hunks.unwrap_or_default()) -} - -fn compare_patch_outputs(patch: &str) -> CompareResult { - let legacy = parse_patch(patch).map(|args| args.hunks); - let streaming = parse_with_streaming_parser(patch); - if legacy == streaming { - CompareResult::Match - } else { - CompareResult::Mismatch { legacy, streaming } - } -} - -#[test] -fn reduced_repros_document_current_parser_mismatches() { - let cases = [ - ( - "empty update hunk is accepted by streaming parser", - "\ -*** Begin Patch -*** Update File: foo.txt -*** End Patch", - ), - ( - "trailing empty update chunk before end patch is accepted", - "\ -*** Begin Patch -*** Update File: foo.txt -@@ --old -+new -@@ -*** End Patch", - ), - ( - "trimmed nested add-file header inside update content is misparsed as syntax", - "\ -*** Begin Patch -*** Update File: foo.txt -@@ --old -+new - *** Add File: nested.txt - +hello - *** End Patch -*** End Patch", - ), - ( - "trimmed nested context marker inside update content starts a new chunk", - "\ -*** Begin Patch -*** Update File: foo.txt -@@ - line before - @@ nested --line after -+line after new -*** End Patch", - ), - ( - "trimmed nested end marker inside update content ends the patch early", - "\ -*** Begin Patch -*** Update File: foo.txt -@@ --old -+new - *** End Patch - tail -*** End Patch", - ), - ( - "move-only update hunk is accepted and next hunk continues", - "\ -*** Begin Patch -*** Update File: old.txt -*** Move to: new.txt -*** Update File: other.txt -@@ --before -+after -*** End Patch", - ), - ]; - - for (name, patch) in cases { - let result = compare_patch_outputs(patch); - assert!( - matches!(result, CompareResult::Mismatch { .. }), - "{name}: expected mismatch, got {result:?}" - ); - } -} - -#[test] -fn reduced_repro_for_indented_update_header_both_parsers_succeed_but_disagree() { - let patch = "\ -*** Begin Patch -*** Update File: a.txt -@@ --old a -+new a - *** Update File: b.txt -@@ --old b -+new b -*** End Patch"; - - match compare_patch_outputs(patch) { - CompareResult::Mismatch { - legacy: Ok(legacy), - streaming: Ok(streaming), - } => { - assert_eq!(legacy.len(), 1); - assert_eq!(streaming.len(), 2); - - match &legacy[..] { - [Hunk::UpdateFile { path, chunks, .. }] => { - assert_eq!(path.to_string_lossy(), "a.txt"); - assert_eq!(chunks.len(), 2); - } - other => panic!("unexpected legacy parse result: {other:?}"), - } - - match &streaming[..] { - [ - Hunk::UpdateFile { - path: first_path, - chunks: first_chunks, - .. - }, - Hunk::UpdateFile { - path: second_path, - chunks: second_chunks, - .. - }, - ] => { - assert_eq!(first_path.to_string_lossy(), "a.txt"); - assert_eq!(second_path.to_string_lossy(), "b.txt"); - assert_eq!(first_chunks.len(), 1); - assert_eq!(second_chunks.len(), 1); - } - other => panic!("unexpected streaming parse result: {other:?}"), - } - } - other => panic!("expected both parsers to succeed with different hunks, got {other:?}"), - } -} diff --git a/codex-rs/apply-patch/tests/suite/mod.rs b/codex-rs/apply-patch/tests/suite/mod.rs index 357cd40491..7d54de85ad 100644 --- a/codex-rs/apply-patch/tests/suite/mod.rs +++ b/codex-rs/apply-patch/tests/suite/mod.rs @@ -1,5 +1,4 @@ mod cli; -mod corpus_compare; mod scenarios; #[cfg(not(target_os = "windows"))] mod tool; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index db5df955d5..60383de8e1 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1947,7 +1947,7 @@ async fn try_run_sampling_request( ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take() - && let Some(event) = consumer.flush_on_complete() + && let Ok(Some(event)) = consumer.finish() { sess.send_event(&turn_context, event).await; } diff --git a/codex-rs/core/src/tools/handlers/apply_patch.rs b/codex-rs/core/src/tools/handlers/apply_patch.rs index a3ee332e9a..ea7c6ca2ae 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch.rs @@ -75,15 +75,18 @@ impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer { .map(EventMsg::PatchApplyUpdated) } - fn flush_on_complete(&mut self) -> Option { - self.flush_update_on_complete() - .map(EventMsg::PatchApplyUpdated) + fn finish(&mut self) -> Result, FunctionCallError> { + self.finish_update_on_complete() + .map(|event| event.map(EventMsg::PatchApplyUpdated)) } } impl ApplyPatchArgumentDiffConsumer { fn push_delta(&mut self, call_id: String, delta: &str) -> Option { - let hunks = self.parser.push_delta(delta).ok()??; + let hunks = self.parser.push_delta(delta).ok()?; + if hunks.is_empty() { + return None; + } let changes = convert_apply_patch_hunks_to_protocol(&hunks); let event = PatchApplyUpdatedEvent { call_id, changes }; let now = Instant::now(); @@ -102,12 +105,18 @@ impl ApplyPatchArgumentDiffConsumer { } } - fn flush_update_on_complete(&mut self) -> Option { + fn finish_update_on_complete( + &mut self, + ) -> Result, FunctionCallError> { + self.parser.finish().map_err(|err| { + FunctionCallError::RespondToModel(format!("failed to parse apply_patch: {err}")) + })?; + let event = self.pending.take(); if event.is_some() { self.last_sent_at = Some(Instant::now()); } - event + Ok(event) } } diff --git a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs index dbec5a870a..04472e4623 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs @@ -147,24 +147,16 @@ fn diff_consumer_streams_apply_patch_changes() { .push_delta("call-1".to_string(), "\n+world") .is_none() ); - - let event = consumer.flush_update_on_complete().expect("progress event"); - assert_eq!( - (event.call_id, event.changes), - ( - "call-1".to_string(), - HashMap::from([( - PathBuf::from("hello.txt"), - FileChange::Add { - content: "hello\n".to_string(), - }, - )]), - ) + assert!( + consumer + .push_delta("call-1".to_string(), "\n*** End Patch") + .is_none() ); - assert!(consumer.push_delta("call-1".to_string(), "\n").is_none()); - - let event = consumer.flush_update_on_complete().expect("progress event"); + let event = consumer + .finish_update_on_complete() + .expect("finish parser") + .expect("progress event"); assert_eq!( (event.call_id, event.changes), ( diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 08bd21548c..60309618e7 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -97,9 +97,9 @@ pub(crate) trait ToolArgumentDiffConsumer: Send { fn consume_diff(&mut self, turn: &TurnContext, call_id: String, diff: &str) -> Option; - /// Flush any buffered event before the tool call completes. - fn flush_on_complete(&mut self) -> Option { - None + /// Finish consuming argument diffs before the tool call completes. + fn finish(&mut self) -> Result, FunctionCallError> { + Ok(None) } }