diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index b0eca67855..f165dd7c45 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -993,6 +993,32 @@ ], "type": "object" }, + "FileChangePatchUpdatedNotification": { + "properties": { + "changes": { + "items": { + "$ref": "#/definitions/FileUpdateChange" + }, + "type": "array" + }, + "itemId": { + "type": "string" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": "string" + } + }, + "required": [ + "changes", + "itemId", + "threadId", + "turnId" + ], + "type": "object" + }, "FileUpdateChange": { "properties": { "diff": { @@ -4668,6 +4694,26 @@ "title": "Item/fileChange/outputDeltaNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/fileChange/patchUpdated" + ], + "title": "Item/fileChange/patchUpdatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/FileChangePatchUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/fileChange/patchUpdatedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 8d95c081dc..7f645dfd3d 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -4290,6 +4290,26 @@ "title": "Item/fileChange/outputDeltaNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/fileChange/patchUpdated" + ], + "title": "Item/fileChange/patchUpdatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/FileChangePatchUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/fileChange/patchUpdatedNotification", + "type": "object" + }, { "properties": { "method": { @@ -7985,6 +8005,34 @@ "title": "FileChangeOutputDeltaNotification", "type": "object" }, + "FileChangePatchUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "changes": { + "items": { + "$ref": "#/definitions/v2/FileUpdateChange" + }, + "type": "array" + }, + "itemId": { + "type": "string" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": "string" + } + }, + "required": [ + "changes", + "itemId", + "threadId", + "turnId" + ], + "title": "FileChangePatchUpdatedNotification", + "type": "object" + }, "FileUpdateChange": { "properties": { "diff": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 7dc8afb75e..d00378de51 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -4342,6 +4342,34 @@ "title": "FileChangeOutputDeltaNotification", "type": "object" }, + "FileChangePatchUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "changes": { + "items": { + "$ref": "#/definitions/FileUpdateChange" + }, + "type": "array" + }, + "itemId": { + "type": "string" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": "string" + } + }, + "required": [ + "changes", + "itemId", + "threadId", + "turnId" + ], + "title": "FileChangePatchUpdatedNotification", + "type": "object" + }, "FileUpdateChange": { "properties": { "diff": { @@ -9640,6 +9668,26 @@ "title": "Item/fileChange/outputDeltaNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/fileChange/patchUpdated" + ], + "title": "Item/fileChange/patchUpdatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/FileChangePatchUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/fileChange/patchUpdatedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/FileChangePatchUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/FileChangePatchUpdatedNotification.json new file mode 100644 index 0000000000..0ae44aa961 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/FileChangePatchUpdatedNotification.json @@ -0,0 +1,107 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "FileUpdateChange": { + "properties": { + "diff": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/PatchChangeKind" + }, + "path": { + "type": "string" + } + }, + "required": [ + "diff", + "kind", + "path" + ], + "type": "object" + }, + "PatchChangeKind": { + "oneOf": [ + { + "properties": { + "type": { + "enum": [ + "add" + ], + "title": "AddPatchChangeKindType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "AddPatchChangeKind", + "type": "object" + }, + { + "properties": { + "type": { + "enum": [ + "delete" + ], + "title": "DeletePatchChangeKindType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "DeletePatchChangeKind", + "type": "object" + }, + { + "properties": { + "move_path": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "update" + ], + "title": "UpdatePatchChangeKindType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "UpdatePatchChangeKind", + "type": "object" + } + ] + } + }, + "properties": { + "changes": { + "items": { + "$ref": "#/definitions/FileUpdateChange" + }, + "type": "array" + }, + "itemId": { + "type": "string" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": "string" + } + }, + "required": [ + "changes", + "itemId", + "threadId", + "turnId" + ], + "title": "FileChangePatchUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index a43bb80428..f55700bf54 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -16,6 +16,7 @@ import type { DeprecationNoticeNotification } from "./v2/DeprecationNoticeNotifi import type { ErrorNotification } from "./v2/ErrorNotification"; import type { ExternalAgentConfigImportCompletedNotification } from "./v2/ExternalAgentConfigImportCompletedNotification"; import type { FileChangeOutputDeltaNotification } from "./v2/FileChangeOutputDeltaNotification"; +import type { FileChangePatchUpdatedNotification } from "./v2/FileChangePatchUpdatedNotification"; import type { FsChangedNotification } from "./v2/FsChangedNotification"; import type { HookCompletedNotification } from "./v2/HookCompletedNotification"; import type { HookStartedNotification } from "./v2/HookStartedNotification"; @@ -61,4 +62,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "warning", "params": WarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/patchUpdated", "params": FileChangePatchUpdatedNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "warning", "params": WarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/FileChangePatchUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/FileChangePatchUpdatedNotification.ts new file mode 100644 index 0000000000..4a4ed92753 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/FileChangePatchUpdatedNotification.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { FileUpdateChange } from "./FileUpdateChange"; + +export type FileChangePatchUpdatedNotification = { threadId: string, turnId: string, itemId: string, changes: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index e3095a6312..e37759112a 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -98,6 +98,7 @@ export type { FeedbackUploadParams } from "./FeedbackUploadParams"; export type { FeedbackUploadResponse } from "./FeedbackUploadResponse"; export type { FileChangeApprovalDecision } from "./FileChangeApprovalDecision"; export type { FileChangeOutputDeltaNotification } from "./FileChangeOutputDeltaNotification"; +export type { FileChangePatchUpdatedNotification } from "./FileChangePatchUpdatedNotification"; export type { FileChangeRequestApprovalParams } from "./FileChangeRequestApprovalParams"; export type { FileChangeRequestApprovalResponse } from "./FileChangeRequestApprovalResponse"; export type { FileSystemAccessMode } from "./FileSystemAccessMode"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 2c2aad3c66..6253315610 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1023,6 +1023,7 @@ server_notification_definitions! { CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification), FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification), + FileChangePatchUpdated => "item/fileChange/patchUpdated" (v2::FileChangePatchUpdatedNotification), ServerRequestResolved => "serverRequest/resolved" (v2::ServerRequestResolvedNotification), McpToolCallProgress => "item/mcpToolCall/progress" (v2::McpToolCallProgressNotification), McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 46e94de97a..77f339db53 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -5935,6 +5935,16 @@ pub struct FileChangeOutputDeltaNotification { pub delta: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct FileChangePatchUpdatedNotification { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub changes: Vec, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index b2d30d8653..a4c8eac300 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -1069,6 +1069,7 @@ There are additional item-specific events: #### fileChange +- `item/fileChange/patchUpdated` - when `features.apply_patch_streaming_events` is enabled, streams structured file-change snapshots parsed from the model-generated patch before it is executed. - `item/fileChange/outputDelta` - contains the tool call response of the underlying `apply_patch` tool call. ### Errors diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 44fa288061..4701eb3c18 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -40,6 +40,7 @@ use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::ExecPolicyAmendment as V2ExecPolicyAmendment; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeOutputDeltaNotification; +use codex_app_server_protocol::FileChangePatchUpdatedNotification; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::FileUpdateChange; @@ -1636,6 +1637,17 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } } + EventMsg::PatchApplyUpdated(event) => { + let notification = FileChangePatchUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item_id: event.call_id, + changes: convert_patch_changes(&event.changes), + }; + outgoing + .send_server_notification(ServerNotification::FileChangePatchUpdated(notification)) + .await; + } EventMsg::PatchApplyEnd(patch_end_event) => { // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 3640666679..65f8442b0d 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -24,6 +24,7 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeOutputDeltaNotification; +use codex_app_server_protocol::FileChangePatchUpdatedNotification; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; @@ -2026,6 +2027,248 @@ async fn turn_start_file_change_approval_v2() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_does_not_stream_apply_patch_change_updates_without_feature_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let call_id = "patch-call"; + let item_id = "fc-patch-call"; + let patch = "*** Begin Patch\n*** Add File: live.txt\n+live line\n*** End Patch\n"; + let patch_delta_1 = "*** Begin Patch\n*** Add File: live.txt\n+live"; + let patch_delta_2 = " line\n*** End Patch\n"; + let responses = vec![ + responses::sse(vec![ + responses::ev_response_created("resp-1"), + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "custom_tool_call", + "id": item_id, + "call_id": call_id, + "name": "apply_patch", + "input": "", + "status": "in_progress" + } + }), + serde_json::json!({ + "type": "response.custom_tool_call_input.delta", + "item_id": item_id, + "call_id": call_id, + "delta": patch_delta_1, + }), + serde_json::json!({ + "type": "response.custom_tool_call_input.delta", + "item_id": item_id, + "call_id": call_id, + "delta": patch_delta_2, + }), + responses::ev_apply_patch_custom_tool_call(call_id, patch), + responses::ev_completed("resp-1"), + ]), + create_final_assistant_message_sse_response("patch applied")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml(&codex_home, &server.uri(), "never", &BTreeMap::default())?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id, + input: vec![V2UserInput::Text { + text: "apply patch".into(), + text_elements: Vec::new(), + }], + cwd: Some(workspace), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + assert!( + !mcp.pending_notification_methods() + .iter() + .any(|method| method == "item/fileChange/patchUpdated") + ); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_streams_apply_patch_change_updates_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let call_id = "patch-call"; + let item_id = "fc-patch-call"; + let patch = "*** Begin Patch\n*** Add File: live.txt\n+live line\n*** End Patch\n"; + let patch_delta_1 = "*** Begin Patch\n*** Add File: live.txt\n+live"; + let patch_delta_2 = " line\n*** End Patch\n"; + let responses = vec![ + responses::sse(vec![ + responses::ev_response_created("resp-1"), + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "function_call", + "id": "fc-other-call", + "call_id": "other-call", + "name": "not_apply_patch", + "arguments": "", + "status": "in_progress" + } + }), + serde_json::json!({ + "type": "response.function_call_arguments.delta", + "item_id": "fc-other-call", + "delta": r#"{"input":"*** Begin Patch\n*** Add File: ignored.txt\n+ignored"#, + }), + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "custom_tool_call", + "id": item_id, + "call_id": call_id, + "name": "apply_patch", + "input": "", + "status": "in_progress" + } + }), + serde_json::json!({ + "type": "response.custom_tool_call_input.delta", + "item_id": item_id, + "call_id": call_id, + "delta": patch_delta_1, + }), + serde_json::json!({ + "type": "response.custom_tool_call_input.delta", + "item_id": item_id, + "call_id": call_id, + "delta": patch_delta_2, + }), + responses::ev_apply_patch_custom_tool_call(call_id, patch), + responses::ev_completed("resp-1"), + ]), + create_final_assistant_message_sse_response("patch applied")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml( + &codex_home, + &server.uri(), + "never", + &BTreeMap::from([ + (Feature::ApplyPatchFreeform, true), + (Feature::ApplyPatchStreamingEvents, true), + (Feature::Plugins, false), + (Feature::RemoteModels, false), + (Feature::ShellSnapshot, false), + ]), + )?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "apply patch".into(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let mut streamed_content = String::new(); + while streamed_content != "live line\n" { + let delta_notif = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("item/fileChange/patchUpdated"), + ) + .await??; + let delta: FileChangePatchUpdatedNotification = serde_json::from_value( + delta_notif + .params + .clone() + .expect("item/fileChange/patchUpdated params"), + )?; + assert_eq!(delta.thread_id, thread.id); + assert_eq!(delta.turn_id, turn.id); + assert_eq!(delta.item_id, call_id); + let change = delta + .changes + .iter() + .find(|change| change.path == "live.txt") + .expect("live.txt change"); + assert!(matches!(change.kind, PatchChangeKind::Add)); + streamed_content = change.diff.clone(); + } + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 75d1bdea7f..efbcca6716 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1925,7 +1925,11 @@ async fn try_run_sampling_request( match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { - active_tool_argument_diff_consumer = None; + if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take() + && let Some(event) = consumer.flush_on_complete() + { + sess.send_event(&turn_context, event).await; + } let previously_active_item = active_item.take(); if let Some(previous) = previously_active_item.as_ref() && matches!(previous, TurnItem::AgentMessage(_)) diff --git a/codex-rs/core/src/tools/handlers/apply_patch.rs b/codex-rs/core/src/tools/handlers/apply_patch.rs index ac8980027b..f723e57e5b 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use crate::apply_patch; use crate::apply_patch::InternalApplyPatchInvocation; @@ -43,11 +45,15 @@ use codex_sandboxing::policy_transforms::normalize_additional_permissions; use codex_tools::ApplyPatchToolArgs; use codex_utils_absolute_path::AbsolutePathBuf; +const APPLY_PATCH_ARGUMENT_DIFF_BUFFER_INTERVAL: Duration = Duration::from_millis(500); + pub struct ApplyPatchHandler; #[derive(Default)] struct ApplyPatchArgumentDiffConsumer { parser: StreamingPatchParser, + last_sent_at: Option, + pending: Option, } impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer { @@ -64,13 +70,40 @@ impl ToolArgumentDiffConsumer for ApplyPatchArgumentDiffConsumer { self.push_delta(call_id, diff) .map(EventMsg::PatchApplyUpdated) } + + fn flush_on_complete(&mut self) -> Option { + self.flush_update_on_complete() + .map(EventMsg::PatchApplyUpdated) + } } impl ApplyPatchArgumentDiffConsumer { fn push_delta(&mut self, call_id: String, delta: &str) -> Option { let hunks = self.parser.push_delta(delta).ok()??; let changes = convert_apply_patch_hunks_to_protocol(&hunks); - Some(PatchApplyUpdatedEvent { call_id, changes }) + let event = PatchApplyUpdatedEvent { call_id, changes }; + let now = Instant::now(); + match self.last_sent_at { + Some(last_sent_at) + if now.duration_since(last_sent_at) < APPLY_PATCH_ARGUMENT_DIFF_BUFFER_INTERVAL => + { + self.pending = Some(event); + None + } + Some(_) | None => { + self.pending = None; + self.last_sent_at = Some(now); + Some(event) + } + } + } + + fn flush_update_on_complete(&mut self) -> Option { + let event = self.pending.take(); + if event.is_some() { + self.last_sent_at = Some(Instant::now()); + } + event } } diff --git a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs index 1aaa756123..aaa5dec2a1 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs @@ -54,9 +54,13 @@ fn diff_consumer_streams_apply_patch_changes() { ) ); - let event = consumer - .push_delta("call-1".to_string(), "\n+world") - .expect("progress event"); + assert!( + consumer + .push_delta("call-1".to_string(), "\n+world") + .is_none() + ); + + let event = consumer.flush_update_on_complete().expect("progress event"); assert_eq!( (event.call_id, event.changes), ( @@ -70,9 +74,9 @@ fn diff_consumer_streams_apply_patch_changes() { ) ); - let event = consumer - .push_delta("call-1".to_string(), "\n") - .expect("progress event"); + assert!(consumer.push_delta("call-1".to_string(), "\n").is_none()); + + let event = consumer.flush_update_on_complete().expect("progress event"); assert_eq!( (event.call_id, event.changes), ( @@ -87,6 +91,39 @@ fn diff_consumer_streams_apply_patch_changes() { ); } +#[test] +fn diff_consumer_sends_next_update_after_buffer_interval() { + let mut consumer = ApplyPatchArgumentDiffConsumer::default(); + consumer.push_delta("call-1".to_string(), "*** Begin Patch\n"); + let first = consumer + .push_delta("call-1".to_string(), "*** Add File: hello.txt\n+hello") + .expect("first progress event"); + assert_eq!( + first.changes, + HashMap::from([( + PathBuf::from("hello.txt"), + FileChange::Add { + content: String::new(), + }, + )]) + ); + + consumer.last_sent_at = + Some(std::time::Instant::now() - APPLY_PATCH_ARGUMENT_DIFF_BUFFER_INTERVAL); + let second = consumer + .push_delta("call-1".to_string(), "\n+world") + .expect("second progress event"); + assert_eq!( + second.changes, + HashMap::from([( + PathBuf::from("hello.txt"), + FileChange::Add { + content: "hello\n".to_string(), + }, + )]) + ); +} + #[tokio::test] async fn approval_keys_include_move_destination() { let tmp = TempDir::new().expect("tmp"); diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 0e0b7b36dd..4c0e3fbea8 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -95,6 +95,11 @@ pub(crate) trait ToolArgumentDiffConsumer: Send { /// Consume the next argument diff for a tool call. fn consume_diff(&mut self, turn: &TurnContext, call_id: String, diff: &str) -> Option; + + /// Flush any buffered event before the tool call completes. + fn flush_on_complete(&mut self) -> Option { + None + } } pub(crate) struct AnyToolResult { diff --git a/codex-rs/tui/src/app/app_server_adapter.rs b/codex-rs/tui/src/app/app_server_adapter.rs index f0abb5326e..c700e5e14f 100644 --- a/codex-rs/tui/src/app/app_server_adapter.rs +++ b/codex-rs/tui/src/app/app_server_adapter.rs @@ -380,6 +380,9 @@ fn server_notification_thread_target( ServerNotification::FileChangeOutputDelta(notification) => { Some(notification.thread_id.as_str()) } + ServerNotification::FileChangePatchUpdated(notification) => { + Some(notification.thread_id.as_str()) + } ServerNotification::ServerRequestResolved(notification) => { Some(notification.thread_id.as_str()) } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 4147eb797f..01b345aa7b 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -6540,6 +6540,7 @@ impl ChatWidget { | ServerNotification::ThreadUnarchived(_) | ServerNotification::RawResponseItemCompleted(_) | ServerNotification::CommandExecOutputDelta(_) + | ServerNotification::FileChangePatchUpdated(_) | ServerNotification::McpToolCallProgress(_) | ServerNotification::McpServerOauthLoginCompleted(_) | ServerNotification::AppListUpdated(_)