wiring in approvals

This commit is contained in:
Roy Han
2026-03-05 14:26:48 -08:00
parent 1ed80dbd5a
commit cf119ea2a0
4 changed files with 377 additions and 12 deletions

View File

@@ -42,6 +42,36 @@ pub(crate) struct SkillInvocation {
pub(crate) invocation_type: InvocationType,
}
#[derive(Clone, Copy, Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ApprovalKind {
ExecCommand,
ApplyPatch,
}
#[derive(Clone, Copy, Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ApprovalLifecycleStage {
Requested,
Resolved,
}
#[derive(Clone, Copy, Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ApprovalEscalationStatus {
Approved,
Rejected,
}
#[derive(Clone, Debug)]
pub(crate) struct ApprovalItemInvocation {
pub(crate) call_id: String,
pub(crate) approval_id: String,
pub(crate) approval_kind: ApprovalKind,
pub(crate) lifecycle_stage: ApprovalLifecycleStage,
pub(crate) escalation_status: Option<ApprovalEscalationStatus>,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum InvocationType {
@@ -81,6 +111,9 @@ impl AnalyticsEventsQueue {
TrackEventsJob::AppUsed(job) => {
send_track_app_used(&auth_manager, job).await;
}
TrackEventsJob::ApprovalItem(job) => {
send_track_approval_item(&auth_manager, job).await;
}
}
}
});
@@ -149,12 +182,26 @@ impl AnalyticsEventsClient {
pub(crate) fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
track_app_used(&self.queue, Arc::clone(&self.config), Some(tracking), app);
}
pub(crate) fn track_approval_item(
&self,
tracking: TrackEventsContext,
approval: ApprovalItemInvocation,
) {
track_approval_item(
&self.queue,
Arc::clone(&self.config),
Some(tracking),
approval,
);
}
}
enum TrackEventsJob {
SkillInvocations(TrackSkillInvocationsJob),
AppMentioned(TrackAppMentionedJob),
AppUsed(TrackAppUsedJob),
ApprovalItem(TrackApprovalItemJob),
}
struct TrackSkillInvocationsJob {
@@ -175,6 +222,12 @@ struct TrackAppUsedJob {
app: AppInvocation,
}
struct TrackApprovalItemJob {
config: Arc<Config>,
tracking: TrackEventsContext,
approval: ApprovalItemInvocation,
}
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
const ANALYTICS_APP_USED_DEDUPE_MAX_KEYS: usize = 4096;
@@ -190,6 +243,7 @@ enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
ApprovalItem(CodexApprovalItemEventRequest),
}
#[derive(Serialize)]
@@ -233,6 +287,27 @@ struct CodexAppUsedEventRequest {
event_params: CodexAppMetadata,
}
#[derive(Serialize)]
struct CodexApprovalItemMetadata {
thread_id: Option<String>,
turn_id: Option<String>,
approval_id: String,
call_id: String,
approval_kind: ApprovalKind,
lifecycle_stage: ApprovalLifecycleStage,
is_tool_call_escalated: bool,
#[serde(skip_serializing_if = "Option::is_none")]
escalation_status: Option<ApprovalEscalationStatus>,
product_client_id: Option<String>,
model_slug: Option<String>,
}
#[derive(Serialize)]
struct CodexApprovalItemEventRequest {
event_type: &'static str,
event_params: CodexApprovalItemMetadata,
}
pub(crate) fn track_skill_invocations(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
@@ -302,6 +377,26 @@ pub(crate) fn track_app_used(
queue.try_send(job);
}
pub(crate) fn track_approval_item(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
tracking: Option<TrackEventsContext>,
approval: ApprovalItemInvocation,
) {
if config.analytics_enabled == Some(false) {
return;
}
let Some(tracking) = tracking else {
return;
};
let job = TrackEventsJob::ApprovalItem(TrackApprovalItemJob {
config,
tracking,
approval,
});
queue.try_send(job);
}
async fn send_track_skill_invocations(auth_manager: &AuthManager, job: TrackSkillInvocationsJob) {
let TrackSkillInvocationsJob {
config,
@@ -385,6 +480,22 @@ async fn send_track_app_used(auth_manager: &AuthManager, job: TrackAppUsedJob) {
send_track_events(auth_manager, config, events).await;
}
async fn send_track_approval_item(auth_manager: &AuthManager, job: TrackApprovalItemJob) {
let TrackApprovalItemJob {
config,
tracking,
approval,
} = job;
let event_params = codex_approval_item_metadata(&tracking, approval);
let events = vec![TrackEventRequest::ApprovalItem(
CodexApprovalItemEventRequest {
event_type: "codex_approval_item",
event_params,
},
)];
send_track_events(auth_manager, config, events).await;
}
fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata {
CodexAppMetadata {
connector_id: app.connector_id,
@@ -397,6 +508,24 @@ fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> Code
}
}
fn codex_approval_item_metadata(
tracking: &TrackEventsContext,
approval: ApprovalItemInvocation,
) -> CodexApprovalItemMetadata {
CodexApprovalItemMetadata {
thread_id: Some(tracking.thread_id.clone()),
turn_id: Some(tracking.turn_id.clone()),
approval_id: approval.approval_id,
call_id: approval.call_id,
approval_kind: approval.approval_kind,
lifecycle_stage: approval.lifecycle_stage,
is_tool_call_escalated: true,
escalation_status: approval.escalation_status,
product_client_id: Some(crate::default_client::originator().value),
model_slug: Some(tracking.model_slug.clone()),
}
}
async fn send_track_events(
auth_manager: &AuthManager,
config: Arc<Config>,
@@ -492,12 +621,18 @@ fn normalize_path_for_skill_id(
mod tests {
use super::AnalyticsEventsQueue;
use super::AppInvocation;
use super::ApprovalEscalationStatus;
use super::ApprovalItemInvocation;
use super::ApprovalKind;
use super::ApprovalLifecycleStage;
use super::CodexAppMentionedEventRequest;
use super::CodexAppUsedEventRequest;
use super::CodexApprovalItemEventRequest;
use super::InvocationType;
use super::TrackEventRequest;
use super::TrackEventsContext;
use super::codex_app_metadata;
use super::codex_approval_item_metadata;
use super::normalize_path_for_skill_id;
use pretty_assertions::assert_eq;
use serde_json::json;
@@ -667,4 +802,90 @@ mod tests {
assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), false);
assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true);
}
#[test]
fn approval_item_requested_event_serializes_expected_shape() {
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-9".to_string(),
turn_id: "turn-9".to_string(),
};
let event = TrackEventRequest::ApprovalItem(CodexApprovalItemEventRequest {
event_type: "codex_approval_item",
event_params: codex_approval_item_metadata(
&tracking,
ApprovalItemInvocation {
call_id: "call-1".to_string(),
approval_id: "approval-1".to_string(),
approval_kind: ApprovalKind::ExecCommand,
lifecycle_stage: ApprovalLifecycleStage::Requested,
escalation_status: None,
},
),
});
let payload =
serde_json::to_value(&event).expect("serialize approval item requested event");
assert_eq!(
payload,
json!({
"event_type": "codex_approval_item",
"event_params": {
"thread_id": "thread-9",
"turn_id": "turn-9",
"approval_id": "approval-1",
"call_id": "call-1",
"approval_kind": "exec_command",
"lifecycle_stage": "requested",
"is_tool_call_escalated": true,
"product_client_id": crate::default_client::originator().value,
"model_slug": "gpt-5"
}
})
);
}
#[test]
fn approval_item_resolved_event_serializes_expected_shape() {
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-10".to_string(),
turn_id: "turn-10".to_string(),
};
let event = TrackEventRequest::ApprovalItem(CodexApprovalItemEventRequest {
event_type: "codex_approval_item",
event_params: codex_approval_item_metadata(
&tracking,
ApprovalItemInvocation {
call_id: "call-2".to_string(),
approval_id: "approval-2".to_string(),
approval_kind: ApprovalKind::ApplyPatch,
lifecycle_stage: ApprovalLifecycleStage::Resolved,
escalation_status: Some(ApprovalEscalationStatus::Rejected),
},
),
});
let payload = serde_json::to_value(&event).expect("serialize approval item resolved event");
assert_eq!(
payload,
json!({
"event_type": "codex_approval_item",
"event_params": {
"thread_id": "thread-10",
"turn_id": "turn-10",
"approval_id": "approval-2",
"call_id": "call-2",
"approval_kind": "apply_patch",
"lifecycle_stage": "resolved",
"is_tool_call_escalated": true,
"escalation_status": "rejected",
"product_client_id": crate::default_client::originator().value,
"model_slug": "gpt-5"
}
})
);
}
}

View File

@@ -14,6 +14,10 @@ use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::AppInvocation;
use crate::analytics_client::ApprovalEscalationStatus;
use crate::analytics_client::ApprovalItemInvocation;
use crate::analytics_client::ApprovalKind;
use crate::analytics_client::ApprovalLifecycleStage;
use crate::analytics_client::InvocationType;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
@@ -284,6 +288,28 @@ fn build_user_message_metadata(
}
}
fn approval_kind_for_telemetry(kind: PendingApprovalKind) -> ApprovalKind {
match kind {
PendingApprovalKind::ExecCommand => ApprovalKind::ExecCommand,
PendingApprovalKind::ApplyPatch => ApprovalKind::ApplyPatch,
}
}
fn escalation_status_for_review_decision(decision: &ReviewDecision) -> ApprovalEscalationStatus {
match decision {
ReviewDecision::Approved
| ReviewDecision::ApprovedExecpolicyAmendment { .. }
| ReviewDecision::ApprovedForSession => ApprovalEscalationStatus::Approved,
ReviewDecision::NetworkPolicyAmendment {
network_policy_amendment,
} => match network_policy_amendment.action {
NetworkPolicyRuleAction::Allow => ApprovalEscalationStatus::Approved,
NetworkPolicyRuleAction::Deny => ApprovalEscalationStatus::Rejected,
},
ReviewDecision::Denied | ReviewDecision::Abort => ApprovalEscalationStatus::Rejected,
}
}
/// Notes from the previous real user turn.
///
/// Conceptually this is the same role that `previous_model` used to fill, but
@@ -377,6 +403,9 @@ use crate::skills::injection::app_id_from_path;
use crate::skills::injection::tool_kind_for_path;
use crate::skills::resolve_skill_dependencies_for_turn;
use crate::state::ActiveTurn;
use crate::state::PendingApproval;
use crate::state::PendingApprovalKind;
use crate::state::PendingApprovalTelemetry;
use crate::state::PendingInputItem;
use crate::state::PendingInputSource;
use crate::state::SessionServices;
@@ -2774,6 +2803,29 @@ impl Session {
}
}
fn track_pending_approval_item(
&self,
telemetry: &PendingApprovalTelemetry,
lifecycle_stage: ApprovalLifecycleStage,
escalation_status: Option<ApprovalEscalationStatus>,
) {
let tracking = build_track_events_context(
telemetry.model_slug.clone(),
self.conversation_id.to_string(),
telemetry.turn_id.clone(),
);
self.services.analytics_events_client.track_approval_item(
tracking,
ApprovalItemInvocation {
call_id: telemetry.call_id.clone(),
approval_id: telemetry.approval_id.clone(),
approval_kind: approval_kind_for_telemetry(telemetry.kind),
lifecycle_stage,
escalation_status,
},
);
}
/// Emit an exec approval request event and await the user's decision.
///
/// The request is keyed by `call_id` + `approval_id` so matching responses
@@ -2800,6 +2852,13 @@ impl Session {
// command-level approvals use `call_id`.
// `approval_id` is only present for subcommand callbacks (execve intercept)
let effective_approval_id = approval_id.clone().unwrap_or_else(|| call_id.clone());
let pending_telemetry = PendingApprovalTelemetry {
turn_id: turn_context.sub_id.clone(),
call_id: call_id.clone(),
approval_id: effective_approval_id.clone(),
model_slug: turn_context.model_info.slug.clone(),
kind: PendingApprovalKind::ExecCommand,
};
// Add the tx_approve callback to the map before sending the request.
let (tx_approve, rx_approve) = oneshot::channel();
let prev_entry = {
@@ -2807,7 +2866,13 @@ impl Session {
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(effective_approval_id.clone(), tx_approve)
ts.insert_pending_approval(
effective_approval_id.clone(),
PendingApproval {
tx: tx_approve,
telemetry: pending_telemetry.clone(),
},
)
}
None => None,
}
@@ -2815,6 +2880,11 @@ impl Session {
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for call_id: {effective_approval_id}");
}
self.track_pending_approval_item(
&pending_telemetry,
ApprovalLifecycleStage::Requested,
None,
);
let parsed_cmd = parse_command(&command);
let proposed_network_policy_amendments = network_approval_context.as_ref().map(|context| {
@@ -2866,12 +2936,25 @@ impl Session {
// Add the tx_approve callback to the map before sending the request.
let (tx_approve, rx_approve) = oneshot::channel();
let approval_id = call_id.clone();
let pending_telemetry = PendingApprovalTelemetry {
turn_id: turn_context.sub_id.clone(),
call_id: call_id.clone(),
approval_id: approval_id.clone(),
model_slug: turn_context.model_info.slug.clone(),
kind: PendingApprovalKind::ApplyPatch,
};
let prev_entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(approval_id.clone(), tx_approve)
ts.insert_pending_approval(
approval_id.clone(),
PendingApproval {
tx: tx_approve,
telemetry: pending_telemetry.clone(),
},
)
}
None => None,
}
@@ -2879,6 +2962,11 @@ impl Session {
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for call_id: {approval_id}");
}
self.track_pending_approval_item(
&pending_telemetry,
ApprovalLifecycleStage::Requested,
None,
);
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
@@ -2981,8 +3069,40 @@ impl Session {
}
};
match entry {
Some(tx_approve) => {
tx_approve.send(decision).ok();
Some(pending_approval) => {
let escalation_status = escalation_status_for_review_decision(&decision);
self.track_pending_approval_item(
&pending_approval.telemetry,
ApprovalLifecycleStage::Resolved,
Some(escalation_status),
);
pending_approval.tx.send(decision).ok();
}
None => {
warn!("No pending approval found for call_id: {approval_id}");
}
}
}
pub async fn discard_pending_approval(&self, approval_id: &str, decision: ReviewDecision) {
let entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.remove_pending_approval(approval_id)
}
None => None,
}
};
match entry {
Some(pending_approval) => {
let escalation_status = escalation_status_for_review_decision(&decision);
self.track_pending_approval_item(
&pending_approval.telemetry,
ApprovalLifecycleStage::Resolved,
Some(escalation_status),
);
}
None => {
warn!("No pending approval found for call_id: {approval_id}");
@@ -4288,6 +4408,8 @@ mod handlers {
}
match decision {
ReviewDecision::Abort => {
sess.discard_pending_approval(&approval_id, ReviewDecision::Abort)
.await;
sess.interrupt_task().await;
}
other => sess.notify_approval(&approval_id, other).await,
@@ -4297,6 +4419,8 @@ mod handlers {
pub async fn patch_approval(sess: &Arc<Session>, id: String, decision: ReviewDecision) {
match decision {
ReviewDecision::Abort => {
sess.discard_pending_approval(&id, ReviewDecision::Abort)
.await;
sess.interrupt_task().await;
}
other => sess.notify_approval(&id, other).await,

View File

@@ -5,6 +5,9 @@ mod turn;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::PendingApproval;
pub(crate) use turn::PendingApprovalKind;
pub(crate) use turn::PendingApprovalTelemetry;
pub(crate) use turn::PendingInputItem;
pub(crate) use turn::PendingInputSource;
pub(crate) use turn::RunningTask;

View File

@@ -18,6 +18,26 @@ use crate::protocol::ReviewDecision;
use crate::protocol::TokenUsage;
use crate::tasks::SessionTask;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PendingApprovalKind {
ExecCommand,
ApplyPatch,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct PendingApprovalTelemetry {
pub(crate) turn_id: String,
pub(crate) call_id: String,
pub(crate) approval_id: String,
pub(crate) model_slug: String,
pub(crate) kind: PendingApprovalKind,
}
pub(crate) struct PendingApproval {
pub(crate) tx: oneshot::Sender<ReviewDecision>,
pub(crate) telemetry: PendingApprovalTelemetry,
}
/// Metadata about the currently running turn.
pub(crate) struct ActiveTurn {
pub(crate) tasks: IndexMap<String, RunningTask>,
@@ -82,7 +102,7 @@ impl ActiveTurn {
/// Mutable state for a single turn.
#[derive(Default)]
pub(crate) struct TurnState {
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_approvals: HashMap<String, PendingApproval>,
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<PendingInputItem>,
@@ -94,15 +114,12 @@ impl TurnState {
pub(crate) fn insert_pending_approval(
&mut self,
key: String,
tx: oneshot::Sender<ReviewDecision>,
) -> Option<oneshot::Sender<ReviewDecision>> {
self.pending_approvals.insert(key, tx)
pending: PendingApproval,
) -> Option<PendingApproval> {
self.pending_approvals.insert(key, pending)
}
pub(crate) fn remove_pending_approval(
&mut self,
key: &str,
) -> Option<oneshot::Sender<ReviewDecision>> {
pub(crate) fn remove_pending_approval(&mut self, key: &str) -> Option<PendingApproval> {
self.pending_approvals.remove(key)
}