Compare commits

...

2 Commits

Author SHA1 Message Date
Owen Lin
409a3b3b32 feat(guardian): collect review session metrics 2026-04-07 16:13:10 -07:00
Owen Lin
823938ff87 feat(analytics): add guardian review event schema 2026-04-07 16:06:57 -07:00
11 changed files with 737 additions and 42 deletions

View File

@@ -20,6 +20,7 @@ codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
os_info = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha1 = { workspace = true }
tokio = { workspace = true, features = [
"macros",
@@ -29,4 +30,3 @@ tracing = { workspace = true, features = ["log"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }

View File

@@ -19,6 +19,15 @@ use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::GuardianReviewDecision;
use crate::facts::GuardianReviewEventParams;
use crate::facts::GuardianReviewFailureKind;
use crate::facts::GuardianReviewRiskLevel;
use crate::facts::GuardianReviewSessionKind;
use crate::facts::GuardianReviewTerminalStatus;
use crate::facts::GuardianReviewTrigger;
use crate::facts::GuardianReviewedAction;
use crate::facts::GuardianToolCallCounts;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
@@ -823,6 +832,127 @@ async fn reducer_ingests_plugin_state_changed_fact() {
);
}
#[tokio::test]
async fn reducer_ingests_guardian_review_fact() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let tool_counts = GuardianToolCallCounts {
shell: 1,
mcp: 2,
..Default::default()
};
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
GuardianReviewEventParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
review_id: "review-1".to_string(),
target_item_id: "tool-1".to_string(),
product_client_id: Some("codex_app".to_string()),
trigger: GuardianReviewTrigger::McpToolCall,
retry_reason: Some("requires approval".to_string()),
delegated_review: true,
reviewed_action: GuardianReviewedAction::McpToolCall {
server: "github".to_string(),
tool_name: "create_pr".to_string(),
arguments: Some(json!({"title": "Guardian analytics"})),
connector_id: Some("github".to_string()),
connector_name: Some("GitHub".to_string()),
tool_title: Some("Create PR".to_string()),
},
reviewed_action_truncated: false,
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_kind: Some(GuardianReviewFailureKind::ParseError),
risk_score: Some(100),
risk_level: Some(GuardianReviewRiskLevel::High),
rationale: Some("Automatic approval review failed".to_string()),
guardian_thread_id: Some("guardian-thread-1".to_string()),
guardian_session_kind: Some(GuardianReviewSessionKind::EphemeralForked),
guardian_model: Some("gpt-5.4".to_string()),
guardian_reasoning_effort: Some("low".to_string()),
had_prior_review_context: Some(true),
review_timeout_ms: 90_000,
guardian_tool_call_count: tool_counts.total(),
guardian_tool_call_counts: tool_counts,
guardian_time_to_first_token_ms: Some(123),
guardian_completion_latency_ms: Some(456),
started_at: 1_716_000_000,
completed_at: Some(1_716_000_001),
input_tokens: Some(10),
cached_input_tokens: Some(2),
output_tokens: Some(3),
reasoning_output_tokens: Some(1),
total_tokens: Some(13),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize guardian review event");
assert_eq!(
payload,
json!([{
"event_type": "codex_guardian_review",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"review_id": "review-1",
"target_item_id": "tool-1",
"product_client_id": "codex_app",
"trigger": "mcp_tool_call",
"retry_reason": "requires approval",
"delegated_review": true,
"reviewed_action": {
"type": "mcp_tool_call",
"server": "github",
"tool_name": "create_pr",
"arguments": {"title": "Guardian analytics"},
"connector_id": "github",
"connector_name": "GitHub",
"tool_title": "Create PR"
},
"reviewed_action_truncated": false,
"decision": "denied",
"terminal_status": "failed_closed",
"failure_kind": "parse_error",
"risk_score": 100,
"risk_level": "high",
"rationale": "Automatic approval review failed",
"guardian_thread_id": "guardian-thread-1",
"guardian_session_kind": "ephemeral_forked",
"guardian_model": "gpt-5.4",
"guardian_reasoning_effort": "low",
"had_prior_review_context": true,
"review_timeout_ms": 90000,
"guardian_tool_call_count": 3,
"guardian_tool_call_counts": {
"shell": 1,
"unified_exec": 0,
"mcp": 2,
"dynamic": 0,
"apply_patch": 0,
"web_search": 0,
"image_generation": 0,
"view_image": 0
},
"guardian_time_to_first_token_ms": 123,
"guardian_completion_latency_ms": 456,
"started_at": 1716000000,
"completed_at": 1716000001,
"input_tokens": 10,
"cached_input_tokens": 2,
"output_tokens": 3,
"reasoning_output_tokens": 1,
"total_tokens": 13
}
}])
);
}
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
PluginTelemetryMetadata {
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),

View File

@@ -7,6 +7,7 @@ use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::GuardianReviewEventParams;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
use crate::facts::SkillInvocation;
@@ -151,6 +152,12 @@ impl AnalyticsEventsClient {
));
}
pub fn track_guardian_review(&self, input: GuardianReviewEventParams) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(
Box::new(input),
)));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
if mentions.is_empty() {
return;

View File

@@ -1,4 +1,5 @@
use crate::facts::AppInvocation;
use crate::facts::GuardianReviewEventParams;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::SubAgentThreadStartedInput;
@@ -35,6 +36,7 @@ pub(crate) struct TrackEventsRequest {
pub(crate) enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
ThreadInitialized(ThreadInitializedEvent),
GuardianReview(Box<GuardianReviewEventRequest>),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
@@ -99,6 +101,12 @@ pub(crate) struct ThreadInitializedEvent {
pub(crate) event_params: ThreadInitializedEventParams,
}
#[derive(Serialize)]
pub(crate) struct GuardianReviewEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: GuardianReviewEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexAppMetadata {
pub(crate) connector_id: Option<String>,

View File

@@ -6,6 +6,9 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::approvals::NetworkApprovalProtocol;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::SandboxPermissions;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::SubAgentSource;
use serde::Serialize;
@@ -89,6 +92,7 @@ pub(crate) enum AnalyticsFact {
pub(crate) enum CustomAnalyticsFact {
SubAgentThreadStarted(SubAgentThreadStartedInput),
GuardianReview(Box<GuardianReviewEventParams>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),
@@ -128,3 +132,175 @@ pub(crate) enum PluginState {
Enabled,
Disabled,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewTrigger {
Shell,
UnifiedExec,
Execve,
ApplyPatch,
NetworkAccess,
McpToolCall,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewDecision {
Approved,
Denied,
Aborted,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewTerminalStatus {
Approved,
Denied,
Aborted,
TimedOut,
FailedClosed,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewFailureKind {
Timeout,
Cancelled,
PromptBuildError,
SessionError,
ParseError,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewSessionKind {
TrunkSpawned,
TrunkReused,
EphemeralForked,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuardianReviewRiskLevel {
Low,
Medium,
High,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct GuardianToolCallCounts {
pub shell: u64,
pub unified_exec: u64,
pub mcp: u64,
pub dynamic: u64,
pub apply_patch: u64,
pub web_search: u64,
pub image_generation: u64,
pub view_image: u64,
}
impl GuardianToolCallCounts {
pub fn total(&self) -> u64 {
self.shell
+ self.unified_exec
+ self.mcp
+ self.dynamic
+ self.apply_patch
+ self.web_search
+ self.image_generation
+ self.view_image
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuardianReviewedAction {
Shell {
command: Vec<String>,
command_display: String,
cwd: String,
sandbox_permissions: SandboxPermissions,
additional_permissions: Option<PermissionProfile>,
justification: Option<String>,
},
UnifiedExec {
command: Vec<String>,
command_display: String,
cwd: String,
sandbox_permissions: SandboxPermissions,
additional_permissions: Option<PermissionProfile>,
justification: Option<String>,
tty: bool,
},
Execve {
source: GuardianCommandSource,
program: String,
argv: Vec<String>,
cwd: String,
additional_permissions: Option<PermissionProfile>,
},
ApplyPatch {
cwd: String,
files: Vec<String>,
patch: Option<String>,
},
NetworkAccess {
target: String,
host: String,
protocol: NetworkApprovalProtocol,
port: u16,
},
McpToolCall {
server: String,
tool_name: String,
arguments: Option<serde_json::Value>,
connector_id: Option<String>,
connector_name: Option<String>,
tool_title: Option<String>,
},
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianCommandSource {
Shell,
UnifiedExec,
}
#[derive(Clone, Debug, Serialize)]
pub struct GuardianReviewEventParams {
pub thread_id: String,
pub turn_id: String,
pub review_id: String,
pub target_item_id: String,
pub product_client_id: Option<String>,
pub trigger: GuardianReviewTrigger,
pub retry_reason: Option<String>,
pub delegated_review: bool,
pub reviewed_action: GuardianReviewedAction,
pub reviewed_action_truncated: bool,
pub decision: GuardianReviewDecision,
pub terminal_status: GuardianReviewTerminalStatus,
pub failure_kind: Option<GuardianReviewFailureKind>,
pub risk_score: Option<u8>,
pub risk_level: Option<GuardianReviewRiskLevel>,
pub rationale: Option<String>,
pub guardian_thread_id: Option<String>,
pub guardian_session_kind: Option<GuardianReviewSessionKind>,
pub guardian_model: Option<String>,
pub guardian_reasoning_effort: Option<String>,
pub had_prior_review_context: Option<bool>,
pub review_timeout_ms: u64,
pub guardian_tool_call_count: u64,
pub guardian_tool_call_counts: GuardianToolCallCounts,
pub guardian_time_to_first_token_ms: Option<u64>,
pub guardian_completion_latency_ms: Option<u64>,
pub started_at: u64,
pub completed_at: Option<u64>,
pub input_tokens: Option<i64>,
pub cached_input_tokens: Option<i64>,
pub output_tokens: Option<i64>,
pub reasoning_output_tokens: Option<i64>,
pub total_tokens: Option<i64>,
}

View File

@@ -6,6 +6,16 @@ mod reducer;
pub use client::AnalyticsEventsClient;
pub use events::AppServerRpcTransport;
pub use facts::AppInvocation;
pub use facts::GuardianCommandSource;
pub use facts::GuardianReviewDecision;
pub use facts::GuardianReviewEventParams;
pub use facts::GuardianReviewFailureKind;
pub use facts::GuardianReviewRiskLevel;
pub use facts::GuardianReviewSessionKind;
pub use facts::GuardianReviewTerminalStatus;
pub use facts::GuardianReviewTrigger;
pub use facts::GuardianReviewedAction;
pub use facts::GuardianToolCallCounts;
pub use facts::InvocationType;
pub use facts::SkillInvocation;
pub use facts::SubAgentThreadStartedInput;

View File

@@ -5,6 +5,7 @@ use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::GuardianReviewEventRequest;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
@@ -81,6 +82,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::GuardianReview(input) => {
self.ingest_guardian_review(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -135,6 +139,19 @@ impl AnalyticsReducer {
));
}
fn ingest_guardian_review(
&mut self,
input: crate::facts::GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
out.push(TrackEventRequest::GuardianReview(Box::new(
GuardianReviewEventRequest {
event_type: "codex_guardian_review",
event_params: input,
},
)));
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,

View File

@@ -15,6 +15,7 @@ mod approval_request;
mod prompt;
mod review;
mod review_session;
mod review_session_analytics;
use std::time::Duration;

View File

@@ -336,15 +336,13 @@ pub(super) async fn run_guardian_review_session(
})
.await
{
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => {
GuardianReviewOutcome::Completed(parse_guardian_assessment(
GuardianReviewSessionOutcome::Completed { result, report: _ } => match result {
Ok(last_agent_message) => GuardianReviewOutcome::Completed(parse_guardian_assessment(
last_agent_message.as_deref(),
))
}
GuardianReviewSessionOutcome::Completed(Err(err)) => {
GuardianReviewOutcome::Completed(Err(err))
}
GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut,
GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted,
)),
Err(err) => GuardianReviewOutcome::Completed(Err(err)),
},
GuardianReviewSessionOutcome::TimedOut { report: _ } => GuardianReviewOutcome::TimedOut,
GuardianReviewSessionOutcome::Aborted { report: _ } => GuardianReviewOutcome::Aborted,
}
}

View File

@@ -5,8 +5,11 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use anyhow::anyhow;
use codex_analytics::GuardianReviewSessionKind;
use codex_analytics::GuardianToolCallCounts;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::DeveloperInstructions;
@@ -19,6 +22,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use serde_json::Value;
use tokio::sync::Mutex;
@@ -42,6 +46,10 @@ use codex_model_provider_info::ModelProviderInfo;
use super::GUARDIAN_REVIEW_TIMEOUT;
use super::GUARDIAN_REVIEWER_NAME;
use super::prompt::guardian_policy_prompt;
use super::review_session_analytics::GuardianReviewSessionReport;
use super::review_session_analytics::duration_millis_u64;
use super::review_session_analytics::guardian_event_records_time_to_first_token;
use super::review_session_analytics::record_guardian_tool_call_count;
const GUARDIAN_INTERRUPT_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!(
@@ -52,10 +60,18 @@ const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!(
);
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum GuardianReviewSessionOutcome {
Completed(anyhow::Result<Option<String>>),
TimedOut,
Aborted,
Completed {
result: anyhow::Result<Option<String>>,
report: Option<GuardianReviewSessionReport>,
},
TimedOut {
report: Option<GuardianReviewSessionReport>,
},
Aborted {
report: Option<GuardianReviewSessionReport>,
},
}
pub(crate) struct GuardianReviewSessionParams {
@@ -248,9 +264,11 @@ impl GuardianReviewSessionManager {
&self,
params: GuardianReviewSessionParams,
) -> GuardianReviewSessionOutcome {
let request_started_at = Instant::now();
let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT;
let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(&params.spawn_config);
let mut stale_trunk_to_shutdown = None;
let mut trunk_session_kind = GuardianReviewSessionKind::TrunkReused;
let trunk_candidate = match run_before_review_deadline(
deadline,
params.external_cancel.as_ref(),
@@ -284,11 +302,15 @@ impl GuardianReviewSessionManager {
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => {
return GuardianReviewSessionOutcome::Completed(Err(err));
return GuardianReviewSessionOutcome::Completed {
result: Err(err),
report: None,
};
}
Err(outcome) => return outcome,
};
state.trunk = Some(Arc::clone(&review_session));
trunk_session_kind = GuardianReviewSessionKind::TrunkSpawned;
}
state.trunk.as_ref().cloned()
@@ -301,9 +323,12 @@ impl GuardianReviewSessionManager {
}
let Some(trunk) = trunk_candidate else {
return GuardianReviewSessionOutcome::Completed(Err(anyhow!(
"guardian review session was not available after spawn"
)));
return GuardianReviewSessionOutcome::Completed {
result: Err(anyhow!(
"guardian review session was not available after spawn"
)),
report: None,
};
};
if trunk.reuse_key != next_reuse_key {
@@ -313,6 +338,7 @@ impl GuardianReviewSessionManager {
next_reuse_key,
deadline,
/*initial_history*/ None,
request_started_at,
)
.await;
}
@@ -322,14 +348,34 @@ impl GuardianReviewSessionManager {
Err(_) => {
let initial_history = trunk.fork_initial_history().await;
return self
.run_ephemeral_review(params, next_reuse_key, deadline, initial_history)
.run_ephemeral_review(
params,
next_reuse_key,
deadline,
initial_history,
request_started_at,
)
.await;
}
};
let (outcome, keep_review_session) =
run_review_on_session(trunk.as_ref(), &params, deadline).await;
if keep_review_session && matches!(outcome, GuardianReviewSessionOutcome::Completed(_)) {
let (outcome, keep_review_session) = run_review_on_session(
trunk.as_ref(),
&params,
deadline,
trunk_session_kind,
request_started_at,
)
.await;
if keep_review_session
&& matches!(
outcome,
GuardianReviewSessionOutcome::Completed {
result: _,
report: _
}
)
{
trunk.refresh_last_committed_rollout_items().await;
}
drop(trunk_guard);
@@ -420,6 +466,7 @@ impl GuardianReviewSessionManager {
reuse_key: GuardianReviewSessionReuseKey,
deadline: tokio::time::Instant,
initial_history: Option<InitialHistory>,
request_started_at: Instant,
) -> GuardianReviewSessionOutcome {
let spawn_cancel_token = CancellationToken::new();
let mut fork_config = params.spawn_config.clone();
@@ -439,7 +486,12 @@ impl GuardianReviewSessionManager {
.await
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => return GuardianReviewSessionOutcome::Completed(Err(err)),
Ok(Err(err)) => {
return GuardianReviewSessionOutcome::Completed {
result: Err(err),
report: None,
};
}
Err(outcome) => return outcome,
};
self.register_active_ephemeral(Arc::clone(&review_session))
@@ -447,7 +499,14 @@ impl GuardianReviewSessionManager {
let mut cleanup =
EphemeralReviewCleanup::new(Arc::clone(&self.state), Arc::clone(&review_session));
let (outcome, _) = run_review_on_session(review_session.as_ref(), &params, deadline).await;
let (outcome, _) = run_review_on_session(
review_session.as_ref(),
&params,
deadline,
GuardianReviewSessionKind::EphemeralForked,
request_started_at,
)
.await;
if let Some(review_session) = self.take_active_ephemeral(&review_session).await {
cleanup.disarm();
review_session.shutdown_in_background();
@@ -490,8 +549,11 @@ async fn run_review_on_session(
review_session: &GuardianReviewSession,
params: &GuardianReviewSessionParams,
deadline: tokio::time::Instant,
session_kind: GuardianReviewSessionKind,
request_started_at: Instant,
) -> (GuardianReviewSessionOutcome, bool) {
if review_session.has_prior_review.load(Ordering::Relaxed) {
let had_prior_review_context = review_session.has_prior_review.load(Ordering::Relaxed);
if had_prior_review_context {
append_guardian_followup_reminder(review_session).await;
}
@@ -534,14 +596,37 @@ async fn run_review_on_session(
};
if let Err(err) = submit_result {
return (
GuardianReviewSessionOutcome::Completed(Err(err.into())),
GuardianReviewSessionOutcome::Completed {
result: Err(err.into()),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
GuardianToolCallCounts::default(),
/*time_to_first_token_ms*/ None,
/*token_usage*/ None,
)),
},
false,
);
}
let outcome =
wait_for_guardian_review(review_session, deadline, params.external_cancel.as_ref()).await;
if matches!(outcome.0, GuardianReviewSessionOutcome::Completed(_)) {
let outcome = wait_for_guardian_review(
review_session,
deadline,
params.external_cancel.as_ref(),
session_kind,
had_prior_review_context,
request_started_at,
)
.await;
if matches!(
outcome.0,
GuardianReviewSessionOutcome::Completed {
result: _,
report: _
}
) {
review_session
.has_prior_review
.store(true, Ordering::Relaxed);
@@ -575,16 +660,34 @@ async fn wait_for_guardian_review(
review_session: &GuardianReviewSession,
deadline: tokio::time::Instant,
external_cancel: Option<&CancellationToken>,
session_kind: GuardianReviewSessionKind,
had_prior_review_context: bool,
request_started_at: Instant,
) -> (GuardianReviewSessionOutcome, bool) {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
let mut last_error_message: Option<String> = None;
let mut tool_call_counts = GuardianToolCallCounts::default();
let mut time_to_first_token_ms = None;
let mut token_usage = None;
loop {
tokio::select! {
_ = &mut timeout => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::TimedOut, keep_review_session);
return (
GuardianReviewSessionOutcome::TimedOut {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
keep_review_session,
);
}
_ = async {
if let Some(cancel_token) = external_cancel {
@@ -594,22 +697,69 @@ async fn wait_for_guardian_review(
}
} => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::Aborted, keep_review_session);
return (
GuardianReviewSessionOutcome::Aborted {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
keep_review_session,
);
}
event = review_session.codex.next_event() => {
match event {
Ok(event) => match event.msg {
Ok(event) => {
if time_to_first_token_ms.is_none()
&& guardian_event_records_time_to_first_token(&event.msg)
{
time_to_first_token_ms = Some(duration_millis_u64(
request_started_at.elapsed(),
));
}
record_guardian_tool_call_count(&event.msg, &mut tool_call_counts);
if let EventMsg::TokenCount(token_count) = &event.msg
&& let Some(info) = token_count.info.as_ref()
{
token_usage = Some(info.last_token_usage.clone());
}
match event.msg {
EventMsg::TurnComplete(turn_complete) => {
if turn_complete.last_agent_message.is_none()
&& let Some(error_message) = last_error_message
{
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
GuardianReviewSessionOutcome::Completed {
result: Err(anyhow!(error_message)),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
return (
GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)),
GuardianReviewSessionOutcome::Completed {
result: Ok(turn_complete.last_agent_message),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
@@ -617,13 +767,36 @@ async fn wait_for_guardian_review(
last_error_message = Some(error.message);
}
EventMsg::TurnAborted(_) => {
return (GuardianReviewSessionOutcome::Aborted, true);
return (
GuardianReviewSessionOutcome::Aborted {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
_ => {}
},
}
}
Err(err) => {
return (
GuardianReviewSessionOutcome::Completed(Err(err.into())),
GuardianReviewSessionOutcome::Completed {
result: Err(err.into()),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
false,
);
}
@@ -633,6 +806,29 @@ async fn wait_for_guardian_review(
}
}
fn guardian_review_session_report(
review_session: &GuardianReviewSession,
session_kind: GuardianReviewSessionKind,
had_prior_review_context: bool,
tool_call_counts: GuardianToolCallCounts,
time_to_first_token_ms: Option<u64>,
token_usage: Option<TokenUsage>,
) -> GuardianReviewSessionReport {
GuardianReviewSessionReport {
guardian_thread_id: review_session.codex.session.conversation_id.to_string(),
session_kind,
guardian_model: review_session.reuse_key.model.clone(),
guardian_reasoning_effort: review_session
.reuse_key
.model_reasoning_effort
.map(|effort| effort.to_string()),
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
}
}
pub(crate) fn build_guardian_review_session_config(
parent_config: &Config,
live_network_config: Option<codex_network_proxy::NetworkProxyConfig>,
@@ -694,7 +890,9 @@ async fn run_before_review_deadline<T>(
future: impl Future<Output = T>,
) -> Result<T, GuardianReviewSessionOutcome> {
tokio::select! {
_ = tokio::time::sleep_until(deadline) => Err(GuardianReviewSessionOutcome::TimedOut),
_ = tokio::time::sleep_until(deadline) => Err(GuardianReviewSessionOutcome::TimedOut {
report: None,
}),
result = future => Ok(result),
_ = async {
if let Some(cancel_token) = external_cancel {
@@ -702,7 +900,9 @@ async fn run_before_review_deadline<T>(
} else {
std::future::pending::<()>().await;
}
} => Err(GuardianReviewSessionOutcome::Aborted),
} => Err(GuardianReviewSessionOutcome::Aborted {
report: None,
}),
}
}
@@ -788,7 +988,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::TimedOut)
Err(GuardianReviewSessionOutcome::TimedOut { report: None })
));
}
@@ -810,7 +1010,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::Aborted)
Err(GuardianReviewSessionOutcome::Aborted { report: None })
));
}
@@ -830,7 +1030,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::TimedOut)
Err(GuardianReviewSessionOutcome::TimedOut { report: None })
));
assert!(cancel_token.is_cancelled());
}
@@ -855,7 +1055,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::Aborted)
Err(GuardianReviewSessionOutcome::Aborted { report: None })
));
assert!(cancel_token.is_cancelled());
}

View File

@@ -0,0 +1,148 @@
use std::time::Duration;
use codex_analytics::GuardianReviewSessionKind;
use codex_analytics::GuardianToolCallCounts;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::TokenUsage;
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct GuardianReviewSessionReport {
pub(crate) guardian_thread_id: String,
pub(crate) session_kind: GuardianReviewSessionKind,
pub(crate) guardian_model: Option<String>,
pub(crate) guardian_reasoning_effort: Option<String>,
pub(crate) had_prior_review_context: bool,
pub(crate) tool_call_counts: GuardianToolCallCounts,
pub(crate) time_to_first_token_ms: Option<u64>,
pub(crate) token_usage: Option<TokenUsage>,
}
pub(super) fn duration_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
pub(super) fn guardian_event_records_time_to_first_token(event: &EventMsg) -> bool {
matches!(
event,
EventMsg::AgentMessage(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::ImageGenerationBegin(_)
| EventMsg::ViewImageToolCall(_)
)
}
pub(super) fn record_guardian_tool_call_count(
event: &EventMsg,
counts: &mut GuardianToolCallCounts,
) {
match event {
EventMsg::ExecCommandBegin(begin) => match begin.source {
ExecCommandSource::Agent | ExecCommandSource::UserShell => {
counts.shell += 1;
}
ExecCommandSource::UnifiedExecStartup | ExecCommandSource::UnifiedExecInteraction => {
counts.unified_exec += 1;
}
},
EventMsg::McpToolCallBegin(_) => {
counts.mcp += 1;
}
EventMsg::DynamicToolCallRequest(_) => {
counts.dynamic += 1;
}
EventMsg::PatchApplyBegin(_) => {
counts.apply_patch += 1;
}
EventMsg::WebSearchBegin(_) => {
counts.web_search += 1;
}
EventMsg::ImageGenerationBegin(_) => {
counts.image_generation += 1;
}
EventMsg::ViewImageToolCall(_) => {
counts.view_image += 1;
}
EventMsg::Error(_)
| EventMsg::Warning(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationRealtime(_)
| EventMsg::RealtimeConversationClosed(_)
| EventMsg::ModelReroute(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::TokenCount(_)
| EventMsg::AgentMessage(_)
| EventMsg::UserMessage(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ImageGenerationEnd(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::TerminalInteraction(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::RequestPermissions(_)
| EventMsg::RequestUserInput(_)
| EventMsg::DynamicToolCallResponse(_)
| EventMsg::ElicitationRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::GuardianAssessment(_)
| EventMsg::DeprecationNotice(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::UndoStarted(_)
| EventMsg::UndoCompleted(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListSkillsResponse(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::ShutdownComplete
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::HookStarted(_)
| EventMsg::HookCompleted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabCloseEnd(_)
| EventMsg::CollabResumeBegin(_)
| EventMsg::CollabResumeEnd(_) => {}
}
}