[codex-analytics] guardian review analytics events emission

This commit is contained in:
rhan-oai
2026-04-15 15:03:50 -07:00
parent d59234e830
commit 5480d723fd
3 changed files with 373 additions and 18 deletions

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use async_channel::Receiver;
use async_channel::Sender;
use codex_analytics::GuardianApprovalRequestSource;
use codex_async_utils::OrCancelExt;
use codex_exec_server::EnvironmentManager;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
@@ -753,6 +754,7 @@ fn spawn_guardian_review(
review_id,
request,
retry_reason,
GuardianApprovalRequestSource::DelegatedSubagent,
cancel_token,
));
let _ = tx.send(decision);

View File

@@ -1,5 +1,14 @@
use std::sync::Arc;
use std::time::Instant;
use codex_analytics::GuardianApprovalRequestSource;
use codex_analytics::GuardianReviewDecision;
use codex_analytics::GuardianReviewFailureReason;
use codex_analytics::GuardianReviewSessionKind;
use codex_analytics::GuardianReviewTerminalStatus;
use codex_analytics::GuardianReviewedAction;
use codex_analytics::now_unix_seconds;
use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
@@ -10,12 +19,14 @@ use codex_protocol::protocol::GuardianRiskLevel;
use codex_protocol::protocol::GuardianUserAuthorization;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::WarningEvent;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use super::GUARDIAN_REVIEW_TIMEOUT;
use super::GUARDIAN_REVIEWER_NAME;
use super::GuardianApprovalRequest;
use super::GuardianAssessment;
@@ -76,10 +87,34 @@ pub(crate) fn guardian_timeout_message() -> String {
#[derive(Debug)]
pub(super) enum GuardianReviewOutcome {
Completed(anyhow::Result<GuardianAssessment>),
Failed(GuardianReviewFailure),
TimedOut,
Aborted,
}
#[derive(Debug)]
pub(super) enum GuardianReviewFailure {
PromptBuild(anyhow::Error),
Session(anyhow::Error),
Parse(anyhow::Error),
}
impl GuardianReviewFailure {
fn reason(&self) -> GuardianReviewFailureReason {
match self {
Self::PromptBuild(_) => GuardianReviewFailureReason::PromptBuildError,
Self::Session(_) => GuardianReviewFailureReason::SessionError,
Self::Parse(_) => GuardianReviewFailureReason::ParseError,
}
}
fn error(&self) -> &anyhow::Error {
match self {
Self::PromptBuild(err) | Self::Session(err) | Self::Parse(err) => err,
}
}
}
fn guardian_risk_level_str(level: GuardianRiskLevel) -> &'static str {
match level {
GuardianRiskLevel::Low => "low",
@@ -89,6 +124,174 @@ fn guardian_risk_level_str(level: GuardianRiskLevel) -> &'static str {
}
}
fn guardian_reviewed_action(request: &GuardianApprovalRequest) -> GuardianReviewedAction {
match request {
GuardianApprovalRequest::Shell {
sandbox_permissions,
additional_permissions,
..
} => GuardianReviewedAction::Shell {
sandbox_permissions: *sandbox_permissions,
additional_permissions: additional_permissions.clone(),
},
GuardianApprovalRequest::ExecCommand {
sandbox_permissions,
additional_permissions,
tty,
..
} => GuardianReviewedAction::UnifiedExec {
sandbox_permissions: *sandbox_permissions,
additional_permissions: additional_permissions.clone(),
tty: *tty,
},
#[cfg(unix)]
GuardianApprovalRequest::Execve {
source,
program,
additional_permissions,
..
} => GuardianReviewedAction::Execve {
source: *source,
program: program.clone(),
additional_permissions: additional_permissions.clone(),
},
GuardianApprovalRequest::ApplyPatch { .. } => GuardianReviewedAction::ApplyPatch {},
GuardianApprovalRequest::NetworkAccess { protocol, port, .. } => {
GuardianReviewedAction::NetworkAccess {
protocol: *protocol,
port: *port,
}
}
GuardianApprovalRequest::McpToolCall {
server,
tool_name,
connector_id,
connector_name,
tool_title,
..
} => GuardianReviewedAction::McpToolCall {
server: server.clone(),
tool_name: tool_name.clone(),
connector_id: connector_id.clone(),
connector_name: connector_name.clone(),
tool_title: tool_title.clone(),
},
}
}
struct GuardianReviewAnalyticsContext {
thread_id: String,
turn_id: String,
review_id: String,
target_item_id: Option<String>,
approval_request_source: GuardianApprovalRequestSource,
reviewed_action: GuardianReviewedAction,
started_at: u64,
started_instant: Instant,
}
struct GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision,
terminal_status: GuardianReviewTerminalStatus,
failure_reason: Option<GuardianReviewFailureReason>,
risk_level: Option<GuardianRiskLevel>,
user_authorization: Option<GuardianUserAuthorization>,
outcome: Option<GuardianAssessmentOutcome>,
guardian_thread_id: Option<String>,
guardian_session_kind: Option<GuardianReviewSessionKind>,
guardian_model: Option<String>,
guardian_reasoning_effort: Option<String>,
had_prior_review_context: Option<bool>,
reviewed_action_truncated: bool,
token_usage: Option<TokenUsage>,
time_to_first_token_ms: Option<u64>,
completed_at: u64,
}
#[derive(Default)]
struct GuardianReviewMetadataFields {
guardian_thread_id: Option<String>,
guardian_session_kind: Option<GuardianReviewSessionKind>,
guardian_model: Option<String>,
guardian_reasoning_effort: Option<String>,
had_prior_review_context: Option<bool>,
reviewed_action_truncated: bool,
token_usage: Option<TokenUsage>,
time_to_first_token_ms: Option<u64>,
}
impl GuardianReviewAnalyticsResult {
fn from_metadata(metadata: GuardianReviewMetadataFields, completed_at: u64) -> Self {
Self {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_reason: None,
risk_level: None,
user_authorization: None,
outcome: None,
guardian_thread_id: metadata.guardian_thread_id,
guardian_session_kind: metadata.guardian_session_kind,
guardian_model: metadata.guardian_model,
guardian_reasoning_effort: metadata.guardian_reasoning_effort,
had_prior_review_context: metadata.had_prior_review_context,
reviewed_action_truncated: metadata.reviewed_action_truncated,
token_usage: metadata.token_usage,
time_to_first_token_ms: metadata.time_to_first_token_ms,
completed_at,
}
}
}
impl GuardianReviewAnalyticsContext {
fn track(&self, session: &Session, turn: &TurnContext, result: GuardianReviewAnalyticsResult) {
if !turn.config.features.enabled(Feature::GeneralAnalytics) {
return;
}
let completion_latency_ms = self.started_instant.elapsed().as_millis() as u64;
session
.services
.analytics_events_client
.track_guardian_review(codex_analytics::GuardianReviewEventParams {
thread_id: self.thread_id.clone(),
turn_id: self.turn_id.clone(),
review_id: self.review_id.clone(),
target_item_id: self.target_item_id.clone(),
approval_request_source: self.approval_request_source,
reviewed_action: self.reviewed_action.clone(),
reviewed_action_truncated: result.reviewed_action_truncated,
decision: result.decision,
terminal_status: result.terminal_status,
failure_reason: result.failure_reason,
risk_level: result.risk_level,
user_authorization: result.user_authorization,
outcome: result.outcome,
guardian_thread_id: result.guardian_thread_id,
guardian_session_kind: result.guardian_session_kind,
guardian_model: result.guardian_model,
guardian_reasoning_effort: result.guardian_reasoning_effort,
had_prior_review_context: result.had_prior_review_context,
review_timeout_ms: GUARDIAN_REVIEW_TIMEOUT.as_millis() as u64,
// TODO(rhan-oai): plumb nested Guardian review session tool-call counts.
tool_call_count: None,
time_to_first_token_ms: result.time_to_first_token_ms,
completion_latency_ms: Some(completion_latency_ms),
started_at: self.started_at,
completed_at: Some(result.completed_at),
input_tokens: result.token_usage.as_ref().map(|usage| usage.input_tokens),
cached_input_tokens: result
.token_usage
.as_ref()
.map(|usage| usage.cached_input_tokens),
output_tokens: result.token_usage.as_ref().map(|usage| usage.output_tokens),
reasoning_output_tokens: result
.token_usage
.as_ref()
.map(|usage| usage.reasoning_output_tokens),
total_tokens: result.token_usage.as_ref().map(|usage| usage.total_tokens),
});
}
}
/// Whether this turn should route `on-request` approval prompts through the
/// guardian reviewer instead of surfacing them to the user. ARC may still
/// block actions earlier in the flow.
@@ -116,11 +319,24 @@ async fn run_guardian_review(
review_id: String,
request: GuardianApprovalRequest,
retry_reason: Option<String>,
approval_request_source: GuardianApprovalRequestSource,
external_cancel: Option<CancellationToken>,
) -> ReviewDecision {
let started_at = now_unix_seconds();
let started_instant = Instant::now();
let target_item_id = guardian_request_target_item_id(&request).map(str::to_string);
let assessment_turn_id = guardian_request_turn_id(&request, &turn.sub_id).to_string();
let action_summary = guardian_assessment_action(&request);
let analytics_context = GuardianReviewAnalyticsContext {
thread_id: session.conversation_id.to_string(),
turn_id: assessment_turn_id.clone(),
review_id: review_id.clone(),
target_item_id: target_item_id.clone(),
approval_request_source,
reviewed_action: guardian_reviewed_action(&request),
started_at,
started_instant,
};
session
.send_event(
turn.as_ref(),
@@ -142,6 +358,19 @@ async fn run_guardian_review(
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Aborted,
terminal_status: GuardianReviewTerminalStatus::Aborted,
failure_reason: Some(GuardianReviewFailureReason::Cancelled),
..GuardianReviewAnalyticsResult::from_metadata(
GuardianReviewMetadataFields::default(),
now_unix_seconds(),
)
},
);
session
.send_event(
turn.as_ref(),
@@ -167,24 +396,97 @@ async fn run_guardian_review(
session.clone(),
turn.clone(),
request,
retry_reason,
retry_reason.clone(),
schema,
external_cancel,
))
.await;
let completed_at = now_unix_seconds();
let result = || {
GuardianReviewAnalyticsResult::from_metadata(
GuardianReviewMetadataFields::default(),
completed_at,
)
};
let assessment = match outcome {
GuardianReviewOutcome::Completed(Ok(assessment)) => assessment,
GuardianReviewOutcome::Completed(Err(err)) => GuardianAssessment {
risk_level: GuardianRiskLevel::High,
user_authorization: GuardianUserAuthorization::Unknown,
outcome: GuardianAssessmentOutcome::Deny,
rationale: format!("Automatic approval review failed: {err}"),
},
GuardianReviewOutcome::Completed(Ok(assessment)) => {
let approved = matches!(assessment.outcome, GuardianAssessmentOutcome::Allow);
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: if approved {
GuardianReviewDecision::Approved
} else {
GuardianReviewDecision::Denied
},
terminal_status: if approved {
GuardianReviewTerminalStatus::Approved
} else {
GuardianReviewTerminalStatus::Denied
},
failure_reason: None,
risk_level: Some(assessment.risk_level),
user_authorization: Some(assessment.user_authorization),
outcome: Some(assessment.outcome),
..result()
},
);
assessment
}
GuardianReviewOutcome::Completed(Err(err)) => {
let rationale = format!("Automatic approval review failed: {err}");
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_reason: Some(GuardianReviewFailureReason::SessionError),
..result()
},
);
GuardianAssessment {
risk_level: GuardianRiskLevel::High,
user_authorization: GuardianUserAuthorization::Unknown,
outcome: GuardianAssessmentOutcome::Deny,
rationale,
}
}
GuardianReviewOutcome::Failed(failure) => {
let rationale = format!("Automatic approval review failed: {}", failure.error());
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_reason: Some(failure.reason()),
..result()
},
);
GuardianAssessment {
risk_level: GuardianRiskLevel::High,
user_authorization: GuardianUserAuthorization::Unknown,
outcome: GuardianAssessmentOutcome::Deny,
rationale,
}
}
GuardianReviewOutcome::TimedOut => {
let rationale =
"Automatic approval review timed out while evaluating the requested approval."
.to_string();
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::TimedOut,
failure_reason: Some(GuardianReviewFailureReason::Timeout),
..result()
},
);
session
.send_event(
turn.as_ref(),
@@ -212,6 +514,16 @@ async fn run_guardian_review(
return ReviewDecision::TimedOut;
}
GuardianReviewOutcome::Aborted => {
analytics_context.track(
session.as_ref(),
turn.as_ref(),
GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Aborted,
terminal_status: GuardianReviewTerminalStatus::Aborted,
failure_reason: Some(GuardianReviewFailureReason::Cancelled),
..result()
},
);
session
.send_event(
turn.as_ref(),
@@ -311,6 +623,7 @@ pub(crate) async fn review_approval_request(
review_id,
request,
retry_reason,
GuardianApprovalRequestSource::MainTurn,
/*external_cancel*/ None,
))
.await
@@ -322,16 +635,18 @@ pub(crate) async fn review_approval_request_with_cancel(
review_id: String,
request: GuardianApprovalRequest,
retry_reason: Option<String>,
approval_request_source: GuardianApprovalRequestSource,
cancel_token: CancellationToken,
) -> ReviewDecision {
Box::pin(run_guardian_review(
run_guardian_review(
Arc::clone(session),
Arc::clone(turn),
review_id,
request,
retry_reason,
approval_request_source,
Some(cancel_token),
))
)
.await
}
@@ -360,7 +675,9 @@ pub(super) async fn run_guardian_review_session(
let live_network_config = match session.services.network_proxy.as_ref() {
Some(network_proxy) => match network_proxy.proxy().current_cfg().await {
Ok(config) => Some(config),
Err(err) => return GuardianReviewOutcome::Completed(Err(err)),
Err(err) => {
return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err));
}
},
None => None,
};
@@ -410,7 +727,7 @@ pub(super) async fn run_guardian_review_session(
);
let guardian_config = match guardian_config {
Ok(config) => config,
Err(err) => return GuardianReviewOutcome::Completed(Err(err)),
Err(err) => return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err)),
};
match Box::pin(
@@ -432,15 +749,49 @@ pub(super) async fn run_guardian_review_session(
)
.await
{
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => {
GuardianReviewOutcome::Completed(parse_guardian_assessment(
last_agent_message.as_deref(),
))
}
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => match last_agent_message
{
Some(last_agent_message) => {
match parse_guardian_assessment(Some(&last_agent_message)) {
Ok(assessment) => GuardianReviewOutcome::Completed(Ok(assessment)),
Err(err) => GuardianReviewOutcome::Failed(GuardianReviewFailure::Parse(err)),
}
}
None => GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(anyhow::anyhow!(
"guardian review completed without an assessment payload"
))),
},
GuardianReviewSessionOutcome::Completed(Err(err)) => {
GuardianReviewOutcome::Completed(Err(err))
GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(err))
}
GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut,
GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted,
}
}
#[cfg(test)]
mod review_tests {
use super::*;
#[test]
fn guardian_review_failure_reason_distinguishes_failure_kinds() {
let parse_failure = GuardianReviewFailure::Parse(anyhow::anyhow!("bad guardian JSON"));
let prompt_failure =
GuardianReviewFailure::PromptBuild(anyhow::anyhow!("bad prompt/config"));
let session_failure =
GuardianReviewFailure::Session(anyhow::anyhow!("guardian runtime failed"));
assert!(matches!(
parse_failure.reason(),
GuardianReviewFailureReason::ParseError
));
assert!(matches!(
prompt_failure.reason(),
GuardianReviewFailureReason::PromptBuildError
));
assert!(matches!(
session_failure.reason(),
GuardianReviewFailureReason::SessionError
));
}
}

View File

@@ -15,6 +15,7 @@ use crate::config_loader::NetworkDomainPermissionsToml;
use crate::config_loader::RequirementSource;
use crate::config_loader::Sourced;
use crate::test_support;
use codex_analytics::GuardianApprovalRequestSource;
use codex_config::config_toml::ConfigToml;
use codex_network_proxy::NetworkProxyConfig;
use codex_protocol::ThreadId;
@@ -701,6 +702,7 @@ async fn cancelled_guardian_review_emits_terminal_abort_without_warning() {
.to_string(),
},
/*retry_reason*/ None,
GuardianApprovalRequestSource::MainTurn,
cancel_token,
)
.await;