Move apply-patch file changes into turn items (#20540)

## Why

Apply-patch file changes are now part of the core turn item stream, so
v2 clients can consume the same first-class item lifecycle path used by
other turn items instead of relying on app-server-specific remapping
from legacy patch events.

## What changed

- Added a core `TurnItem::FileChange` carrying apply-patch changes and
completion metadata.
- Updated the apply-patch tool emitter to send `ItemStarted` /
`ItemCompleted` with the new `FileChange` item while preserving legacy
`PatchApplyBegin` / `PatchApplyEnd` fan-out.
- Updated app-server v2 conversion to render the new core item directly
and stopped `event_mapping` from remapping old patch begin/end events
into item notifications.
- Kept thread history reconstruction based on the existing old
apply-patch events for rollout compatibility.

## Verification

- `cargo test -p codex-protocol -p codex-app-server-protocol`
- `cargo test -p codex-core --test all
apply_patch_tool_executes_and_emits_patch_events`
- `cargo test -p codex-app-server bespoke_event_handling`
This commit is contained in:
pakrym-oai
2026-05-01 08:47:18 -07:00
committed by GitHub
parent 0b04d1b3cc
commit f476338f93
10 changed files with 240 additions and 165 deletions

View File

@@ -29,7 +29,6 @@ use codex_app_server_protocol::ExecPolicyAmendment as V2ExecPolicyAmendment;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::FileUpdateChange;
use codex_app_server_protocol::GrantedPermissionProfile as V2GrantedPermissionProfile;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::HookCompletedNotification;
@@ -46,7 +45,6 @@ use codex_app_server_protocol::ModelVerificationNotification;
use codex_app_server_protocol::NetworkApprovalContext as V2NetworkApprovalContext;
use codex_app_server_protocol::NetworkPolicyAmendment as V2NetworkPolicyAmendment;
use codex_app_server_protocol::NetworkPolicyRuleAction as V2NetworkPolicyRuleAction;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PermissionsRequestApprovalParams;
use codex_app_server_protocol::PermissionsRequestApprovalResponse;
use codex_app_server_protocol::RawResponseItemCompletedNotification;
@@ -82,11 +80,8 @@ use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::WarningNotification;
use codex_app_server_protocol::build_file_change_approval_request_item;
use codex_app_server_protocol::build_file_change_end_item;
use codex_app_server_protocol::build_item_from_guardian_event;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_app_server_protocol::convert_patch_changes;
use codex_app_server_protocol::guardian_auto_approval_review_notification;
use codex_app_server_protocol::item_event_to_server_notification;
use codex_core::CodexThread;
@@ -524,28 +519,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let permission_guard = thread_watch_manager
.note_permission_requested(&conversation_id.to_string())
.await;
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = event.call_id.clone();
let patch_changes = convert_patch_changes(&event.changes);
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.file_change_started
.insert(item_id.clone())
};
if first_start {
let item = build_file_change_approval_request_item(&event);
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
let params = FileChangeRequestApprovalParams {
thread_id: conversation_id.to_string(),
@@ -559,14 +533,10 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(
event_turn_id,
conversation_id,
item_id,
patch_changes,
pending_request_id,
rx,
conversation,
outgoing,
thread_state.clone(),
permission_guard,
)
@@ -1104,40 +1074,9 @@ pub(crate) async fn apply_bespoke_event_handling(
)
.await;
}
EventMsg::PatchApplyBegin(patch_begin_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_begin_event.call_id.clone();
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.file_change_started
.insert(item_id.clone())
};
if first_start {
let notification = item_event_to_server_notification(
EventMsg::PatchApplyBegin(patch_begin_event),
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
}
EventMsg::PatchApplyEnd(patch_end_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_end_event.call_id.clone();
complete_file_change_item(
conversation_id,
item_id,
build_file_change_end_item(&patch_end_event),
event_turn_id.clone(),
&outgoing,
&thread_state,
)
.await;
EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) => {
// Core still fans out these deprecated events for legacy clients;
// v2 clients receive the canonical FileChange item instead.
}
EventMsg::ExecCommandBegin(exec_command_begin_event) => {
if matches!(
@@ -1425,31 +1364,6 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn complete_file_change_item(
conversation_id: ThreadId,
item_id: String,
item: ThreadItem,
turn_id: String,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
thread_state
.lock()
.await
.turn_summary
.file_change_started
.remove(&item_id);
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id,
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
#[allow(clippy::too_many_arguments)]
async fn start_command_execution_item(
conversation_id: &ThreadId,
@@ -2002,38 +1916,28 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
}
}
fn map_file_change_approval_decision(
decision: FileChangeApprovalDecision,
) -> (ReviewDecision, Option<PatchApplyStatus>) {
fn map_file_change_approval_decision(decision: FileChangeApprovalDecision) -> ReviewDecision {
match decision {
FileChangeApprovalDecision::Accept => (ReviewDecision::Approved, None),
FileChangeApprovalDecision::AcceptForSession => (ReviewDecision::ApprovedForSession, None),
FileChangeApprovalDecision::Decline => {
(ReviewDecision::Denied, Some(PatchApplyStatus::Declined))
}
FileChangeApprovalDecision::Cancel => {
(ReviewDecision::Abort, Some(PatchApplyStatus::Declined))
}
FileChangeApprovalDecision::Accept => ReviewDecision::Approved,
FileChangeApprovalDecision::AcceptForSession => ReviewDecision::ApprovedForSession,
FileChangeApprovalDecision::Decline => ReviewDecision::Denied,
FileChangeApprovalDecision::Cancel => ReviewDecision::Abort,
}
}
#[allow(clippy::too_many_arguments)]
async fn on_file_change_request_approval_response(
event_turn_id: String,
conversation_id: ThreadId,
item_id: String,
changes: Vec<FileUpdateChange>,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
codex: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let (decision, completion_status) = match response {
let decision = match response {
Ok(Ok(value)) => {
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
@@ -2043,39 +1947,19 @@ async fn on_file_change_request_approval_response(
}
});
let (decision, completion_status) =
map_file_change_approval_decision(response.decision);
// Allow EventMsg::PatchApplyEnd to emit ItemCompleted for accepted patches.
// Only short-circuit on declines/cancels/failures.
(decision, completion_status)
map_file_change_approval_decision(response.decision)
}
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
ReviewDecision::Denied
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
ReviewDecision::Denied
}
};
if let Some(status) = completion_status {
complete_file_change_item(
conversation_id,
item_id.clone(),
ThreadItem::FileChange {
id: item_id.clone(),
changes,
status,
},
event_turn_id.clone(),
&outgoing,
&thread_state,
)
.await;
}
if let Err(err) = codex
.submit(Op::PatchApproval {
id: item_id,
@@ -2886,10 +2770,9 @@ mod tests {
#[test]
fn file_change_accept_for_session_maps_to_approved_for_session() {
let (decision, completion_status) =
let decision =
map_file_change_approval_decision(FileChangeApprovalDecision::AcceptForSession);
assert_eq!(decision, ReviewDecision::ApprovedForSession);
assert_eq!(completion_status, None);
}
#[test]

View File

@@ -61,7 +61,6 @@ pub(crate) enum ThreadListenerCommand {
#[derive(Default, Clone)]
pub(crate) struct TurnSummary {
pub(crate) started_at: Option<i64>,
pub(crate) file_change_started: HashSet<String>,
pub(crate) command_execution_started: HashSet<String>,
pub(crate) last_error: Option<TurnError>,
}